-
Bug
-
Resolution: Done
-
Major
-
None
-
None
-
- Activate outbox transforms & Outbox Event Router
- activate the heartbeat with "heartbeat.interval.ms" configuration option
- use ByteBufferConverter to send payload as bytes to Kafka.
We're experiencing a strange behaviour with Debezium connector + PostgreSQL 9.6.11 on RDS.
We're serializing our data with the KafkaAvroSerializer class. Debezium version is 1.2.0Final and using io.debezium.converters.ByteBufferConverter as value.converter
We're able to create new records and receive events on the output topic, but, after some time we receive the following (event without creating new records):
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteBufferConverter: STRUCT at io.debezium.converters.ByteBufferConverter.validateSchemaType(ByteBufferConverter.java:60) at io.debezium.converters.ByteBufferConverter.fromConnectData(ByteBufferConverter.java:47) at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62) at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 11 more
Here's our config:
database.hostname: <hostname>
database.port: <port>
database.user: <user>
database.password: <psw>
database.dbname: <dbname>
database.server.name: <dbname>
database.whitelist: outbox
table.whitelist: public.outbox
heartbeat.interval.ms: 60000
slot.name: testdbztable
plugin.name: wal2json
transforms : outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.route.topic.replacement: "debezium-outbox-events"
transforms.outbox.table.field.event.id: uuid
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: type
transforms.outbox.table.field.event.payload.id: aggregate_id
transforms.outbox.route.by.field: aggregate_type
value.converter: io.debezium.converters.ByteBufferConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
It seems that with heartbeat enabled, the underlying heartbeat message isn't handled well by the ByteBufferConverter or by the EventRouter.
Disabling the heartbeat solved the problem.