Debezium / DBZ-1909

Heartbeat messages cause NPE when used with ExtractField SMT

Details

    Description

      When configuring the PostgreSQL connector with the following settings:

      "table.whitelist": "public.my_table,public.kafka_connector_heartbeat",
      "heartbeat.action.query": "INSERT INTO kafka_connector_heartbeat DEFAULT VALUES;",
      "transforms": "key",
      "transforms.key.field": "id",
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "heartbeat.interval.ms": "10000",
      "plugin.name": "pgoutput",
      "publication.name": "my_publication"
      

      This leads to the following error:

      2020-03-26 16:31:28,121 ERROR  ||  WorkerSourceTask{id=my-connector-worker-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:315)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.lang.NullPointerException
          at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
          ... 11 more
      2020-03-26 16:31:28,121 ERROR  ||  WorkerSourceTask{id=my-connector-worker-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      

      In this scenario, https://github.com/apache/kafka/blob/2.1/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java#L61 appears to be to blame: the id field is not found in the schema, which triggers the NPE. In later releases of Connect, this transform instead throws an IllegalArgumentException stating "Unknown field: id".

      See the discussion at https://gitter.im/debezium/user?at=5e7cce6a9091af6d001ad36c
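
The difference between the two failure modes described above can be illustrated with a small stand-alone model. This is only a sketch and does not use the Kafka Connect API: the class `ExtractFieldSketch`, the `FieldDef` type, and both lookup methods are hypothetical names invented for illustration, and the heartbeat key containing only a `serverName` field is an assumption about the record shape, not something confirmed in this report.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Stand-alone model of a schema field lookup. It does NOT use the Kafka Connect API;
 * every name in this class is hypothetical and exists only for illustration.
 */
public class ExtractFieldSketch {

    /** Hypothetical stand-in for a schema field that carries its own type name. */
    static final class FieldDef {
        final String typeName;
        FieldDef(String typeName) { this.typeName = typeName; }
    }

    /** Unguarded lookup: a missing field yields null, and dereferencing it throws an NPE. */
    static String typeOfUnchecked(Map<String, FieldDef> keySchema, String fieldName) {
        FieldDef field = keySchema.get(fieldName); // null when the field is absent
        return field.typeName;                     // NullPointerException is thrown here
    }

    /** Guarded lookup: a missing field is reported with a descriptive exception. */
    static String typeOfChecked(Map<String, FieldDef> keySchema, String fieldName) {
        FieldDef field = keySchema.get(fieldName);
        if (field == null) {
            throw new IllegalArgumentException("Unknown field: " + fieldName);
        }
        return field.typeName;
    }

    public static void main(String[] args) {
        // Assumed shapes: a table key that has "id", and a heartbeat key that does not.
        Map<String, FieldDef> tableKey = new LinkedHashMap<>();
        tableKey.put("id", new FieldDef("int64"));
        Map<String, FieldDef> heartbeatKey = new LinkedHashMap<>();
        heartbeatKey.put("serverName", new FieldDef("string"));

        System.out.println("table key type: " + typeOfChecked(tableKey, "id"));
        try {
            typeOfUnchecked(heartbeatKey, "id");
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup: NullPointerException");
        }
        try {
            typeOfChecked(heartbeatKey, "id");
        } catch (IllegalArgumentException e) {
            System.out.println("guarded lookup: " + e.getMessage());
        }
    }
}
```

Under these assumptions, the guarded variant fails with a message that names the missing field, which is easier to diagnose than a bare NullPointerException in the task log. Either way the record is still rejected, so the transform would need to be kept away from records whose key lacks the configured field.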

          People

            Unassigned Unassigned
            ccranfor@redhat.com Chris Cranford