  1. Debezium
  2. DBZ-1141

Emit correctly-typed fallback values for replica identity DEFAULT


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 0.9.2.Final
    • Affects Version/s: 0.9.0.CR1, 0.9.0.Final, 0.9.1.Final
    • Component/s: postgresql-connector
    • Labels: None
    • Steps to Reproduce:

      1. With debug logging enabled, start the source connector for a table that has a NOT NULL timestamp column, with "time.precision.mode": "connect" set in the connector config. The table must use replica identity DEFAULT. Wait until the initial dump is complete.
      2. Update or delete a row in the source table. Observe the exception thrown by Debezium.

      Example table definition:
      create table test_d (id int primary key, val numeric(20,8), created_at timestamp not null);


      Our table has a NOT NULL constraint on a timestamptz column, and no row holds a null in that column, yet the connector logs this error:

      2019-01-31 02:10:07,255 ERROR  Postgres|our_db|records-stream-producer  Failed to properly convert data value for 'public.some_table_on_our_db.created_at' of type timestamptz for row [61415778, null, null, null, null, null, null, null, null, null, null, null, null]:   [io.debezium.relational.TableSchemaBuilder]
      org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRING: class java.time.OffsetDateTime for field: "created_at"
          at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:240)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
          at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$2(TableSchemaBuilder.java:215)
          at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:135)
          at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:325)
          at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:262)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
          at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
          at …
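
The trace above reports a type mismatch rather than a missing value: the schema type for "created_at" is STRING, but the object handed to it is a java.time.OffsetDateTime. The following is a minimal JDK-only sketch of that kind of check. `SchemaType` and `matches` are hypothetical names chosen for illustration; this is not Kafka Connect's API or Debezium's code.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TypeCheckSketch {
    // Hypothetical, reduced set of schema types (assumption for this sketch).
    enum SchemaType { STRING, INT32 }

    // Returns true when the Java value is an acceptable instance for the schema type.
    static boolean matches(SchemaType type, Object value) {
        switch (type) {
            case STRING: return value instanceof String;
            case INT32:  return value instanceof Integer;
            default:     return false;
        }
    }

    public static void main(String[] args) {
        OffsetDateTime ts = OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        // The raw temporal object does not satisfy a STRING-typed field.
        System.out.println(matches(SchemaType.STRING, ts));
        // Its ISO-8601 text form does.
        String text = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ts);
        System.out.println(matches(SchemaType.STRING, text));
    }
}
```

Under this model, any placeholder produced for a STRING-typed timestamp field would need to be converted to text before it is put into the record.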
      

      and Debezium raises an exception:

      2019-01-31 02:10:07,257 ERROR  Postgres|our_db|records-stream-producer  unexpected exception while streaming logical changes   [io.debezium.connector.postgresql.RecordsStreamProducer]
      org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "created_at", schema type: STRING
          at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
          at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
          at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:248)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
          at io.debezium.data.Envelope.update(Envelope.java:280)
          at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:376)
          at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:262)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
          at …
      

      If you enable debug mode, you can find the incoming message:

      2019-02-05 16:16:07,455 INFO   Postgres|our_db|records-stream-producer  Message arrived for decoding {
        "xid" : 8268042083,
        "timestamp" : "2019-02-05 16:16:07.455878+00",
        "change" : [ {
          "kind" : "update",
          "schema" : "public",
          "table" : "some_table_on_our_db",
          "columnnames" : [ "id", "user_id", "mission_id", "status", "payment_id", "user_data", "completed_at", "created_at", "payment_uuid", "amount", "currency", "reference_id", "document_data" ],
          "columntypes" : [ "integer", "character varying(64)", "character varying(32)", "character varying(50)", "character varying(32)", "character varying(100)", "timestamp with time zone", "timestamp with time zone", "character varying(32)", "numeric(20,8)", "character varying(10)", "character varying(128)", "jsonb" ],
          "columnoptionals" : [ false, false, false, false, true, false, true, false, true, false, false, true, true ],
          "columnvalues" : [ 92504632, "6g0ca16dddc59e8eb0fdfdggej4c7d6g4", "some_product", "completed", null, "y6fbj6d3je50ya3684656aec24782j2c", "2019-02-05 16:16:07.453909+00", "2019-02-05 16:16:07.443553+00", "", "82.00000000", "CURRENCY", "y6fbj6d3je50ya3684656aec24782j2c", "{}" ],
          "oldkeys" : {
            "keynames" : [ "id" ],
            "keytypes" : [ "integer" ],
            "keyvalues" : [ 92504632 ]
          }
        } ]
      }   [io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder]
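
To make the gap in this message explicit, the sketch below lists the columns that are marked non-optional in "columnoptionals" but have no entry in "oldkeys". It is a JDK-only illustration working on plain lists; `requiredButMissing` is a hypothetical helper and not part of Debezium or wal2json.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MissingOldValuesSketch {
    // Returns the non-optional columns that have no old value in "oldkeys".
    static List<String> requiredButMissing(List<String> columnNames,
                                           List<Boolean> columnOptionals,
                                           List<String> oldKeyNames) {
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < columnNames.size(); i++) {
            String name = columnNames.get(i);
            if (!columnOptionals.get(i) && !oldKeyNames.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Values copied from the decoded message above.
        List<String> names = Arrays.asList("id", "user_id", "mission_id", "status",
                "payment_id", "user_data", "completed_at", "created_at", "payment_uuid",
                "amount", "currency", "reference_id", "document_data");
        List<Boolean> optionals = Arrays.asList(false, false, false, false, true, false,
                true, false, true, false, false, true, true);
        List<String> oldKeys = Arrays.asList("id");
        System.out.println(requiredButMissing(names, optionals, oldKeys));
    }
}
```

For the message above this prints [user_id, mission_id, status, user_data, created_at, amount, currency], so "created_at" is among the required columns for which the old row image carries no value.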
      

      After setting REPLICA IDENTITY FULL on the table, the error no longer occurs, and "oldkeys" now carries the non-key columns as well:

      "oldkeys" : {
            "keynames" : [ "id", "user_id", "mission_id", "status", "user_data", "completed_at", "created_at", "payment_uuid", "amount", "currency", "reference_id", "auxiliary_data" ],
            "keytypes" : [ "integer", "character varying(64)", "character varying(32)", "character varying(50)", "character varying(100)", "timestamp with time zone", "timestamp with time zone", "character varying(32)", "numeric(20,8)", "character varying(10)", "character varying(128)", "jsonb" ],
            "keyvalues" : [ 92504632, "6g0ca16dddc59e8eb0fdfdggej4c7d6g4", "some_product", "completed", "y6fbj6d3je50ya3684656aec24782j2c", "2019-02-05 16:16:07.453909+00", "2019-02-05 16:16:07.443553+00", "", "82.00000000", "CURRENCY", "y6fbj6d3je50ya3684656aec24782j2c", "{}" ]
          }
      

      Setting replica identity FULL should be optional, so the exception Debezium raises for a table with replica identity DEFAULT is a bug.
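
The issue title suggests the direction of a fix: when the old row image lacks a required column, any placeholder that is emitted has to match the column's schema type. The sketch below illustrates that idea only and is not the actual Debezium change. `SchemaType`, `fallbackFor`, and the choice of the Unix epoch, an empty string, and zero as placeholders are all assumptions made for this example.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TypedFallbackSketch {
    // Hypothetical schema kinds; TIMESTAMP_AS_STRING models a timestamp that is
    // represented as text in the emitted record.
    enum SchemaType { TIMESTAMP_AS_STRING, STRING, INT32 }

    // Returns a placeholder whose Java type agrees with the schema type.
    static Object fallbackFor(SchemaType type) {
        switch (type) {
            case TIMESTAMP_AS_STRING:
                // Format the epoch as ISO-8601 text instead of returning the temporal object.
                OffsetDateTime epoch = OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
                return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(epoch);
            case STRING:
                return "";
            case INT32:
                return 0;
            default:
                throw new IllegalArgumentException("No fallback defined for " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(fallbackFor(SchemaType.TIMESTAMP_AS_STRING)); // 1970-01-01T00:00:00Z
        System.out.println(fallbackFor(SchemaType.INT32));               // 0
    }
}
```

The point of the design is that the placeholder for a text-encoded timestamp is a String, which avoids the OffsetDateTime versus STRING mismatch shown in the first trace.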

              Assignee: jpechane Jiri Pechanec
              Reporter: ljodea Liam O'Dea (Inactive)
