Type: Bug
Resolution: Done
Priority: Major
Fix Version: 1.1.0.Final
Labels: None
Taken from a Gitter conversation at https://gitter.im/debezium/user?at=5e8390679a1e483f3d353d7c.
When provide.transaction.metadata=true is set and the Avro converter is used, the following exception is observed in the Connect logs:
connect_1 | 2020-03-31 19:29:49,642 INFO || WorkerSourceTask{id=postgres-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | 2020-03-31 19:29:49,642 INFO || WorkerSourceTask{id=postgres-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | 2020-03-31 19:29:49,650 INFO || WorkerSourceTask{id=postgres-connector-0} Finished commitOffsets successfully in 8 ms [org.apache.kafka.connect.runtime.WorkerSourceTask]
connect_1 | 2020-03-31 19:29:49,653 ERROR || WorkerSourceTask{id=postgres-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect_1 |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect_1 |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect_1 |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect_1 |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect_1 |     at java.base/java.lang.Thread.run(Thread.java:834)
connect_1 | Caused by: org.apache.avro.AvroRuntimeException: Not a record: ["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}]
connect_1 |     at org.apache.avro.Schema.getFields(Schema.java:220)
connect_1 |     at org.apache.avro.data.RecordBuilderBase.<init>(RecordBuilderBase.java:54)
connect_1 |     at org.apache.avro.generic.GenericRecordBuilder.<init>(GenericRecordBuilder.java:38)
connect_1 |     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:600)
connect_1 |     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:606)
connect_1 |     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
connect_1 |     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
connect_1 |     at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect_1 |     ... 11 more
connect_1 | 2020-03-31 19:29:49,654 ERROR || WorkerSourceTask{id=postgres-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
To reproduce, use the attached docker-compose.yaml to start the necessary containers, then register the PostgreSQL connector using the following JSON:
{ "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", "database.server.name": "dbserver1", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "snapshot.mode": "never", "schema.whitelist": "inventory", "table.whitelist": "inventory.customers", "provide.transaction.metadata": "true" }
Then connect to the running postgres container:
docker exec -it postgres bash
Once in the container, run:
psql -U postgres
and then perform the following insert into the monitored table:
INSERT INTO inventory.customers (first_name, last_name, email) values ('First', 'Last', 'myemail@host.com');
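After the insert, the failure can be observed in the Connect worker log and in the task status, for example as follows; the service name connect is inferred from the connect_1 prefix in the log above, and port 8083 is assumed:

# Tail the Connect worker log; the AvroRuntimeException shown above appears here
docker-compose logs -f connect

# Check the connector status; since the task is killed, it should report as FAILED
curl -s http://localhost:8083/connectors/postgres-connector/status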