Debezium / DBZ-6328

event.processing.failure.handling.mode = warn does not skip data


Details

    • Priority: Important

    Description

       

      Bug report

      I have configured the following setting on the Kafka connector to skip unparsable events, but it does not appear to work: an error occurred during JSON expansion and the Kafka task failed.

      event.processing.failure.handling.mode = warn

      What Debezium connector do you use and what version?

      Debezium Postgres connector, version 1.9.5.Final

      What is the connector configuration?

      config:
          database.dbname: main
          database.hostname: HOST
          database.password: PASSWORD
          database.port: 5432
          database.server.name: server-0
          database.user: main
          decimal.handling.mode: double
          heartbeat.action.query: INSERT INTO kafka_connect_heartbeat (id, last_heartbeat_time)
            VALUES (1, NOW() at time zone 'utc') ON CONFLICT(id) DO UPDATE SET last_heartbeat_time=EXCLUDED.last_heartbeat_time;
          heartbeat.interval.ms: "10000"
          heartbeat.topics.prefix: debezium-heartbeat
          heartbeat.writeback.enabled: "true"
          heartbeat.writeback.table: public.kafka_connect_heartbeat
          plugin.name: pgoutput
          publication.autocreate.mode: filtered
          publication.name: uno_main_0
          slot.name: uno_main_0
          table.include.list: public.outbox_wal
          table.whitelist: public.outbox_wal
          tasksMax: 1
          transforms: sample
          transforms.sample.route.topic.replacement: ${routedByValue}
          transforms.sample.table.expand.json.payload: "true"
          transforms.sample.table.fields.additional.placement: type:header:eventType,id:header:messageId
          transforms.sample.type: io.debezium.transforms.outbox.EventRouter
          event.processing.failure.handling.mode: warn
          

      What is the captured database version and mode of deployment?

      DB: AWS RDS for PostgreSQL 12.11 (managed service)

      What behaviour do you see?

      • In case of an error during JSON expansion, the Kafka task fails and therefore stops processing further messages.
      • The error occurred when scientific notation was used in the JSON payload.
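      The failure mode can be illustrated outside the connector. A minimal sketch (Python's stdlib `json` standing in for the Jackson parser used inside Debezium; the field name is taken from the log further down): a value written in scientific notation always parses as a floating-point number, so it cannot satisfy a schema that was inferred as INT32 from an earlier integer-valued payload.

```python
import json

# The connector infers the Connect schema for "transferQuantity" from an
# earlier record where the value was a plain integer (INT32). A later
# record written in scientific notation parses to a floating-point value,
# which no longer fits that inferred schema. Python's stdlib json parser
# shows the same type split:
first = json.loads('{"transferQuantity": 500}')
later = json.loads('{"transferQuantity": 5e2}')

print(type(first["transferQuantity"]).__name__)  # int
print(type(later["transferQuantity"]).__name__)  # float
```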

      Do you see the same behaviour using the latest released Debezium version?

      Not tested on the latest version.

      Do you have the connector logs, ideally from start till finish?

      2023-04-11 10:41:06,151 WARN [task-0] JSON expansion failed (io.debezium.transforms.outbox.EventRouterDelegate) [task-thread-main-0-0]
      org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type INT32: class java.lang.Double for field: "transferQuantity"
          at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:43)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
          at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStruct(StructBuilderUtil.java:36)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:164)
          at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:354)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      INFO [task-0] WorkerSourceTask{id=main-0-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-main-0-0]
      ERROR [task-0] WorkerSourceTask{id=main-0-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-main-0-0]
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class java.lang.String
          at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:671)
          at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:554)
          at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:304)
          at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
          ... 11 more 
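
      Reading the trace: the earlier WARN line shows that event.processing.failure.handling.mode did fire for the expansion failure inside EventRouterDelegate, but the fatal DataException is thrown later, in Kafka Connect's converter stage (JsonConverter.fromConnectData via convertTransformedRecord), which Debezium's setting does not cover. A sketch of a possible workaround, assuming Kafka Connect's own error-tolerance properties (KIP-298) are acceptable for this pipeline, since "Tolerance exceeded in error handler" indicates the default errors.tolerance of none:

```yaml
          # Kafka Connect error-handling properties (KIP-298). These govern the
          # transform/converter stages where the task actually died, unlike
          # Debezium's event.processing.failure.handling.mode:
          errors.tolerance: all
          errors.log.enable: "true"
          errors.log.include.messages: "true"
```

      Note that errors.tolerance: all silently drops every failing record, so enabling the error log is advisable.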

      How to reproduce the issue using our tutorial deployment?

      <Your answer>



          People

            Assignee: Unassigned
            Reporter: Hemant Pardeshi (hemantpardeshi01)