  Debezium / DBZ-5166

ExtractNewRecordState SMT Replaces Null Value with Column's Default Value


      Bug report

      What Debezium connector do you use and what version?

      Debezium MySQL Connector 1.8.0.Final

      What is the connector configuration?


        {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "poll.interval.ms": "1000",
          "max.batch.size": "100000",
          "max.queue.size": "400000",
          "database.hostname": "<mysql_server_name>",
          "database.port": "<mysql_server_port>",
          "database.user": "<debezium_user>",
          "database.password": "<debezium_password>",
          "database.server.id": "112233",
          "database.server.name": "mysql_test",
          "table.include.list": "test.dbz_test",
          "snapshot.mode": "schema_only",
          "time.precision.mode": "connect",
          "include.query": "false",
          "include.schema.changes": "false",
          "tombstones.on.delete": "false",
          "transforms": "unwrap",
          "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
          "transforms.unwrap.drop.tombstones": "true",
          "transforms.unwrap.delete.handling.mode": "rewrite",
          "transforms.unwrap.add.fields": "op,source.ts_ms",
          "database.history.kafka.bootstrap.servers": "<kafka_hosts>",
          "database.history.kafka.topic": "dbz-mysql_test-history",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "<schema_registry_url>",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "<schema_registry_url>"
        }


      What is the captured database version and mode of deployment?

      MySQL Version: 5.7.32-35-log

      Deployment: on-premises

      What behaviour do you expect?

      Given the following table DDL:

      CREATE TABLE test.dbz_test (
        id INT NOT NULL AUTO_INCREMENT,
        name VARCHAR(50) DEFAULT 'default_str',
        PRIMARY KEY (id)
      );


      When an explicit NULL value is inserted into the name column, for example:

      INSERT INTO test.dbz_test(name) VALUES(null); 


      I expect the record's value in Kafka to be:

        {
          "id": 1,
          "name": null,
          "__op": "c",
          "__source_ts_ms": 1653497708000,
          "__deleted": "false"
        }


      This is because the actual value of "name" in the database is NULL.

      What behaviour do you see?

      The record's value in Kafka is:

        {
          "id": 1,
          "name": "default_str",
          "__op": "c",
          "__source_ts_ms": 1653497708000,
          "__deleted": "false"
        }


      That is, the explicit NULL value was replaced by the column's default value.

      Do you see the same behaviour using the latest released Debezium version?

      I believe so: the relevant source code has not changed, and the problematic lines are still present in the latest release.

      Do you have the connector logs, ideally from start till finish?


      How to reproduce the issue using our tutorial deployment?


      Implementation/Fix ideas

      I actually found the root cause of the bug:

      In this line of ExtractNewRecordState:

      updatedValue.put(field.name(), value.get(field)); 

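      Kafka Connect's Struct.get(Field) method returns the field schema's default value whenever the stored value is null. Because a Struct stores an explicitly inserted NULL the same way as a value that was never set, the explicit NULL is replaced by the column's default value.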
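The root cause can be reproduced without a Kafka installation using the following sketch. `SimpleStruct` is a hypothetical, simplified stand-in for `org.apache.kafka.connect.data.Struct`, not the real class: it only models the contract that `get` falls back to the field's default when the stored value is null, while `getWithoutDefault` returns the raw stored value. Declaring `id` without a default is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class DefaultValueDemo {
    public static void main(String[] args) {
        // Models the row written by: INSERT INTO test.dbz_test(name) VALUES(null);
        SimpleStruct row = new SimpleStruct()
                .declareField("id", null)             // assumption: id has no default
                .declareField("name", "default_str")  // DEFAULT 'default_str'
                .put("id", 1)
                .put("name", null);                   // explicit NULL

        System.out.println("get:               " + row.get("name"));
        System.out.println("getWithoutDefault: " + row.getWithoutDefault("name"));
    }
}

/**
 * Hypothetical, simplified stand-in for org.apache.kafka.connect.data.Struct.
 * It models only the default-value handling relevant to this bug; it is not the real API.
 */
class SimpleStruct {
    // Field name -> default value declared in the (modelled) field schema; may be null.
    private final Map<String, Object> defaults = new HashMap<>();
    // Field name -> stored value; an explicit null and a never-set field both read back as null.
    private final Map<String, Object> values = new HashMap<>();

    SimpleStruct declareField(String name, Object defaultValue) {
        defaults.put(name, defaultValue);
        return this;
    }

    SimpleStruct put(String name, Object value) {
        requireField(name);
        values.put(name, value);
        return this;
    }

    // Like Struct.get: a null stored value falls back to the field's default.
    Object get(String name) {
        Object value = getWithoutDefault(name);
        return value != null ? value : defaults.get(name);
    }

    // Like Struct.getWithoutDefault: returns the stored value as-is, null included.
    Object getWithoutDefault(String name) {
        requireField(name);
        return values.get(name);
    }

    private void requireField(String name) {
        if (!defaults.containsKey(name)) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
    }
}
```

Running `DefaultValueDemo` prints `default_str` for `get` and `null` for `getWithoutDefault`, which matches the actual and expected Kafka records in this report.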

      To fix this undesired behavior, the getWithoutDefault method, which returns the raw stored value (including null), should be used instead:

      updatedValue.put(field.name(), value.getWithoutDefault(field.name())); 
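The effect of the one-line change on the SMT's field-copy loop can be sketched as follows. `FieldDef`, `copyWithDefaults` and `copyWithoutDefaults` are hypothetical names used only for this illustration, not Debezium or Kafka Connect APIs. The first method imitates copying each field with `value.get(field)`; the second imitates copying with `value.getWithoutDefault(field.name())`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CopyLoopDemo {
    public static void main(String[] args) {
        List<FieldDef> fields = List.of(new FieldDef("id", null), new FieldDef("name", "default_str"));
        // Raw column values of the captured row; "name" was explicitly inserted as NULL.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", null);

        System.out.println("original loop: " + copyWithDefaults(fields, row));
        System.out.println("proposed loop: " + copyWithoutDefaults(fields, row));
    }

    // Hypothetical model of the original loop: null values fall back to the
    // field default, the way Struct.get(Field) behaves.
    static Map<String, Object> copyWithDefaults(List<FieldDef> fields, Map<String, Object> row) {
        Map<String, Object> copy = new LinkedHashMap<>();
        for (FieldDef field : fields) {
            Object value = row.get(field.name);
            copy.put(field.name, value != null ? value : field.defaultValue);
        }
        return copy;
    }

    // Hypothetical model of the proposed loop: the raw value is copied unchanged,
    // the way Struct.getWithoutDefault(String) behaves.
    static Map<String, Object> copyWithoutDefaults(List<FieldDef> fields, Map<String, Object> row) {
        Map<String, Object> copy = new LinkedHashMap<>();
        for (FieldDef field : fields) {
            copy.put(field.name, row.get(field.name));
        }
        return copy;
    }
}

// Hypothetical field descriptor: a field name plus its schema default (may be null).
final class FieldDef {
    final String name;
    final Object defaultValue;

    FieldDef(String name, Object defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}
```

Running `CopyLoopDemo` prints `{id=1, name=default_str}` for the original loop and `{id=1, name=null}` for the proposed one. The real change in the SMT still operates on Kafka Connect `Struct` objects; the maps here only stand in for them.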

      There are a few more potentially buggy lines in the SMT's code, which can easily be fixed in the same way.


      How do I create a pull request to fix this bug?

      I already tested my fix and it seems to do the trick (:

            Assignee: Unassigned
            Reporter: Rotem Adhoh (rotem-ad) (Inactive)