-
Bug
-
Resolution: Done
-
Major
-
1.4.2.Final, 1.5.2.Final, 1.6.0.Beta2
-
None
-
False
-
False
-
Undefined
-
-
FYI: This bug ticket resulted from the following Gitter thread: https://gitter.im/debezium/user?at=60cb4c3ed161a54f051e7f08
I am trying to use Debezium with the Debezium-Quarkus-Outbox extension to get the Outbox Pattern working for my service. This works, and I can even send multiple events to the same topics (routed by the aggregatetype).
But when I also configure the CloudEventsConverter, an NPE occurs:
debezium-connect | [2021-06-17 13:10:26,173] INFO Processing messages (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:180)
debezium-connect | [2021-06-17 13:10:26,179] INFO Message with LSN 'LSN{0/17AC560}' arrived, switching off the filtering (io.debezium.connector.postgresql.connection.WalPositionLocator:134)
debezium-connect | [2021-06-17 13:10:26,200] WARN no old values found for table '{ key : null, value : {"name" : "postgres_outbox_server.public.outboxevent.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "version" : "1"}}, {"name" : "aggregateid", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "aggregatetype", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "type", "index" : "3", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "timestamp", "index" : "4", "schema" : {"name" : "io.debezium.time.MicroTimestamp", "type" : "INT64", "optional" : "false", "version" : "1"}}, {"name" : "payload", "index" : "5", "schema" : {"type" : "STRING", "optional" : "true"}}, {"name" : "tracingspancontext", "index" : "6", "schema" : {"type" : "STRING", "optional" : "true"}}]} }' from delete message at 'Struct{version=1.4.2.Final,connector=postgresql,name=postgres-outbox-server,ts_ms=1623935425731,db=scb-ref-quarkus,schema=public,table=outboxevent,txId=497,lsn=24826240}'; skipping record (io.debezium.connector.postgresql.PostgresChangeRecordEmitter:133)
debezium-connect | [2021-06-17 13:10:26,631] INFO WorkerSourceTask{id=outbound-order-management-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
debezium-connect | [2021-06-17 13:10:26,632] INFO WorkerSourceTask{id=outbound-order-management-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
debezium-connect | [2021-06-17 13:10:26,632] ERROR WorkerSourceTask{id=outbound-order-management-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
debezium-connect | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium-connect |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
debezium-connect |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
debezium-connect |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium-connect |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium-connect |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium-connect |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium-connect |     at java.base/java.lang.Thread.run(Thread.java:834)
debezium-connect | Caused by: java.lang.NullPointerException
debezium-connect |     at io.debezium.data.Envelope.isEnvelopeSchema(Envelope.java:378)
debezium-connect |     at io.debezium.data.Envelope.isEnvelopeSchema(Envelope.java:386)
debezium-connect |     at io.debezium.converters.CloudEventsConverter.fromConnectData(CloudEventsConverter.java:185)
debezium-connect |     at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
debezium-connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)
debezium-connect |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
debezium-connect |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
debezium-connect |     ... 11 more
debezium-connect | [2021-06-17 13:10:26,634] ERROR WorkerSourceTask{id=outbound-order-management-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
debezium-connect | [2021-06-17 13:10:26,635] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)
debezium-connect | [2021-06-17 13:10:26,708] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:945)
debezium-connect | [2021-06-17 13:10:26,709] INFO Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:141)
debezium-connect | [2021-06-17 13:10:26,709] INFO Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
debezium-connect | [2021-06-17 13:10:26,711] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:945)
For debezium-connect, I use a custom strimzi-kafka image, built as described here: https://github.com/debezium/debezium-examples/blob/master/outbox/debezium-strimzi/Dockerfile
This is the config for the connector:
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "database",
  "database.user": "xxx",
  "database.password": "xxx",
  "database.dbname": "xxx",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "tasks.max": "1",
  "transforms": "outbox",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType",
  "transforms.outbox.table.field.event.timestamp": "timestamp",
  "database.server.name": "postgres-outbox-server",
  "transforms.outbox.route.by.field": "aggregatetype",
  "database.port": "5432",
  "table.whitelist": "public.OUTBOXEVENT",
  "tombstones.on.delete": "false",
  "transforms.outbox.route.topic.replacement": "${routedByValue}",
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.serializer.type": "json",
  "value.converter.data.serializer.type": "json",
  "value.converter.json.schemas.enable": "false",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false"
}
While debugging this issue, we found that the Schema was of type `STRING` and did not have a name. The missing name is what causes the NPE. However, when we changed the logic to ignore the name, it still failed, because the Schema is not of type `STRUCT`.
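To illustrate the two failure conditions described above (a schema with no name, and a schema that is not a `STRUCT`), here is a minimal, self-contained sketch of a null-safe envelope check. It is not the Debezium implementation and not the fix that was applied. `SchemaType`, `SimpleSchema`, and `EnvelopeCheckSketch` are hypothetical stand-ins for the Kafka Connect schema types, and the `.Envelope` name suffix is an assumption made for illustration.

```java
// Hypothetical sketch: none of these types are Debezium or Kafka Connect classes.
class EnvelopeCheckSketch {

    // Stand-in for the schema type reported by the connector runtime.
    enum SchemaType { STRING, STRUCT }

    // Stand-in for a schema: a type plus an optional (possibly null) name.
    static final class SimpleSchema {
        final SchemaType type;
        final String name; // may be null, as observed for the outbox payload

        SimpleSchema(SchemaType type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Assumed naming convention for envelope schemas (illustrative only).
    static final String ENVELOPE_SUFFIX = ".Envelope";

    // Returns false instead of throwing when the schema is null,
    // is not a STRUCT, or has no name.
    static boolean isEnvelopeSchema(SimpleSchema schema) {
        if (schema == null || schema.type != SchemaType.STRUCT) {
            return false;
        }
        return schema.name != null && schema.name.endsWith(ENVELOPE_SUFFIX);
    }
}
```

With a check like this, a caller could detect an unnamed `STRING` schema up front and report a descriptive error instead of running into the NPE. Whether the converter should reject such records or handle them differently is a separate design decision.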
- links to
-
RHEA-2024:129636 Red Hat build of Debezium 2.5.4 release