Debezium / DBZ-3642

Debezium Outbox not working with CloudEventsConverter


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 2.4.0.Final
    • Affects Versions: 1.4.2.Final, 1.5.2.Final, 1.6.0.Beta2
    • Component: outbox

      A minimal (non-)working example can be found here (it's from a colleague of mine):

      https://github.com/tkurzydym/quarkus-debezium-showcase

      I tested this with versions 1.4.2.Final, 1.5.2.Final, and 1.6.0.Beta2 and get the same result every time.


      FYI: This bug ticket resulted from the following Gitter thread: https://gitter.im/debezium/user?at=60cb4c3ed161a54f051e7f08

      I am trying to use Debezium with the Debezium Quarkus Outbox extension to implement the outbox pattern for my service. This works, and I can even send multiple events to the same topic (based on the aggregatetype).
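
      For context on the producing side: with the Quarkus extension, the application does not write to the outbox table directly; it fires events that implement the extension's ExportedEvent contract, and the extension persists them into the outboxevent table that the connector captures. A minimal sketch of such an event (the Order names and the payload handling are illustrative, not taken from the linked showcase repo):

      import java.time.Instant;

      import io.debezium.outbox.quarkus.ExportedEvent;

      // Illustrative outbox event; the extension persists it into the outboxevent
      // table, from which the EventRouter SMT later picks it up.
      public class OrderCreatedEvent implements ExportedEvent<String, String> {

          private final String orderId;
          private final String payloadJson;
          private final Instant timestamp = Instant.now();

          public OrderCreatedEvent(String orderId, String payloadJson) {
              this.orderId = orderId;
              this.payloadJson = payloadJson;
          }

          @Override
          public String getAggregateId() {
              return orderId;        // "aggregateid" column, becomes the Kafka message key
          }

          @Override
          public String getAggregateType() {
              return "Order";        // "aggregatetype" column, used by route.by.field
          }

          @Override
          public String getType() {
              return "OrderCreated"; // "type" column, mapped to the eventType header
          }

          @Override
          public Instant getTimestamp() {
              return timestamp;
          }

          @Override
          public String getPayload() {
              return payloadJson;    // stored as a plain string in the "payload" column
          }
      }

      Such an event is typically fired via CDI (Event<ExportedEvent<?, ?>>) inside the same transaction as the business change, so the outbox row and the state change commit together.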

      But when I also configure the CloudEventsConverter, an NPE occurs:

      debezium-connect    | [2021-06-17 13:10:26,173] INFO Processing messages (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:180)
      debezium-connect    | [2021-06-17 13:10:26,179] INFO Message with LSN 'LSN{0/17AC560}' arrived, switching off the filtering (io.debezium.connector.postgresql.connection.WalPositionLocator:134)
      debezium-connect    | [2021-06-17 13:10:26,200] WARN no old values found for table '{ key : null, value : {"name" : "postgres_outbox_server.public.outboxevent.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "version" : "1"}}, {"name" : "aggregateid", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "aggregatetype", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "type", "index" : "3", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "timestamp", "index" : "4", "schema" : {"name" : "io.debezium.time.MicroTimestamp", "type" : "INT64", "optional" : "false", "version" : "1"}}, {"name" : "payload", "index" : "5", "schema" : {"type" : "STRING", "optional" : "true"}}, {"name" : "tracingspancontext", "index" : "6", "schema" : {"type" : "STRING", "optional" : "true"}}]} }' from delete message at 'Struct{version=1.4.2.Final,connector=postgresql,name=postgres-outbox-server,ts_ms=1623935425731,db=scb-ref-quarkus,schema=public,table=outboxevent,txId=497,lsn=24826240}'; skipping record (io.debezium.connector.postgresql.PostgresChangeRecordEmitter:133)
      debezium-connect    | [2021-06-17 13:10:26,631] INFO WorkerSourceTask{id=outbound-order-management-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
      debezium-connect    | [2021-06-17 13:10:26,632] INFO WorkerSourceTask{id=outbound-order-management-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
      debezium-connect    | [2021-06-17 13:10:26,632] ERROR WorkerSourceTask{id=outbound-order-management-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
      debezium-connect    | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      debezium-connect    |   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
      debezium-connect    |   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
      debezium-connect    |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      debezium-connect    |   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      debezium-connect    |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      debezium-connect    |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      debezium-connect    |   at java.base/java.lang.Thread.run(Thread.java:834)
      debezium-connect    | Caused by: java.lang.NullPointerException
      debezium-connect    |   at io.debezium.data.Envelope.isEnvelopeSchema(Envelope.java:378)
      debezium-connect    |   at io.debezium.data.Envelope.isEnvelopeSchema(Envelope.java:386)
      debezium-connect    |   at io.debezium.converters.CloudEventsConverter.fromConnectData(CloudEventsConverter.java:185)
      debezium-connect    |   at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
      debezium-connect    |   at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)
      debezium-connect    |   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
      debezium-connect    |   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
      debezium-connect    |   ... 11 more
      debezium-connect    | [2021-06-17 13:10:26,634] ERROR WorkerSourceTask{id=outbound-order-management-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
      debezium-connect    | [2021-06-17 13:10:26,635] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)
      debezium-connect    | [2021-06-17 13:10:26,708] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:945)
      debezium-connect    | [2021-06-17 13:10:26,709] INFO Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:141)
      debezium-connect    | [2021-06-17 13:10:26,709] INFO Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
      debezium-connect    | [2021-06-17 13:10:26,711] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:945)
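
      The interesting frames are CloudEventsConverter.fromConnectData and the two Envelope.isEnvelopeSchema calls: the converter inspects the value schema, and the envelope check ends up calling String methods on the schema name. After the outbox EventRouter SMT has rewritten the record, the value is just the payload string, whose Connect schema has no name, so that dereference blows up. A rough sketch of the failure mode (this only mirrors the check, it is not the actual Debezium source):

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;

      public class EnvelopeNpeSketch {

          // Roughly what the name-based check does: test the schema name for the
          // ".Envelope" suffix that Debezium's envelope schemas carry.
          static boolean isEnvelopeSchema(String schemaName) {
              return schemaName.endsWith(".Envelope"); // NPE when schemaName is null
          }

          static boolean isEnvelopeSchema(Schema schema) {
              return isEnvelopeSchema(schema.name());
          }

          public static void main(String[] args) {
              // After the outbox EventRouter, the value schema is the unnamed STRING
              // schema of the payload column, not the connector's ...Envelope STRUCT.
              Schema routedValueSchema = SchemaBuilder.string().optional().build();
              System.out.println(routedValueSchema.name()); // null
              isEnvelopeSchema(routedValueSchema);          // throws NullPointerException
          }
      }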
      

      For debezium-connect, I use a custom Strimzi Kafka image (as described here: https://github.com/debezium/debezium-examples/blob/master/outbox/debezium-strimzi/Dockerfile).

      This is the config for the connector:

      {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
          "database.hostname": "database", 
          "database.user": "xxx", 
          "database.password": "xxx", 
          "database.dbname": "xxx", 
          "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", 
          "tasks.max": "1", 
          "transforms": "outbox", 
          "transforms.outbox.table.fields.additional.placement": "type:header:eventType", 
          "transforms.outbox.table.field.event.timestamp": "timestamp", 
          "database.server.name": "postgres-outbox-server", 
          "transforms.outbox.route.by.field": "aggregatetype", 
          "database.port": "5432", 
          "table.whitelist": "public.OUTBOXEVENT", 
          "tombstones.on.delete": "false", 
          "transforms.outbox.route.topic.replacement": "${routedByValue}", 
          "value.converter": "io.debezium.converters.CloudEventsConverter", 
          "value.converter.serializer.type": "json", 
          "value.converter.data.serializer.type": "json", 
          "value.converter.json.schemas.enable": "false", 
          "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
          "key.converter.schemas.enable": "false"
      }
      

      While debugging this issue, we found that the schema was of type `STRING` and did not contain a name; the missing name is what causes the NPE. But even when we changed the logic to ignore the name, it still failed, because the schema was not of type `STRUCT`.
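
      To make that second finding concrete: a null-safe variant of the name check only moves the problem, because after the EventRouter the value schema is a plain STRING, so the converter still has no envelope STRUCT from which to derive the CloudEvents attributes. A small sketch (the null-safe check is our experiment, not Debezium code):

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;

      public class NullSafeEnvelopeCheckSketch {

          // Hypothetical null-safe variant of the name check we experimented with.
          static boolean isEnvelopeSchema(Schema schema) {
              String name = schema.name();
              return name != null && name.endsWith(".Envelope");
          }

          public static void main(String[] args) {
              Schema routedValueSchema = SchemaBuilder.string().optional().build();

              // No NPE any more, but the check simply evaluates to false ...
              System.out.println(isEnvelopeSchema(routedValueSchema)); // false

              // ... and the routed value is still not a STRUCT, so a converter that
              // expects a Debezium envelope cannot process it either way.
              System.out.println(routedValueSchema.type() == Schema.Type.STRUCT); // false
          }
      }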

            Assignee: Unassigned
            Reporter: Marvin Kienitz (mkienitz)
            Votes: 0
            Watchers: 4
