Debezium / DBZ-4298

Some data types do not work when sending signals to a Debezium connector

    Description

      Hi,
      I tested the new feature for sending signals to a Debezium connector.
      It is huge, and exactly what I needed.
       
      However, as the docs mention, the feature is still in an incubating state, so I am reporting an issue I found while testing.
       
      FYI, I am using Debezium version 1.7.1.
       
      When I send an 'execute-snapshot' signal for a table that has a string primary key, it works.
      But when I send the same signal for a table that has a numeric primary key, it fails.
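
      For reference, a signal like the one described above could be built as in the minimal sketch below. The signaling table name `public.debezium_signal` is an assumption (the real name is whatever the connector's `signal.data.collection` property points to), and the class and method names are hypothetical. The sketch only builds the statement text; it does not connect to a database, and it concatenates values directly, so it is meant for pasting trusted values into a SQL client rather than for handling untrusted input.

```java
import java.util.UUID;

public class ExecuteSnapshotSignal {

    // Builds the JSON payload for an incremental 'execute-snapshot' signal.
    static String payload(String... tables) {
        StringBuilder sb = new StringBuilder("{\"data-collections\": [");
        for (int i = 0; i < tables.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append('"').append(tables[i]).append('"');
        }
        return sb.append("], \"type\": \"incremental\"}").toString();
    }

    // Builds the INSERT for the signaling table (id, type, data columns).
    // signalTable is an assumption; use the table configured for the connector.
    static String insertSql(String signalTable, String id, String... tables) {
        return "INSERT INTO " + signalTable + " (id, type, data) VALUES ('"
                + id + "', 'execute-snapshot', '" + payload(tables) + "')";
    }

    public static void main(String[] args) {
        // The id is an arbitrary unique string chosen by the sender.
        String id = UUID.randomUUID().toString();
        System.out.println(insertSql("public.debezium_signal", id, "public.t_test_numeric"));
    }
}
```

      Running the printed statement against the source database is what requests the incremental snapshot of `public.t_test_numeric` seen in the log.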
       
      Error log:

      [2021-11-16 10:22:22,844] INFO [pg_flow_t4|task-0] Requested 'INCREMENTAL' snapshot of data collections '[public.t_test_numeric]' (io.debezium.pipeline.signal.ExecuteSnapshot:53)
      [2021-11-16 10:22:22,848] INFO [pg_flow_t4|task-0] Incremental snapshot for table 'public.t_test_numeric' will end at position [8978943] (io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource:275)
      [2021-11-16 10:22:22,918] ERROR [pg_flow_t4|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:31)
      org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset \{transaction_id=null, lsn_proc=905752556728, lsn_commit=905752556728, lsn=905752556728, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c02000078700000000173720024696f2e646562657a69756d2e646174612e5370656369616c56616c7565446563696d616c00000000000000010200024c000c646563696d616c56616c75657400164c6a6176612f6d6174682f426967446563696d616c3b4c000c7370656369616c56616c75657400334c696f2f646562657a69756d2f646174612f5370656369616c56616c7565446563696d616c245370656369616c56616c75653b7870737200146a6176612e6d6174682e426967446563696d616c54c71557f981284f0300024900057363616c654c0006696e7456616c7400164c6a6176612f6d6174682f426967496e74656765723b787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000000737200146a6176612e6d6174682e426967496e74656765728cfc9f1fa93bfb1d030006490008626974436f756e744900096269744c656e67746849001366697273744e6f6e7a65726f427974654e756d49000c6c6f776573745365744269744900067369676e756d5b00096d61676e69747564657400025b427871007e0008fffffffffffffffffffffffefffffffe00000001757200025b42acf317f8060854e00200007870000000038901ff787870, txId=113669110, incremental_snapshot_collections=public.t_test_numeric, ts_usec=1637025742915000, 
incremental_snapshot_primary_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c02000078700000000173720024696f2e646562657a69756d2e646174612e5370656369616c56616c7565446563696d616c00000000000000010200024c000c646563696d616c56616c75657400164c6a6176612f6d6174682f426967446563696d616c3b4c000c7370656369616c56616c75657400334c696f2f646562657a69756d2f646174612f5370656369616c56616c7565446563696d616c245370656369616c56616c75653b7870737200146a6176612e6d6174682e426967446563696d616c54c71557f981284f0300024900057363616c654c0006696e7456616c7400164c6a6176612f6d6174682f426967496e74656765723b787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000000737200146a6176612e6d6174682e426967496e74656765728cfc9f1fa93bfb1d030006490008626974436f756e744900096269744c656e67746849001366697273744e6f6e7a65726f427974654e756d49000c6c6f776573745365744269744900067369676e756d5b00096d61676e69747564657400025b427871007e0008fffffffffffffffffffffffefffffffe00000001757200025b42acf317f8060854e0020000787000000002041d787870}
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:255)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:246)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:395)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:179)
              at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:205)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:167)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:40)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:166)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:127)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: io.debezium.DebeziumException: Snapshotting of table public.t_test_numeric failed
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:389)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:279)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.closeWindow(AbstractIncrementalSnapshotChangeEventSource.java:90)
              at io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow.arrived(CloseIncrementalSnapshotWindow.java:31)
              at io.debezium.pipeline.signal.Signal.process(Signal.java:140)
              at io.debezium.pipeline.signal.Signal.process(Signal.java:184)
              at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:229)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:78)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46)
              at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:93)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:218)
              ... 16 more
      Caused by: org.postgresql.util.PSQLException: Can't infer the SQL type to use for an instance of io.debezium.data.SpecialValueDecimal. Use setObject() with an explicit Types value to specify the type to use.
              at org.postgresql.jdbc.PgPreparedStatement.setObject(PgPreparedStatement.java:1011)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readTableChunkStatement(AbstractIncrementalSnapshotChangeEventSource.java:415)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:346)
              ... 26 more
      [2021-11-16 10:22:22,920] INFO [pg_flow_t4|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)
      [2021-11-16 10:22:22,920] INFO [pg_flow_t4|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)
      [2021-11-16 10:22:22,920] INFO [pg_flow_t4|task-0] Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:167)
      [2021-11-16 10:22:22,921] INFO [pg_flow_t4|task-0] Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:70)
      [2021-11-16 10:22:23,107] INFO [pg_flow_t4|task-0] WorkerSourceTask{id=pg_flow_t4-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
      [2021-11-16 10:22:23,107] ERROR [pg_flow_t4|task-0] WorkerSourceTask{id=pg_flow_t4-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
              at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:170)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:40)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:166)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:127)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset \{transaction_id=null, lsn_proc=905752556728, lsn_commit=905752556728, lsn=905752556728, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c02000078700000000173720024696f2e646562657a69756d2e646174612e5370656369616c56616c7565446563696d616c00000000000000010200024c000c646563696d616c56616c75657400164c6a6176612f6d6174682f426967446563696d616c3b4c000c7370656369616c56616c75657400334c696f2f646562657a69756d2f646174612f5370656369616c56616c7565446563696d616c245370656369616c56616c75653b7870737200146a6176612e6d6174682e426967446563696d616c54c71557f981284f0300024900057363616c654c0006696e7456616c7400164c6a6176612f6d6174682f426967496e74656765723b787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000000737200146a6176612e6d6174682e426967496e74656765728cfc9f1fa93bfb1d030006490008626974436f756e744900096269744c656e67746849001366697273744e6f6e7a65726f427974654e756d49000c6c6f776573745365744269744900067369676e756d5b00096d61676e69747564657400025b427871007e0008fffffffffffffffffffffffefffffffe00000001757200025b42acf317f8060854e00200007870000000038901ff787870, txId=113669110, incremental_snapshot_collections=public.t_test_numeric, ts_usec=1637025742915000, 
incremental_snapshot_primary_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c02000078700000000173720024696f2e646562657a69756d2e646174612e5370656369616c56616c7565446563696d616c00000000000000010200024c000c646563696d616c56616c75657400164c6a6176612f6d6174682f426967446563696d616c3b4c000c7370656369616c56616c75657400334c696f2f646562657a69756d2f646174612f5370656369616c56616c7565446563696d616c245370656369616c56616c75653b7870737200146a6176612e6d6174682e426967446563696d616c54c71557f981284f0300024900057363616c654c0006696e7456616c7400164c6a6176612f6d6174682f426967496e74656765723b787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000000737200146a6176612e6d6174682e426967496e74656765728cfc9f1fa93bfb1d030006490008626974436f756e744900096269744c656e67746849001366697273744e6f6e7a65726f427974654e756d49000c6c6f776573745365744269744900067369676e756d5b00096d61676e69747564657400025b427871007e0008fffffffffffffffffffffffefffffffe00000001757200025b42acf317f8060854e0020000787000000002041d787870}
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:255)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:246)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:395)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:179)
              at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:205)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:167)
              ... 8 more
      Caused by: io.debezium.DebeziumException: Snapshotting of table public.t_test_numeric failed
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:389)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:279)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.closeWindow(AbstractIncrementalSnapshotChangeEventSource.java:90)
              at io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow.arrived(CloseIncrementalSnapshotWindow.java:31)
              at io.debezium.pipeline.signal.Signal.process(Signal.java:140)
              at io.debezium.pipeline.signal.Signal.process(Signal.java:184)
              at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:229)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:78)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46)
              at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:93)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:218)
              ... 16 more
      Caused by: org.postgresql.util.PSQLException: Can't infer the SQL type to use for an instance of io.debezium.data.SpecialValueDecimal. Use setObject() with an explicit Types value to specify the type to use.
              at org.postgresql.jdbc.PgPreparedStatement.setObject(PgPreparedStatement.java:1011)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readTableChunkStatement(AbstractIncrementalSnapshotChangeEventSource.java:415)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:346)
              ... 26 more
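
      The hex strings stored in `incremental_snapshot_maximum_key` and `incremental_snapshot_primary_key` above start with `aced0005`, the magic number and version of a Java serialization stream. The sketch below decodes two fragments copied from that offset to show what the serialized key contains. The class and method names are hypothetical, and this is only a diagnostic aid, not part of Debezium.

```java
public class OffsetKeyPeek {

    // Converts a hex string (two characters per byte) into raw bytes.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Renders bytes as text, replacing anything outside printable ASCII with '.'.
    static String printable(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            int c = b & 0xFF;
            sb.append(c >= 0x20 && c < 0x7F ? (char) c : '.');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // First bytes of the key: stream header followed by an array class descriptor.
        String header = "aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b";
        // Fragment from later in the same key: the class name of the array element.
        String element = "696f2e646562657a69756d2e646174612e"
                + "5370656369616c56616c7565446563696d616c";
        System.out.println(printable(fromHex(header)));
        System.out.println(printable(fromHex(element)));
    }
}
```

      The decoded text shows an `Object[]` whose element is an `io.debezium.data.SpecialValueDecimal`, which is the same type the PostgreSQL JDBC driver rejects in `setObject()` in the last `Caused by` entry. This suggests the numeric key value is being bound as the wrapper object rather than as a plain numeric value.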
      

       
      Thanks.

          People

            jpechane Jiri Pechanec
            chu1070y@naver.com Steven Chu