Bug
Resolution: Done
Major
2.1.2.Final
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-connector-postgres version 2.1.2.Final
What is the connector configuration?
{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "test_postgres",
    "schema.name.adjustment.mode": "avro",
    "table.include.list": "public.test_table,public.test_empty_table",
    "snapshot.mode": "always",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
What is the captured database version and mode of deployment?
PostgreSQL v.15.1, Kubernetes
What behaviour do you expect?
Last record in snapshot contains:
"snapshot": "last",
What behaviour do you see?
Last record in snapshot contains:
"snapshot": "last_in_data_collection",
Do you see the same behaviour using the latest released Debezium version?
No
How to reproduce the issue using our tutorial deployment?
1. Create tables:
CREATE TABLE test_table (
  id BIGSERIAL NOT NULL,
  value INTEGER,
  PRIMARY KEY (id, value)
);
CREATE TABLE test_empty_table (
  id BIGSERIAL NOT NULL,
  value INTEGER,
  PRIMARY KEY (id, value)
);
2. Insert data:
INSERT INTO test_table VALUES (1, 1);
INSERT INTO test_table VALUES (2, 1);
3. Create a new connector:
curl -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors \
  -d '{
    "name": "postgres-source-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgresql",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "postgres",
      "topic.prefix": "test_postgres",
      "schema.name.adjustment.mode": "avro",
      "table.include.list": "public.test_table,public.test_empty_table",
      "snapshot.mode": "always",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
  }'
4. Check last record in topic test_postgres.public.test_table:
Expected result (ER):
{
  "payload": {
    "before": null,
    "after": { "id": 2, "value": 1 },
    "source": {
      "version": "2.1.2.Final",
      "connector": "postgresql",
      "name": "test_postgres",
      "ts_ms": 1684139539622,
      "snapshot": "last",
      "db": "postgres",
      "sequence": "[null,\"26183120\"]",
      "schema": "public",
      "table": "test_table",
      "txId": 765,
      "lsn": 26183120,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1684139539766,
    "transaction": null
  }
}
Actual result (AR):
{
  "payload": {
    "before": null,
    "after": { "id": 2, "value": 1 },
    "source": {
      "version": "2.1.2.Final",
      "connector": "postgresql",
      "name": "test_postgres",
      "ts_ms": 1684139539622,
      "snapshot": "last_in_data_collection",
      "db": "postgres",
      "sequence": "[null,\"26183120\"]",
      "schema": "public",
      "table": "test_table",
      "txId": 765,
      "lsn": 26183120,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1684139539766,
    "transaction": null
  }
}
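To make step 4 repeatable, the check can be scripted. The following is only a minimal sketch: it assumes the record values from topic test_postgres.public.test_table have already been consumed as JSON strings (how they are fetched is left out), and the helper names snapshot_marker and last_snapshot_marker are hypothetical, not part of Debezium.

```python
import json
from typing import Iterable, Optional


def snapshot_marker(record_value: str) -> Optional[str]:
    """Return the source.snapshot field of one change event, or None."""
    event = json.loads(record_value)
    # With JsonConverter schemas enabled the event sits under "payload";
    # otherwise the event itself is the payload.
    payload = event.get("payload") or event
    return (payload.get("source") or {}).get("snapshot")


def last_snapshot_marker(record_values: Iterable[str]) -> Optional[str]:
    """Return the marker of the final snapshot read event ("op": "r")."""
    marker = None
    for value in record_values:
        event = json.loads(value)
        payload = event.get("payload") or event
        if payload.get("op") == "r":
            marker = snapshot_marker(value)
    return marker
```

Given the records from the reproduction above, last_snapshot_marker would be expected to return "last"; with 2.1.2.Final it returns "last_in_data_collection".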
Implementation ideas (optional)
I suggest removing the snapshot record check in the 2.1 branch at https://github.com/debezium/debezium/blob/2.1/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L485 so that it behaves the same as the main branch: https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L568