Debezium / DBZ-7094

ExtractNewRecordState value of optional null field which has default value


      Bug report

      Debezium Server Version: 2.4.0

      Postgres Version: 13

      Using the SMT ExtractNewRecordState causes the extraction to work incorrectly with any RDBMS connector (MySQL, PostgreSQL, etc.).

      What Debezium connector do you use and what version?

      PostgresConnector

      What is the connector configuration?

       

      # Sink config
      debezium.sink.type=pubsub
      debezium.sink.pubsub.project.id=test
      
      # Postgres config
      debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
      debezium.source.database.hostname=postgres
      debezium.source.database.port=5432
      debezium.source.database.user=cdc
      debezium.source.database.password=cdc
      debezium.source.database.dbname=testing_db
      debezium.source.table.include.list=public.kdrama,public.products
      debezium.source.plugin.name=pgoutput
      debezium.source.publication.name=dbz_publication
      debezium.source.slot.name=debezium
      debezium.source.decimal.handling.mode=double
      
      # Debezium config
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.topic.prefix=cdc
      debezium.source.snapshot.mode=never
      debezium.source.tombstones.on.delete=false
      debezium.source.max.batch.size=4000
      debezium.source.max.queue.size=14000
      debezium.source.heartbeat.interval.ms=5000
      debezium.source.heartbeat.action.query=INSERT INTO debezium_heartbeat (id, ts) VALUES ('cdc', now()) ON CONFLICT (id) DO UPDATE SET ts = now();
      debezium.source.topic.heartbeat.prefix=test_heartbeat
      debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
      debezium.source.key.converter.schemas.enable=false
      debezium.source.key.converter.replace.null.with.default=false
      debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
      debezium.source.value.converter.schemas.enable=false
      debezium.source.value.converter.replace.null.with.default=false
      debezium.source.header.converter=org.apache.kafka.connect.json.JsonConverter
      debezium.source.header.converter.schemas.enable=false
      debezium.source.header.converter.replace.null.with.default=false
      debezium.source.offset.flush.interval.ms=0
      # SMT config
      debezium.transforms=unwrap
      debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      debezium.transforms.unwrap.drop.tombstones=true
      debezium.transforms.unwrap.delete.handling.mode=rewrite
      debezium.transforms.unwrap.add.fields=op,ts_ms,source.ts_ms,db,schema,table,lsn,txId,snapshot
      
      # Quarkus config
      quarkus.log.level=INFO

      What behaviour do you expect?

      DDL:

      CREATE TABLE public.products (
         id VARCHAR(255) PRIMARY KEY,
         color VARCHAR(255),
         quantity FLOAT8
      );
      -- add default
      ALTER TABLE products ALTER COLUMN quantity SET DEFAULT 1.0;

       

      Expected Output:

       

      {
         "id": "1",
         "color": "Blue",
         "quantity": null
      }

       

      What behaviour do you see?

      Actual Result:

      {
         "id": "1",
         "color": "Blue",
         "quantity": 1.0
      }
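The root cause is that Kafka Connect's `Struct.get()` substitutes the schema default whenever the stored value is null, and the stock ExtractField/InsertField transforms read fields through it. Below is a minimal Python sketch of that behaviour (the class and method names only imitate `org.apache.kafka.connect.data.Struct`; this is not Debezium or Kafka code):

```python
# Sketch (not real Kafka code) of how Struct.get() makes a schema
# default leak into the output when the actual column value is null.

class Field:
    def __init__(self, name, default=None):
        self.name = name
        self.default = default

class Struct:
    def __init__(self, fields, values):
        self.fields = {f.name: f for f in fields}
        self.values = values

    def get(self, name):
        # Mirrors Struct#get: a null value falls back to the schema default.
        value = self.values.get(name)
        return self.fields[name].default if value is None else value

    def get_without_default(self, name):
        # Mirrors Struct#getWithoutDefault: returns the stored value as-is.
        return self.values.get(name)

row = Struct(
    [Field("id"), Field("color"), Field("quantity", default=1.0)],
    {"id": "1", "color": "Blue", "quantity": None},
)

print(row.get("quantity"))                  # 1.0  <- what the SMT emits
print(row.get_without_default("quantity"))  # None <- what was actually inserted
```

With `replace.null.with.default=false` the converters preserve the null, but the SMT has already replaced it before the converter runs.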

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes. Dropping the SMT and using only the JSON converter avoids the issue, but currently every SMT that uses the ExtractField or InsertField class from Kafka Connect triggers it.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Yes, see the attached trace.log file, lines 132-135. Those lines cover the dummy data I inserted and the logged output. You can see that in the final result the "quantity" field changed from null to 1.0 (the default value).

      How to reproduce the issue using our tutorial deployment?

      Use this DDL:

      CREATE TABLE public.products (
         id VARCHAR(255) PRIMARY KEY,
         color VARCHAR(255),
         quantity FLOAT8
      );
      -- add default
      ALTER TABLE products ALTER COLUMN quantity SET DEFAULT 1.0;

      And insert data into it:

      INSERT INTO public.products VALUES('1', 'Blue', null);

      You should also enable the ExtractNewRecordState SMT; you will then see the unexpected result.

      Feature request or enhancement

      There are two options:

      • Raise a Kafka ticket (KIP), asking to fix the bug by modifying the ExtractField and InsertField classes in the Kafka project.
      • Copy ExtractField and InsertField into the Debezium project and customize them until Kafka releases a fix for the bug.
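For the second option, the essence of the customization would be to read fields without default substitution (the equivalent of `Struct.getWithoutDefault()` instead of `Struct.get()`). A self-contained Python sketch of the two behaviours, with dicts standing in for Connect structs (all names here are illustrative, not the actual Java API):

```python
# Hypothetical sketch of the proposed fix: read the raw field value
# instead of letting the schema default replace a null.

def extract_field_upstream(values, defaults, field):
    # Current upstream behaviour: null falls back to the schema default,
    # as Struct.get() does.
    value = values.get(field)
    return defaults.get(field) if value is None else value

def extract_field_patched(values, defaults, field):
    # Patched behaviour: the equivalent of Struct.getWithoutDefault();
    # the defaults map is deliberately ignored.
    return values.get(field)

after = {"id": "1", "color": "Blue", "quantity": None}
defaults = {"quantity": 1.0}

print(extract_field_upstream(after, defaults, "quantity"))  # 1.0
print(extract_field_patched(after, defaults, "quantity"))   # None
```

KIP-581 (referenced below) proposes making this configurable in the converters, but the SMTs would still need the same treatment.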

      References:

      https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java#L61-L68

      https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L163-L195

      https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value

      Attachments:
        1. trace.log (73 kB), uploaded by Eric Pangiawan

      Reporter: Eric Pangiawan (ericpangiawan)