Bug · Resolution: Done · Major · 0.8.0.Beta1 · None
I ran into the same problem as in DBZ-623 and collected additional information about it.
When a new replication slot is created through the streaming replication protocol, PostgreSQL also builds a snapshot. Building the snapshot waits for all concurrent transactions to finish, so the following messages appear in the PostgreSQL log:

```
2018-08-15 08:33:05.915 GMT [2842] LOG: logical decoding found initial starting point at 0/17642F0
2018-08-15 08:33:05.915 GMT [2842] DETAIL: Waiting for transactions (approximately 1) older than 565 to end.
```

For more details, see `snapbuild.c` in the PostgreSQL source.
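If you want to detect this wait from the server log (for example, in a log-based alert), the DETAIL line can be matched with a regular expression. The Java sketch below is only an illustration: `SnapbuildWaitParser` and its `Wait` record are hypothetical names, not part of Debezium or PostgreSQL, and it assumes English server messages (`lc_messages`) with the exact wording shown in the log above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: extracts the values from the DETAIL line PostgreSQL logs
 * while a logical slot's initial snapshot waits for older transactions.
 * Assumes English server messages.
 */
final class SnapbuildWaitParser {

    /** Values from "Waiting for transactions (approximately N) older than XID to end." */
    record Wait(long approxTransactions, long olderThanXid) {}

    private static final Pattern WAIT = Pattern.compile(
            "Waiting for transactions \\(approximately (\\d+)\\) older than (\\d+) to end");

    private SnapbuildWaitParser() {}

    /** Returns the parsed values, or empty if the line is not a snapshot-wait message. */
    static Optional<Wait> parse(String logLine) {
        Matcher m = WAIT.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        long count = Long.parseLong(m.group(1));
        long xid = Long.parseLong(m.group(2));
        return Optional.of(new Wait(count, xid));
    }
}
```

A match means slot creation is still blocked; the XID tells you which transactions (those older than it) you need to look for in `pg_stat_activity`.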
To reproduce the issue, start a transaction in PostgreSQL (auto-commit must be disabled) and leave it open:

```sql
begin;
insert into test_table values (1);
```

where `test_table` is any of your tables.
Then create the PostgreSQL connector:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d '
{
  "name": "postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "slot.name": "debezium",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "test"
  }
}'
```
The connector starts and then blocks while the new replication slot is being created, as this thread dump shows:
```
"pool-5-thread-3" #138 prio=5 os_prio=0 tid=0x00005555f2b81800 nid=0x32e runnable [0x00007f64348ea000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:67)
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:282)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1898)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:297)
    - locked <0x00000000fd683b60> (a org.postgresql.core.v3.QueryExecutorImpl)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:301)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:287)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:264)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:260)
    at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:108)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:362)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:63)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:75)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:105)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
```
At this point, the `pg_replication_slots` view contains the following row:

```sql
select slot_name, plugin, slot_type, restart_lsn, confirmed_flush_lsn
from pg_replication_slots
where slot_name = 'debezium';
```
slot_name | plugin | slot_type | restart_lsn | confirmed_flush_lsn |
---|---|---|---|---|
debezium | decoderbufs | logical | 0/17642F0 | NULL |
Note that `confirmed_flush_lsn` is NULL, while `restart_lsn` equals the starting point reported in the log.
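A monitoring check can treat a logical slot with a NULL `confirmed_flush_lsn` as "slot creation still in progress", which is the state observed here while the open transaction blocks the snapshot. The Java sketch below is hypothetical (`SlotState` is not a Debezium class); it also converts the textual LSN format `X/Y` (upper and lower 32 bits, both in hexadecimal) into a 64-bit number so that values such as `restart_lsn` can be compared numerically.

```java
/**
 * Hypothetical interpretation of a pg_replication_slots row for a logical slot.
 * A NULL confirmed_flush_lsn is treated as "initial snapshot not finished yet".
 */
final class SlotState {

    enum Status { SNAPSHOT_PENDING, READY }

    private SlotState() {}

    /** Converts PostgreSQL's textual LSN "X/Y" (two hex 32-bit halves) to a 64-bit value. */
    static long parseLsn(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    /** confirmedFlushLsn is null when the column is SQL NULL (e.g. via ResultSet.getString). */
    static Status classify(String confirmedFlushLsn) {
        return confirmedFlushLsn == null ? Status.SNAPSHOT_PENDING : Status.READY;
    }
}
```

With the row above, `classify(null)` reports `SNAPSHOT_PENDING`, and `parseLsn("0/17642F0")` yields the same value for `restart_lsn` as for the starting point in the log.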
The `pg_stat_activity` view shows that the walsender serving the PostgreSQL JDBC Driver connection is waiting for a transaction to finish:

```sql
select application_name, wait_event_type, wait_event, state, backend_type
from pg_stat_activity
where backend_type = 'walsender';
```
application_name | wait_event_type | wait_event | state | backend_type |
---|---|---|---|---|
PostgreSQL JDBC Driver | Lock | transactionid | idle | walsender |
For more details, see "The Statistics Collector" in the PostgreSQL documentation.
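The two walsender states that matter in this report can be told apart from `wait_event_type` and `wait_event`: `Lock`/`transactionid` while slot creation is blocked, and `Client`/`WalSenderWaitForWAL` once streaming has started. The Java sketch below encodes only those two cases; `WalSenderState` is a hypothetical name, and any other combination is deliberately reported as `OTHER` rather than guessed at.

```java
/**
 * Hypothetical classification of a walsender row from pg_stat_activity,
 * covering only the two states observed in this issue.
 */
final class WalSenderState {

    enum Status { BLOCKED_ON_TRANSACTION, WAITING_FOR_WAL, OTHER }

    private WalSenderState() {}

    static Status classify(String waitEventType, String waitEvent) {
        // Waiting on another transaction's lock, e.g. snapshot building during slot creation.
        if ("Lock".equals(waitEventType) && "transactionid".equals(waitEvent)) {
            return Status.BLOCKED_ON_TRANSACTION;
        }
        // Streaming normally and idle until new WAL arrives.
        if ("Client".equals(waitEventType) && "WalSenderWaitForWAL".equals(waitEvent)) {
            return Status.WAITING_FOR_WAL;
        }
        // Anything else (including NULL columns) is not interpreted here.
        return Status.OTHER;
    }
}
```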
Next, restart the connector task:

```shell
curl -X POST http://localhost:8083/connectors/postgresql-connector/tasks/0/restart
```
Kafka Connect then logs the following error:

```
[2018-08-16T09:26:51,797][INFO] Stopping task postgresql-connector-0
[2018-08-16T09:27:01,800][ERROR] Graceful stop of task postgresql-connector-0 failed.
[2018-08-16T09:27:02,631][ERROR] WorkerSourceTask{id=postgresql-connector-0} Task threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Value confirmed_flush_lsn is missing from the pg_replication_slots table for slot = 'debezium', plugin = 'decoderbufs', database = 'postgres'. This is an abnormal situation and the database status should be checked.
    at io.debezium.connector.postgresql.connection.PostgresConnection.parseConfirmedFlushLsn(PostgresConnection.java:157)
    at io.debezium.connector.postgresql.connection.PostgresConnection.lambda$readReplicationSlotInfo$3(PostgresConnection.java:137)
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:446)
    at io.debezium.connector.postgresql.connection.PostgresConnection.readReplicationSlotInfo(PostgresConnection.java:128)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:95)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:362)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:63)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:75)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:105)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
```
Once the open transaction is committed:

```sql
commit;
```

the snapshot is built, `confirmed_flush_lsn` is populated with an LSN, and the walsender serving the PostgreSQL JDBC Driver is no longer blocked:
application_name | wait_event_type | wait_event | state | backend_type |
---|---|---|---|---|
PostgreSQL JDBC Driver | Client | WalSenderWaitForWAL | active | walsender |
I think the connector should expose metrics for monitoring this situation, and the PostgreSQL connector documentation should describe the problem.
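One possible shape for such a metric is a gauge reporting how long the task has been blocked in slot creation. The Java sketch below is hypothetical: `SlotCreationMonitor` and its methods are not an existing Debezium metric or API, and the clock is injected as a `LongSupplier` so the class can be tested without a database.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical gauge source: milliseconds spent in the current replication-slot
 * creation (i.e. waiting for the initial snapshot). Not an existing Debezium metric.
 */
final class SlotCreationMonitor {

    private static final long NOT_PENDING = -1L;

    private final LongSupplier clockMillis;
    private final AtomicLong startedAt = new AtomicLong(NOT_PENDING);

    SlotCreationMonitor(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Call just before the blocking slot-creation request is sent. */
    void creationStarted() {
        startedAt.set(clockMillis.getAsLong());
    }

    /** Call once slot creation returns, whether it succeeded or failed. */
    void creationFinished() {
        startedAt.set(NOT_PENDING);
    }

    /** Time spent in the current slot creation, or 0 if none is in progress. */
    long pendingMillis() {
        long start = startedAt.get();
        return start == NOT_PENDING ? 0L : clockMillis.getAsLong() - start;
    }
}
```

In the scenario above, such a gauge would keep growing until the open transaction commits, so an alert on a threshold could surface the problem before a task restart runs into the `confirmed_flush_lsn` error.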