Bug
Resolution: Done
Major
2.1.2.Final
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-connector-postgres version 2.1.2.Final
What is the connector configuration?
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgresql",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "test_postgres",
"schema.name.adjustment.mode": "avro",
"table.include.list": "public.test_table,public.test_empty_table",
"snapshot.mode": "always",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
What is the captured database version and mode of deployment?
PostgreSQL v.15.1, Kubernetes
What behaviour do you expect?
Last record in snapshot contains:
"snapshot": "last",
What behaviour do you see?
Last record in snapshot contains:
"snapshot": "last_in_data_collection",
Do you see the same behaviour using the latest released Debezium version?
No
How to reproduce the issue using our tutorial deployment?
1. Create tables:
CREATE TABLE test_table (
  id BIGSERIAL NOT NULL,
  value INTEGER,
  PRIMARY KEY (id, value)
);
CREATE TABLE test_empty_table (
  id BIGSERIAL NOT NULL,
  value INTEGER,
  PRIMARY KEY (id, value)
);
2. Insert data:
INSERT INTO test_table VALUES (1, 1);
INSERT INTO test_table VALUES (2, 1);
3. Create a new connector:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d'
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgresql",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "test_postgres",
"schema.name.adjustment.mode": "avro",
"table.include.list": "public.test_table,public.test_empty_table",
"snapshot.mode": "always",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'
4. Check last record in topic test_postgres.public.test_table:
Expected result (ER):
{
"payload": {
"before": null,
"after": {
"id": 2,
"value": 1
},
"source": {
"version": "2.1.2.Final",
"connector": "postgresql",
"name": "test_postgres",
"ts_ms": 1684139539622,
"snapshot": "last",
"db": "postgres",
"sequence": "[null,\"26183120\"]",
"schema": "public",
"table": "test_table",
"txId": 765,
"lsn": 26183120,
"xmin": null
},
"op": "r",
"ts_ms": 1684139539766,
"transaction": null
}
}
Actual result (AR):
{
"payload": {
"before": null,
"after": {
"id": 2,
"value": 1
},
"source": {
"version": "2.1.2.Final",
"connector": "postgresql",
"name": "test_postgres",
"ts_ms": 1684139539622,
"snapshot": "last_in_data_collection",
"db": "postgres",
"sequence": "[null,\"26183120\"]",
"schema": "public",
"table": "test_table",
"txId": 765,
"lsn": 26183120,
"xmin": null
},
"op": "r",
"ts_ms": 1684139539766,
"transaction": null
}
}
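The check in step 4 can also be scripted. Below is a minimal Python sketch, assuming the record values have already been read from the topic as JSON strings in the envelope shape shown above (a top-level "payload" with a "source" block). The helper names snapshot_marker and last_record_is_marked_last are hypothetical and only illustrate the comparison; the two sample records are trimmed-down copies of the ER and AR records, not real consumer output.

```python
import json


def snapshot_marker(record_value):
    """Return payload.source.snapshot from a JSON-encoded record value."""
    event = json.loads(record_value)
    return event["payload"]["source"]["snapshot"]


def last_record_is_marked_last(record_values):
    """Return True if the final snapshot record carries "snapshot": "last"."""
    if not record_values:
        return False
    return snapshot_marker(record_values[-1]) == "last"


# Trimmed-down versions of the ER and AR records from step 4 (illustrative only).
expected = json.dumps(
    {"payload": {"source": {"snapshot": "last", "table": "test_table"}, "op": "r"}}
)
actual = json.dumps(
    {
        "payload": {
            "source": {"snapshot": "last_in_data_collection", "table": "test_table"},
            "op": "r",
        }
    }
)

print(last_record_is_marked_last([expected]))  # True
print(last_record_is_marked_last([actual]))    # False
```

With the behaviour reported here, the second call returns False because the final record is only marked as the last one in its data collection.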
Implementation ideas (optional)
I suggest removing the snapshot record check at https://github.com/debezium/debezium/blob/2.1/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L485 so that the 2.1 branch behaves the same as the main branch: https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L568