Debezium / DBZ-1915

AvroRuntimeException when publishing transaction metadata

      Taken from a Gitter conversation at https://gitter.im/debezium/user?at=5e8390679a1e483f3d353d7c.

      When provide.transaction.metadata=true is set and the Avro converter is used, the following exception appears in the Connect logs:

      connect_1          | 2020-03-31 19:29:49,642 INFO   ||  WorkerSourceTask{id=postgres-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
      connect_1          | 2020-03-31 19:29:49,642 INFO   ||  WorkerSourceTask{id=postgres-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
      connect_1          | 2020-03-31 19:29:49,650 INFO   ||  WorkerSourceTask{id=postgres-connector-0} Finished commitOffsets successfully in 8 ms   [org.apache.kafka.connect.runtime.WorkerSourceTask]
      connect_1          | 2020-03-31 19:29:49,653 ERROR  ||  WorkerSourceTask{id=postgres-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
      connect_1          | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      connect_1          |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
      connect_1          |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      connect_1          |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      connect_1          |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      connect_1          |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      connect_1          |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      connect_1          |    at java.base/java.lang.Thread.run(Thread.java:834)
      connect_1          | Caused by: org.apache.avro.AvroRuntimeException: Not a record: ["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}]
      connect_1          |    at org.apache.avro.Schema.getFields(Schema.java:220)
      connect_1          |    at org.apache.avro.data.RecordBuilderBase.<init>(RecordBuilderBase.java:54)
      connect_1          |    at org.apache.avro.generic.GenericRecordBuilder.<init>(GenericRecordBuilder.java:38)
      connect_1          |    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:600)
      connect_1          |    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:606)
      connect_1          |    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
      connect_1          |    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
      connect_1          |    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
      connect_1          |    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
      connect_1          |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
      connect_1          |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
      connect_1          |    ... 11 more
      connect_1          | 2020-03-31 19:29:49,654 ERROR  ||  WorkerSourceTask{id=postgres-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
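
The "Not a record" message shows the schema that reached GenericRecordBuilder: a union of "null" and a record (an optional struct), not a bare record. The sketch below only illustrates that distinction using the schema text copied from the exception. The helpers `get_fields` and `unwrap_optional` are hypothetical names for this illustration; they are not Avro or Confluent converter code and do not represent the fix for this issue.

```python
import json

# Schema text copied verbatim from the exception message above.
SCHEMA_JSON = (
    '["null",{"type":"record","name":"ConnectDefault",'
    '"namespace":"io.confluent.connect.avro","fields":['
    '{"name":"id","type":"string"},'
    '{"name":"total_order","type":"long"},'
    '{"name":"data_collection_order","type":"long"}]}]'
)


def get_fields(schema):
    """Hypothetical stand-in for a record-only accessor: reject non-record schemas."""
    if isinstance(schema, dict) and schema.get("type") == "record":
        return [field["name"] for field in schema["fields"]]
    raise TypeError("Not a record: " + json.dumps(schema, separators=(",", ":")))


def unwrap_optional(schema):
    """Return the single non-null branch of a ["null", X] union, else the schema unchanged."""
    if isinstance(schema, list):
        branches = [branch for branch in schema if branch != "null"]
        if len(branches) == 1:
            return branches[0]
    return schema


schema = json.loads(SCHEMA_JSON)
# A JSON list at the top level denotes an Avro union, so get_fields(schema) raises,
# while get_fields(unwrap_optional(schema)) returns the three field names.
```

Calling `get_fields(schema)` directly raises, which mirrors the failure in the stack trace; unwrapping the optional branch first yields the record's fields.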
      

      To reproduce, use the attached docker-compose.yaml to start the necessary containers, then register the PostgreSQL connector using the following JSON:

      {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "dbserver1",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "snapshot.mode": "never",
        "schema.whitelist": "inventory",
        "table.whitelist": "inventory.customers",
        "provide.transaction.metadata": "true"
      }
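
One way to register the configuration above is through the Kafka Connect REST API. Because the JSON contains only the configuration map (no "name" wrapper), a minimal sketch can send it with PUT to /connectors/{name}/config. The base URL http://localhost:8083 is an assumption about how the compose file exposes Connect, and the connector name postgres-connector is inferred from the task id in the logs; `build_register_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_register_request(config, name="postgres-connector",
                           base_url="http://localhost:8083"):
    """Build (but do not send) a PUT request that creates or updates a connector.

    `name` and `base_url` are assumptions; adjust them to the actual environment.
    """
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Only a subset of the configuration is repeated here; pass the full map in practice.
example_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "provide.transaction.metadata": "true",
}
request = build_register_request(example_config)
```

Passing `request` to `urllib.request.urlopen` would submit it once the Connect container is up.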
      

      Then connect to the running postgres container:

      docker exec -it postgres bash
      

      Once in the container, run:

      psql -U postgres
      

      Then insert a row into the monitored table:

      INSERT INTO inventory.customers (first_name, last_name, email) values ('First', 'Last', 'myemail@host.com');
      

            jpechane Jiri Pechanec
            ccranfor@redhat.com Chris Cranford