- Type: Bug
- Resolution: Done
- Priority: Major
Steps to reproduce:
- Activate the outbox transform (Outbox Event Router)
- Activate the heartbeat with the "heartbeat.interval.ms" configuration option
- Use ByteBufferConverter to send the payload as bytes to Kafka (see the sketch after this list)
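For context on the last step: the payload column must already contain serialized bytes, since ByteBufferConverter only passes bytes through. Below is a minimal sketch of producing such bytes with KafkaAvroSerializer; the event schema, topic name, and registry URL are illustrative assumptions, not taken from the report.

import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class OutboxPayloadExample {
    public static void main(String[] args) {
        // Hypothetical event schema; the real application schema will differ.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"OrderCreated\","
            + "\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"}]}");

        GenericRecord event = new GenericData.Record(schema);
        event.put("orderId", "42");

        try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
            // Assumed registry location; point this at your Schema Registry.
            serializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

            // These bytes are what gets written into the outbox payload column,
            // so ByteBufferConverter can pass them through to Kafka unchanged.
            byte[] payload = serializer.serialize("debezium-outbox-events", event);
        }
    }
}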
We're experiencing strange behaviour with the Debezium connector + PostgreSQL 9.6.11 on RDS.
We're serializing our data with the KafkaAvroSerializer class. The Debezium version is 1.2.0.Final, and we're using io.debezium.converters.ByteBufferConverter as the value.converter.
We're able to create new records and receive events on the output topic, but after some time we receive the following (even without creating new records):
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteBufferConverter: STRUCT
    at io.debezium.converters.ByteBufferConverter.validateSchemaType(ByteBufferConverter.java:60)
    at io.debezium.converters.ByteBufferConverter.fromConnectData(ByteBufferConverter.java:47)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
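The failure mode the trace points at: heartbeat records carry a STRUCT value schema, while the converter only accepts BYTES. A simplified, self-contained sketch of that kind of check follows; this is not the actual Debezium source, and the ts_ms heartbeat field is an assumption for illustration.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.DataException;

public class BytesOnlySchemaCheck {

    // Simplified stand-in for the check implied by the stack trace:
    // any schema other than BYTES is rejected.
    static void validateSchemaType(Schema schema) {
        if (schema != null && schema.type() != Schema.Type.BYTES) {
            throw new DataException("Invalid schema type for ByteBufferConverter: " + schema.type());
        }
    }

    public static void main(String[] args) {
        // An outbox payload already serialized as bytes passes the check...
        validateSchemaType(Schema.BYTES_SCHEMA);

        // ...but a heartbeat-style record carries a STRUCT value schema and fails,
        // producing the same DataException as in the trace above.
        Schema heartbeatLike = SchemaBuilder.struct()
                .field("ts_ms", Schema.INT64_SCHEMA) // assumed heartbeat field, for illustration
                .build();
        validateSchemaType(heartbeatLike); // throws DataException: ... STRUCT
    }
}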
Here's our config:
database.hostname: <hostname>
database.port: <port>
database.user: <user>
database.password: <psw>
database.dbname: <dbname>
database.server.name: <dbname>
database.whitelist: outbox
table.whitelist: public.outbox
heartbeat.interval.ms: 60000
slot.name: testdbztable
plugin.name: wal2json
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.route.topic.replacement: "debezium-outbox-events"
transforms.outbox.table.field.event.id: uuid
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: type
transforms.outbox.table.field.event.payload.id: aggregate_id
transforms.outbox.route.by.field: aggregate_type
value.converter: io.debezium.converters.ByteBufferConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
It seems that, with the heartbeat enabled, the heartbeat messages aren't handled correctly by the ByteBufferConverter or by the EventRouter.
Disabling the heartbeat solved the problem.
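For reference, newer Debezium releases document a delegate option on ByteBufferConverter that hands non-byte payloads (such as heartbeat messages) to a secondary converter. A sketch in the same config style as above, assuming a version where delegate.converter.type is available (it may not be in 1.2.0.Final):

value.converter: io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable: false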