Debezium / DBZ-3835

TOASTed placeholder doesn't fit typed non-string fields


    • Type: Bug
    • Resolution: Not a Bug
    • Priority: Major
    • Component/s: postgresql-connector
    • Steps to Reproduce:
      1. Use a PostgreSQL DB with the default replica identity;
      2. Create a character varying[] column;
      3. Insert a large enough array into that column (1MB reproduces the issue);
      4. Update the new row without touching the array column (a JDBC sketch of these steps follows the error output below).
      Result:

      • ERROR in the Connect log:
      org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type ARRAY: class java.lang.String for field: "array_column"
      	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
      	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:265)
      	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
      	at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:100)
      	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:51)
      	at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:91)
      	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:218)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:236)
      	at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeUpdate(PgOutputMessageDecoder.java:442)
      	at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:182)
      	at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:195)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:157)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:39)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:134)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:834)
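
      Below is a minimal JDBC sketch of the reproduction steps, assuming a local PostgreSQL instance that the connector is streaming from; the table name, connection URL, and credentials are hypothetical, and only the character varying[] column, the ~1MB value, and the update that skips that column matter.

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.Statement;
      import java.util.Arrays;

      public class ToastedArrayRepro {
          public static void main(String[] args) throws Exception {
              // Hypothetical connection details; use any database the connector captures.
              try (Connection conn = DriverManager.getConnection(
                      "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
                  try (Statement st = conn.createStatement()) {
                      // Default replica identity; the array column is TOASTed once it is large enough.
                      st.execute("CREATE TABLE toast_test ("
                              + "id serial PRIMARY KEY, "
                              + "flag integer, "
                              + "array_column character varying[])");
                  }
                  // Insert roughly 1MB of array data so PostgreSQL stores it out of line (TOAST).
                  String[] big = new String[1024];
                  Arrays.fill(big, "x".repeat(1024)); // 1024 elements of 1KB each ≈ 1MB
                  try (PreparedStatement ps = conn.prepareStatement(
                          "INSERT INTO toast_test (flag, array_column) VALUES (?, ?)")) {
                      ps.setInt(1, 0);
                      ps.setArray(2, conn.createArrayOf("varchar", big));
                      ps.executeUpdate();
                  }
                  // Update the row without touching array_column; the unchanged TOASTed value
                  // is not written to the WAL, which triggers the placeholder handling described below.
                  try (Statement st = conn.createStatement()) {
                      st.execute("UPDATE toast_test SET flag = 1");
                  }
              }
          }
      }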
      

      Using the latest master of debezium-connector-postgres with the Avro value converter (Protobuf gives the same result) and the default replica identity on the DB side.
      In this case PostgreSQL does not write unchanged, large enough TOASTed values to the WAL, and Debezium writes "__debezium_unavailable_value" to the corresponding fields. Because that placeholder is a String, it fails validation for non-string fields when a typed schema is used.
      We may want a different toasted.value.placeholder per type, or a completely different approach to handling TOASTed columns.
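
      For illustration, the same failure can be reproduced with just the Kafka Connect data API; the field name mirrors the one in the stack trace, and the schema below is an assumption about how a character varying[] column is mapped (an optional array of strings).

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;
      import org.apache.kafka.connect.data.Struct;

      public class PlaceholderTypeMismatch {
          public static void main(String[] args) {
              // Typed value schema containing an ARRAY field.
              Schema valueSchema = SchemaBuilder.struct()
                      .field("array_column", SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build())
                      .build();

              // Putting the String placeholder into the ARRAY field fails schema validation with
              // org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type ARRAY:
              // class java.lang.String for field: "array_column"
              new Struct(valueSchema).put("array_column", "__debezium_unavailable_value");
          }
      }

      This is why a single string-typed placeholder cannot cover typed non-string columns.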

            Assignee: Unassigned
            Reporter: Dmytro Starosud (Inactive)
            Votes: 0
            Watchers: 3
