Type: Bug
Resolution: Done
Priority: Major
Affects Versions: 0.9.0.CR1, 0.9.0.Final, 0.9.1.Final
Labels: None
We have a table whose timestamptz column contains no NULL values and carries a NOT NULL constraint, yet we see this error:
2019-01-31 02:10:07,255 ERROR Postgres|our_db|records-stream-producer Failed to properly convert data value for 'public.some_table_on_our_db.created_at' of type timestamptz for row [61415778, null, null, null, null, null, null, null, null, null, null, null, null]: [io.debezium.relational.TableSchemaBuilder]
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRING: class java.time.OffsetDateTime for field: "created_at"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:240)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$2(TableSchemaBuilder.java:215)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:135)
    at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:325)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:262)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
    at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
    at …
and Debezium raises an exception:
2019-01-31 02:10:07,257 ERROR Postgres|our_db|records-stream-producer unexpected exception while streaming logical changes [io.debezium.connector.postgresql.RecordsStreamProducer]
org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "created_at", schema type: STRING
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:248)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at io.debezium.data.Envelope.update(Envelope.java:280)
    at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:376)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:262)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
    at …
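For context, the table looks roughly like this; the names and the trimmed column list here are hypothetical (the real table has the 13 columns visible in the debug output below), but the relevant parts are the NOT NULL timestamptz column and a single-column primary key, i.e. replica identity DEFAULT:

CREATE TABLE public.some_table_on_our_db (
    id         integer PRIMARY KEY,            -- single-column PK: with replica identity DEFAULT only "id" is logged for old rows
    user_id    character varying(64) NOT NULL,
    created_at timestamptz NOT NULL             -- never NULL in the data, yet the connector reports a null for it
);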
If you enable debug logging, you can see the incoming wal2json message:
2019-02-05 16:16:07,455 INFO Postgres|our_db|records-stream-producer Message arrived for decoding
{
  "xid" : 8268042083,
  "timestamp" : "2019-02-05 16:16:07.455878+00",
  "change" : [ {
    "kind" : "update",
    "schema" : "public",
    "table" : "some_table_on_our_db",
    "columnnames" : [ "id", "user_id", "mission_id", "status", "payment_id", "user_data", "completed_at", "created_at", "payment_uuid", "amount", "currency", "reference_id", "document_data" ],
    "columntypes" : [ "integer", "character varying(64)", "character varying(32)", "character varying(50)", "character varying(32)", "character varying(100)", "timestamp with time zone", "timestamp with time zone", "character varying(32)", "numeric(20,8)", "character varying(10)", "character varying(128)", "jsonb" ],
    "columnoptionals" : [ false, false, false, false, true, false, true, false, true, false, false, true, true ],
    "columnvalues" : [ 92504632, "6g0ca16dddc59e8eb0fdfdggej4c7d6g4", "some_product", "completed", null, "y6fbj6d3je50ya3684656aec24782j2c", "2019-02-05 16:16:07.453909+00", "2019-02-05 16:16:07.443553+00", "", "82.00000000", "CURRENCY", "y6fbj6d3je50ya3684656aec24782j2c", "{}" ],
    "oldkeys" : {
      "keynames" : [ "id" ],
      "keytypes" : [ "integer" ],
      "keyvalues" : [ 92504632 ]
    }
  } ]
}
[io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder]
We noticed that after setting REPLICA IDENTITY FULL on the table the error no longer occurs, and "oldkeys" now includes all fields:
"oldkeys" : { "keynames" : [ "id", "user_id", "mission_id", "status", "user_data", "completed_at", "created_at", "payment_uuid", "amount", "currency", "reference_id", "auxiliary_data" ], "keytypes" : [ "integer", "character varying(64)", "character varying(32)", "character varying(50)", "character varying(100)", "timestamp with time zone", "timestamp with time zone", "character varying(32)", "numeric(20,8)", "character varying(10)", "character varying(128)", "jsonb" ], "keyvalues" : [ 92504632, "6g0ca16dddc59e8eb0fdfdggej4c7d6g4", "some_product", "completed", "y6fbj6d3je50ya3684656aec24782j2c", "2019-02-05 16:16:07.453909+00", "2019-02-05 16:16:07.443553+00", "", "82.00000000", "CURRENCY", "y6fbj6d3je50ya3684656aec24782j2c", "{}" ] }
Replica identity FULL should be optional, so the fact that Debezium raises an exception when the table uses replica identity DEFAULT is a bug.
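For anyone reproducing this, the replica identity a table currently uses can be checked from pg_class ('d' = DEFAULT, 'f' = FULL, 'n' = NOTHING, 'i' = USING INDEX):

SELECT relreplident
FROM pg_class
WHERE oid = 'public.some_table_on_our_db'::regclass;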