- Bug
- Resolution: Done
- Minor
- 3.0.7.Final, 3.1.0.Alpha1
- None
- False
- None
- False
What Debezium connector do you use and what version?
The issue was reproducible with the Oracle, MySQL, and Postgres connectors, version 3.0.7.Final.
What is the connector configuration?
binary.handling.mode=hex
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.dbname=postgres
database.hostname=localhost
database.password=[REDACTED]
database.port=1110
database.url=jdbc\:postgresql\://localhost\:1110/postgres
database.user=dbzuser
heartbeat.interval.ms=300000
include.unknown.datatypes=true
lob.enabled=true
name=postgres-mes-source
offset.storage=io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore
offset.storage.jdbc.password=test
offset.storage.jdbc.url=jdbc\:postgresql\://localhost\:1111/test
offset.storage.jdbc.user=test
plugin.name=pgoutput
predicates=filter-signals-predicate
predicates.filter-signals-predicate.pattern=(?i)postgres-mes-source\\.dbzuser\\.debezium_signals
predicates.filter-signals-predicate.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
publication.autocreate.mode=disabled
schema.history.internal=io.debezium.storage.jdbc.history.JdbcSchemaHistory
schema.history.internal.jdbc.password=test
schema.history.internal.jdbc.url=jdbc\:postgresql\://localhost\:1111/test
schema.history.internal.jdbc.user=test
schema.history.internal.store.only.captured.tables.ddl=true
signal.data.collection=dbzuser.debezium_signals
table.include.list=fat333manual.batchrecordreport
topic.prefix=postgres-mes-source
transforms=filter-signals, changes, unwrap, rename-field, schema-exchange, tenant-id
transforms.changes.header.changed.name=ChangedFields
transforms.changes.header.unchanged.name=UnchangedFields
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.filter-signals.predicate=filter-signals-predicate
transforms.filter-signals.type=org.apache.kafka.connect.transforms.Filter
transforms.rename-field.renames=ts_ms\:incrementaltime
transforms.rename-field.replace.null.with.default=false
transforms.rename-field.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.schema-exchange.field=incrementaltime
transforms.schema-exchange.schema=io.debezium.time.Timestamp
transforms.schema-exchange.type=com.werum.pasx.pda.service.smt.SchemaNameTransform
transforms.tenant-id.replace.null.with.default=false
transforms.tenant-id.static.field=tenantid
transforms.tenant-id.static.value=pda
transforms.tenant-id.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.unwrap.add.fields=op,ts_ms
transforms.unwrap.add.fields.prefix=
transforms.unwrap.delete.tombstone.handling.mode=rewrite
transforms.unwrap.replace.null.with.default=false
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
What is the captured database version and mode of deployment?
Postgres 15.3 in a Docker container for testing purposes.
What behavior do you expect?
Given a configured ExtractChangedRecordState SMT
When an update event is transformed where the value of a column with a default value changes from null to the default value, or vice versa,
Then that field is included in the changed header.
What behavior do you see?
The changed header is not populated with the field when either a null field with a default value is updated to that default value, or a field populated with the default value is updated to null.
Do you see the same behaviour using the latest released Debezium version?
This also occurs on 3.1.0.Alpha1.
Do you have the connector logs, ideally from start till finish?
The logs are from a Java application that uses the Debezium engine directly. I have included the relevant parts of the log, but I don't think they are useful in this case. no-default-values.log
How to reproduce the issue using our tutorial deployment?
- Update tutorial/register-mysql.json to include the SMT:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "transforms": "changed-header",
    "transforms.changed-header.type": "io.debezium.transforms.ExtractChangedRecordState",
    "transforms.changed-header.header.changed.name": "changed"
  }
}
- Start docker-compose-mysql.yaml with DEBEZIUM_VERSION=3.1
- Add a default value to the products table: "alter table products alter weight set default 1;"
- Connect the kafka-console-consumer.sh with "--property print.headers=true" and "--topic dbserver1.inventory.products"
- Manipulate the default value of a record:
insert into products (name, description) values ('Foo', 'Bar');
update products set weight = null where name = 'Foo' and description = 'Bar';
update products set weight = 1 where name = 'Foo' and description = 'Bar';
The consumer output contains an empty changed header for those two update events. When "weight" is changed to 2, for example, the header does include the "weight" column.
The reason for this behaviour is that the ExtractChangedRecordState SMT compares values using Struct#get(Field) instead of Struct#getWithoutDefault(String):
if (!Objects.equals(afterValue.get(field), beforeValue.get(field))) {
    changedNames.add(field.name());
}
else {
    unchangedNames.add(field.name());
}
You might also notice that "weight" in the "before" and "after" structs printed by the consumer never appears to be null. This seems to be a serialization issue: while debugging, the null values were in fact present. Whatever serializes the structs probably also doesn't use Struct#getWithoutDefault(String).
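To illustrate the root cause, here is a minimal sketch. This is NOT the real Kafka Connect Struct, just a simplified stand-in class modelling the documented difference between Struct#get (which substitutes the schema default when the stored value is null) and Struct#getWithoutDefault (which returns the raw stored value). With get(), a null-to-default update looks like no change at all:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Kafka Connect's Struct, reduced to the two
// accessors relevant to this bug.
public class StructDefaultDemo {
    private final Map<String, Object> values = new HashMap<>();
    private final Map<String, Object> defaults = new HashMap<>();

    public StructDefaultDemo withDefault(String field, Object def) {
        defaults.put(field, def);
        return this;
    }

    public StructDefaultDemo put(String field, Object value) {
        values.put(field, value);
        return this;
    }

    // Models Struct#get: a stored null falls back to the schema default.
    public Object get(String field) {
        Object v = values.get(field);
        return v != null ? v : defaults.get(field);
    }

    // Models Struct#getWithoutDefault: the raw stored value, null included.
    public Object getWithoutDefault(String field) {
        return values.get(field);
    }

    public static void main(String[] args) {
        // Reproduction scenario: weight has default 1;
        // before the update it is null, after the update it is 1.
        StructDefaultDemo before = new StructDefaultDemo().withDefault("weight", 1).put("weight", null);
        StructDefaultDemo after = new StructDefaultDemo().withDefault("weight", 1).put("weight", 1);

        // With get(), both sides resolve to the default -> the SMT sees no change
        // and leaves "weight" out of the changed header.
        System.out.println(Objects.equals(after.get("weight"), before.get("weight")));
        // With getWithoutDefault(), null vs 1 is correctly detected as a change.
        System.out.println(Objects.equals(after.getWithoutDefault("weight"), before.getWithoutDefault("weight")));
    }
}
```

Running this prints "true" then "false", which matches the observed behaviour: switching the SMT's comparison to the getWithoutDefault-style accessor would make the null-to-default transition visible.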