Debezium / DBZ-8721

ExtractChangedRecordState SMT Not Working With Default Values


      What Debezium connector do you use and what version?

      The issue was reproducible with the Oracle, MySQL, and PostgreSQL connectors, version 3.0.7.Final.

      What is the connector configuration?

      binary.handling.mode=hex
      connector.class=io.debezium.connector.postgresql.PostgresConnector
      database.dbname=postgres
      database.hostname=localhost
      database.password=[REDACTED]
      database.port=1110
      database.url=jdbc\:postgresql\://localhost\:1110/postgres
      database.user=dbzuser
      heartbeat.interval.ms=300000
      include.unknown.datatypes=true
      lob.enabled=true
      name=postgres-mes-source
      offset.storage=io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore
      offset.storage.jdbc.password=test
      offset.storage.jdbc.url=jdbc\:postgresql\://localhost\:1111/test
      offset.storage.jdbc.user=test
      plugin.name=pgoutput
      predicates=filter-signals-predicate
      predicates.filter-signals-predicate.pattern=(?i)postgres-mes-source\\.dbzuser\\.debezium_signals
      predicates.filter-signals-predicate.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
      publication.autocreate.mode=disabled
      schema.history.internal=io.debezium.storage.jdbc.history.JdbcSchemaHistory
      schema.history.internal.jdbc.password=test
      schema.history.internal.jdbc.url=jdbc\:postgresql\://localhost\:1111/test
      schema.history.internal.jdbc.user=test
      schema.history.internal.store.only.captured.tables.ddl=true
      signal.data.collection=dbzuser.debezium_signals
      table.include.list=fat333manual.batchrecordreport
      topic.prefix=postgres-mes-source
      transforms=filter-signals, changes, unwrap, rename-field, schema-exchange, tenant-id
      transforms.changes.header.changed.name=ChangedFields
      transforms.changes.header.unchanged.name=UnchangedFields
      transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
      transforms.filter-signals.predicate=filter-signals-predicate
      transforms.filter-signals.type=org.apache.kafka.connect.transforms.Filter
      transforms.rename-field.renames=ts_ms\:incrementaltime
      transforms.rename-field.replace.null.with.default=false
      transforms.rename-field.type=org.apache.kafka.connect.transforms.ReplaceField$Value
      transforms.schema-exchange.field=incrementaltime
      transforms.schema-exchange.schema=io.debezium.time.Timestamp
      transforms.schema-exchange.type=com.werum.pasx.pda.service.smt.SchemaNameTransform
      transforms.tenant-id.replace.null.with.default=false
      transforms.tenant-id.static.field=tenantid
      transforms.tenant-id.static.value=pda
      transforms.tenant-id.type=org.apache.kafka.connect.transforms.InsertField$Value
      transforms.unwrap.add.fields=op,ts_ms
      transforms.unwrap.add.fields.prefix=
      transforms.unwrap.delete.tombstone.handling.mode=rewrite
      transforms.unwrap.replace.null.with.default=false
      transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      

       

      What is the captured database version and mode of deployment?

      Postgres 15.3 in a Docker container for testing purposes.

      What behavior do you expect?

      Given a configured ExtractChangedRecordState SMT
      When an update event is transformed where the value of a column with a default value is changed from null to the default value or vice versa
      Then that field is included in the changed header.

      What behavior do you see?

      The header is not populated with the changed field when either a null field with a default value is updated to that default value or a field populated with the default value is updated to null.

      Do you see the same behaviour using the latest released Debezium version?

      This also occurs on 3.1.0.Alpha1.

      Do you have the connector logs, ideally from start till finish?

      The logs are from a Java application that uses the Debezium engine directly. I have attached the relevant parts of the log (no-default-values.log), although I don't think they are useful in this case.

      How to reproduce the issue using our tutorial deployment?

      1. Update tutorial/register-mysql.json to include the SMT
        {
            "name": "inventory-connector",
            "config": {
                "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                "tasks.max": "1",
                "database.hostname": "mysql",
                "database.port": "3306",
                "database.user": "debezium",
                "database.password": "dbz",
                "database.server.id": "184054",
                "topic.prefix": "dbserver1",
                "database.include.list": "inventory",
                "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
                "schema.history.internal.kafka.topic": "schema-changes.inventory",
                "transforms": "changed-header",
                "transforms.changed-header.type": "io.debezium.transforms.ExtractChangedRecordState",
                "transforms.changed-header.header.changed.name": "changed"
            }
        }
      2. Start docker-compose-mysql.yaml with DEBEZIUM_VERSION=3.1
      3. Add a default value to the "weight" column of the products table: "alter table products alter weight set default 1;"
      4. Connect kafka-console-consumer.sh with "--property print.headers=true" and "--topic=dbserver1.inventory.products"
      5. Insert a record that takes the default weight, then set its weight to null and back to the default:
        insert into products (name, description) values ('Foo', 'Bar');
        update products set weight = null where name = 'Foo' and description = 'Bar';
        update products set weight = 1 where name = 'Foo' and description = 'Bar';

      The consumer output shows an empty header for these update events. When "weight" is changed to a non-default value, for example 2, the header does include the "weight" column.

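      The reason for this behaviour is that the ExtractChangedRecordState SMT compares values using Struct#get(Field) instead of Struct#getWithoutDefault(String). Struct#get(Field) returns the field's schema default whenever the stored value is null, so a null value and the default value compare as equal: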

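      // Struct#get(Field) substitutes the schema default for a null value,
      // so an update between null and the default is classified as unchanged.
      if (!Objects.equals(afterValue.get(field), beforeValue.get(field))) {
          changedNames.add(field.name());
      } else {
          unchangedNames.add(field.name());
      }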
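      The effect of that comparison can be modelled without Kafka on the classpath. In the sketch below, `ModelStruct` is a hypothetical stand-in for Kafka Connect's `Struct`, not the real class: its `get` falls back to the field's default when the stored value is null (mirroring `Struct#get(Field)`), and its `getWithoutDefault` returns the raw stored value. Comparing raw values, as suggested above, reports `weight` as changed for an update from the default to null, while the default-substituting comparison reports nothing. This only illustrates the analysis in this report; it is not Debezium's code or a proposed patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ChangedFieldsDemo {

    // Hypothetical stand-in for org.apache.kafka.connect.data.Struct:
    // stores raw values and knows each field's schema default.
    static final class ModelStruct {
        private final Map<String, Object> defaults;
        private final Map<String, Object> values = new LinkedHashMap<>();

        ModelStruct(Map<String, Object> defaults) {
            this.defaults = defaults;
        }

        ModelStruct put(String name, Object value) {
            values.put(name, value);
            return this;
        }

        // Mirrors Struct#get(Field): a null value falls back to the schema default.
        Object get(String name) {
            Object value = values.get(name);
            return value != null ? value : defaults.get(name);
        }

        // Mirrors Struct#getWithoutDefault(String): returns the raw stored value.
        Object getWithoutDefault(String name) {
            return values.get(name);
        }
    }

    // Collects the names of fields whose before/after values differ.
    static List<String> changedFields(ModelStruct before, ModelStruct after,
                                      List<String> fieldNames, boolean useDefaults) {
        List<String> changed = new ArrayList<>();
        for (String name : fieldNames) {
            Object b = useDefaults ? before.get(name) : before.getWithoutDefault(name);
            Object a = useDefaults ? after.get(name) : after.getWithoutDefault(name);
            if (!Objects.equals(a, b)) {
                changed.add(name);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // "weight" has a default of 1.0, as after "alter table products alter weight set default 1".
        Map<String, Object> defaults = Map.of("weight", 1.0);
        List<String> fields = List.of("name", "weight");

        // Models "update products set weight = null": before holds the default, after holds null.
        ModelStruct before = new ModelStruct(defaults).put("name", "Foo").put("weight", 1.0);
        ModelStruct after = new ModelStruct(defaults).put("name", "Foo").put("weight", null);

        System.out.println("with defaults:    " + changedFields(before, after, fields, true));
        System.out.println("without defaults: " + changedFields(before, after, fields, false));
    }
}
```

      Running `main` prints an empty list for the default-substituting comparison and `[weight]` for the raw-value comparison.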

      You might also notice that the "weight" value in the "before" and "after" structs printed by the consumer never appears to be null. This looks like a serialization issue: while debugging, the null values were in fact present in the structs. Whatever serializes the records probably also doesn't use Struct#getWithoutDefault(String).
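      The serialization symptom can be illustrated with a small self-contained model. `render` below is a hypothetical, much-simplified JSON-style renderer, not Kafka's or Debezium's converter. With `substituteDefaults` enabled it replaces a null value with the field's default, which makes `weight` look like 1.0 even though the raw value is null. Which component actually does this in the tutorial pipeline is not established in this report.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class DefaultSubstitutionRender {

    // Renders field values as a simple JSON-like object (no escaping; illustration only).
    // When substituteDefaults is true, a null value is replaced by the field's default,
    // mimicking a serializer that reads values through Struct#get(Field).
    static String render(Map<String, Object> values, Map<String, Object> defaults,
                         boolean substituteDefaults) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            Object value = entry.getValue();
            if (value == null && substituteDefaults) {
                value = defaults.get(entry.getKey());
            }
            String rendered = value instanceof String ? "\"" + value + "\"" : String.valueOf(value);
            json.add("\"" + entry.getKey() + "\":" + rendered);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = Map.of("weight", 1.0);
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("name", "Foo");
        after.put("weight", null); // raw value after "update products set weight = null"

        System.out.println(render(after, defaults, true));  // weight shown as 1.0
        System.out.println(render(after, defaults, false)); // weight shown as null
    }
}
```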

      Attachments:
        1. no-default-values.log (19 kB, Yannick Eisenschmidt)

      Assignee: Unassigned
      Reporter: Yannick Eisenschmidt (y.eisenschmidt_koerber)
      Votes: 0
      Watchers: 4