Bug
Resolution: Done
Major
Bug report
Debezium Server Version: 2.4.0
Postgres Version: 13
Using the ExtractNewRecordState SMT prevents the extractor from working properly with any RDBMS connector (MySQL, PostgreSQL, etc.): a null value in a column that has a default is emitted as the default value.
What Debezium connector do you use and what version?
PostgresConnector
What is the connector configuration?
# Sink config
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test

# Postgres config
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=cdc
debezium.source.database.password=cdc
debezium.source.database.dbname=testing_db
debezium.source.table.include.list=public.kdrama,public.products
debezium.source.plugin.name=pgoutput
debezium.source.publication.name=dbz_publication
debezium.source.slot.name=debezium
debezium.source.decimal.handling.mode=double

# Debezium config
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.topic.prefix=cdc
debezium.source.snapshot.mode=never
debezium.source.tombstones.on.delete=false
debezium.source.max.batch.size=4000
debezium.source.max.queue.size=14000
debezium.source.heartbeat.interval.ms=5000
debezium.source.heartbeat.action.query=INSERT INTO debezium_heartbeat (id, ts) VALUES ('cdc', now()) ON CONFLICT (id) DO UPDATE SET ts = now();
debezium.source.topic.heartbeat.prefix=test_heartbeat
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
debezium.source.key.converter.replace.null.with.default=false
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
debezium.source.value.converter.replace.null.with.default=false
debezium.source.header.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.header.converter.schemas.enable=false
debezium.source.header.converter.replace.null.with.default=false
debezium.source.offset.flush.interval.ms=0

# SMT config
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=true
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.add.fields=op,ts_ms,source.ts_ms,db,schema,table,lsn,txId,snapshot

# Quarkus config
quarkus.log.level=INFO
What behaviour do you expect?
DDL:
CREATE TABLE public.products (
    id VARCHAR(255) PRIMARY KEY,
    color VARCHAR(255),
    quantity FLOAT8
);
-- add default
ALTER TABLE products ALTER COLUMN quantity SET DEFAULT 1.0;
Expected Output:
{ "id": "1", "color": "Blue", "quantity": null }
What behaviour do you see?
Actual Result:
{ "id": "1", "color": "Blue", "quantity": 1.0 }
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Yes. Not using any SMT and relying only on the JSON converter avoids the issue. Currently, however, every SMT that uses the ExtractField or InsertField class from Kafka Connect causes it.
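The following is a minimal Python sketch of why a field read inside an SMT might turn a null into the default. It assumes the cause is a default-aware read like Kafka Connect's Struct.get, which falls back to the schema default, as opposed to Struct.getWithoutDefault, which does not; the report does not confirm which call is responsible. The FieldSchema and Struct classes and the flatten helper below are simplified, hypothetical stand-ins, not the real Kafka Connect classes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class FieldSchema:
    """Hypothetical stand-in for a field schema with an optional default."""
    default: Optional[Any] = None


@dataclass
class Struct:
    """Simplified model of a Connect-style struct (assumption, not the real class)."""
    schema: Dict[str, FieldSchema]
    values: Dict[str, Any] = field(default_factory=dict)

    def get(self, name: str) -> Any:
        # Default-aware read: a stored null falls back to the schema default.
        value = self.values.get(name)
        if value is None:
            return self.schema[name].default
        return value

    def get_without_default(self, name: str) -> Any:
        # Raw read: a stored null stays null.
        return self.values.get(name)


def flatten(struct: Struct, reader: Callable[[Struct, str], Any]) -> Dict[str, Any]:
    """Copy every field of the struct into a plain dict using the given reader."""
    return {name: reader(struct, name) for name in struct.schema}


# Same row as in the report: quantity is explicitly null, default is 1.0.
schema = {
    "id": FieldSchema(),
    "color": FieldSchema(),
    "quantity": FieldSchema(default=1.0),
}
row = Struct(schema, {"id": "1", "color": "Blue", "quantity": None})

with_default = flatten(row, Struct.get)
raw = flatten(row, Struct.get_without_default)

print(with_default)
print(raw)
```

In this model only the raw read reproduces the expected output; the default-aware read reproduces the actual result described in this report.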
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
Yes, see the trace.log file, lines 132-135. Those lines log the dummy data I inserted. They show that in the final result the "quantity" field is changed from null to 1.0 (the default value).
How to reproduce the issue using our tutorial deployment?
Use this DDL:
CREATE TABLE public.products (
    id VARCHAR(255) PRIMARY KEY,
    color VARCHAR(255),
    quantity FLOAT8
);
-- add default
ALTER TABLE products ALTER COLUMN quantity SET DEFAULT 1.0;
And insert data into it:
INSERT INTO public.products VALUES('1', 'Blue', null);
Also enable the ExtractNewRecordState SMT. The emitted record will then show the unexpected result.
Feature request or enhancement
There are two options:
- Raise a KIP ticket asking to fix the bug by modifying the ExtractField and InsertField classes in the Kafka project.
- Copy the ExtractField and InsertField classes into the Debezium project and customize them until Kafka releases a fix.
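As a rough illustration of what a customized copy in the second option could do, the Python sketch below unwraps the "after" part of a change event while leaving explicit nulls untouched unless told otherwise. The unwrap function, the defaults mapping, and the replace_null_with_default flag are hypothetical names chosen for this sketch (the flag mirrors the converter option replace.null.with.default from the configuration above); this is not Debezium or Kafka code.

```python
from typing import Any, Dict


def unwrap(
    envelope: Dict[str, Any],
    defaults: Dict[str, Any],
    replace_null_with_default: bool = False,
) -> Dict[str, Any]:
    """Flatten the 'after' state of a change event into a plain dict.

    Explicit nulls are preserved unless replace_null_with_default is True
    (hypothetical flag, assumed for illustration only).
    """
    flat: Dict[str, Any] = {}
    for name, value in envelope["after"].items():
        if value is None and replace_null_with_default:
            # Opt-in behaviour: substitute the column default for a null.
            value = defaults.get(name)
        flat[name] = value
    return flat


# Change event for the INSERT from the reproduction steps (simplified envelope).
envelope = {
    "op": "c",
    "before": None,
    "after": {"id": "1", "color": "Blue", "quantity": None},
}
defaults = {"quantity": 1.0}

preserved = unwrap(envelope, defaults)
replaced = unwrap(envelope, defaults, replace_null_with_default=True)

print(preserved)
print(replaced)
```

Making the substitution opt-in keeps the expected output as the default while still allowing the current behaviour for users who rely on it.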