-
Bug
-
Resolution: Done
-
Critical
-
None
-
None
-
False
-
None
-
False
-
-
Similar to DBZ-5915, when postgres connector restarts, it can skip one event when iterating over WAL. The root cause is same as in the DBZ-5915 - same LSN of different operations. Fix for the DBZ-5915 considered only situation when BEGIN and subsequent INSERT operations have the same LSN. However, predecessor transaction COMMIT can have the same LSN as well so the condition introduced in b03b4cdd1994 (lastProcessedMessageType == null || lastProcessedMessageType == Operation.BEGIN) is insufficient as the lastProcessedMessageType can be also of type Operation.COMMIT.
Here is the digest of the trace log where the first event after the Connect restart was lost (note: 33990816 is 206A8A0):
2023-03-09 21:40:53,159 DEBUG || Committing offset '{server=test}' for partition '{transaction_id=765, lsn_proc=33990816, messageType=COMMIT, lsn_commit=33990816, lsn=33990816, txId=765, ts_usec=1678398052764031, transaction_data_collection_order_public.table_1=1}' [io.debezium.connector.common.BaseSourceTask] 2023-03-09 21:40:53,159 DEBUG || Received offset commit request on commit LSN 'LSN{0/206A8A0}' and change LSN 'LSN{0/206A8A0}' [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] 2023-03-09 21:40:53,159 DEBUG || Flushing LSN to server: LSN{0/206A8A0} [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource] 2023-03-09 21:40:53,160 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask] [kafka connect restart] 2023-03-09 21:40:59,245 TRACE Postgres|test|streaming Streaming requested from LSN LSN{0/206A8A0}, received LSN LSN{0/206A8A0} [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] [...] 2023-03-09 21:40:59,246 TRACE Postgres|test|streaming Processing LSN 'LSN{0/206A8A0}', operation 'BEGIN' [io.debezium.connector.postgresql.connection.WalPositionLocator] [...] 2023-03-09 21:40:59,254 TRACE Postgres|test|streaming Streaming requested from LSN LSN{0/206A8A0}, received LSN LSN{0/206A8A0} [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] [...] 2023-03-09 21:40:59,265 TRACE Postgres|test|streaming Processing LSN 'LSN{0/206A8A0}', operation 'INSERT' [io.debezium.connector.postgresql.connection.WalPositionLocator] [...] 2023-03-09 21:40:59,265 TRACE Postgres|test|streaming Streaming requested from LSN LSN{0/206A8A0}, received LSN LSN{0/206A950} [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] [...] 2023-03-09 21:40:59,265 TRACE Postgres|test|streaming Processing LSN 'LSN{0/206A950}', operation 'COMMIT' [io.debezium.connector.postgresql.connection.WalPositionLocator] 2023-03-09 21:40:59,265 INFO Postgres|test|streaming LSN after last stored change LSN 'LSN{0/206A950}' received [io.debezium.connector.postgresql.connection.WalPositionLocator]
Reproducer was found by Chad Daksha and Challen HB (see this Zulip topic, for completeness placing it also here):
docker-compose.yml
version: '2' services: zookeeper: image: quay.io/debezium/zookeeper:2.2.0.Alpha2 ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: image: quay.io/debezium/kafka:2.2 ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 postgres: image: quay.io/debezium/example-postgres:2.2.0.Alpha2 ports: - 5432:5432 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres connect: image: quay.io/debezium/connect:2.2.0.Alpha2 ports: - 8083:8083 links: - kafka - postgres environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses - LOG_LEVEL=TRACE
reproduce.sh
#!/bin/bash docker-compose up -d sleep 10 docker exec pg_data_loss_postgres_1 sh -c "psql -U postgres -c \"CREATE TABLE IF NOT EXISTS table_1 (id serial primary key, v text)\"" docker exec pg_data_loss_postgres_1 sh -c "psql -U postgres -c \"ALTER TABLE table_1 REPLICA IDENTITY FULL\"" curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d ' { "name": "my-test-ps-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "pg_data_loss_postgres_1", "database.dbname": "postgres", "database.user": "postgres", "database.password": "postgres", "plugin.name": "pgoutput", "snapshot.mode": "never", "provide.transaction.metadata": true, "transaction.topic": "t.transaction", "topic.prefix": "test" } }' curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/io.debezium.connector.postgresql -d '{"level": "DEBUG"}' for i in {1..10} do docker exec pg_data_loss_postgres_1 sh -c "psql -U postgres -c \"INSERT INTO table_1 (v) VALUES ('text') \"" done docker stop pg_data_loss_connect_1 docker start pg_data_loss_connect_1 for i in {1..10} do docker exec pg_data_loss_postgres_1 sh -c "psql -U postgres -c \"INSERT INTO table_1 (v) VALUES ('text') \"" done