DBZ-6901: ExtractNewRecordState's schema cache is not updated on arrival of a DDL change event



      Version: 2.3.0.Final

      We use Debezium to stream MySQL data into Kafka, and the kafka-connect-jdbc sink with the ExtractNewRecordState SMT to deliver the data to another system.
      When we added a field `started_at` to a MySQL table, the following error occurred.
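      For reference, the sink-side SMT is configured roughly as follows (property names are the standard Kafka Connect/Debezium ones; the exact add.fields values in our setup are assumed here):

      transforms=unwrap
      transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      # add.fields copies change-event metadata into the flattened record;
      # this is the ExtractNewRecordState.addFields path that fails in the log below
      transforms.unwrap.add.fields=op,source.ts_ms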
      Log:

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
              at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:542)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.DataException: started_at is not a valid field name
              at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
              at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
              at io.debezium.transforms.ExtractNewRecordState.addFields(ExtractNewRecordState.java:294)
              at io.debezium.transforms.ExtractNewRecordState.apply(ExtractNewRecordState.java:231)
              at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
              ... 15 more
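      Per the trace, addFields copies the record's fields into a Struct built from a previously cached schema; after the DDL change that cached schema lacks started_at, so Struct.put rejects it. A minimal, standalone illustration of the mechanism (not Debezium code; the schema name and fields are invented):

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;
      import org.apache.kafka.connect.data.Struct;

      public class StaleSchemaDemo {
          public static void main(String[] args) {
              // Schema as cached before the ALTER TABLE: no started_at field
              Schema stale = SchemaBuilder.struct().name("server1.inventory.orders.Value")
                      .field("id", Schema.INT64_SCHEMA)
                      .build();
              Struct value = new Struct(stale);
              value.put("id", 1L);
              // Throws DataException: started_at is not a valid field name,
              // exactly as in the stack trace above
              value.put("started_at", 1234L);
          }
      }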
      

      I think we should use schema.name + schema.version + operation as the cache key in ExtractNewRecordState.buildCacheKey.
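      A minimal sketch of that idea, assuming a string cache key (an illustration, not the actual Debezium implementation): with the schema version in the key, a post-DDL record, whose value schema carries a bumped version, misses the cache and forces the updated schema to be rebuilt instead of reusing the stale entry:

      import io.debezium.data.Envelope;
      import org.apache.kafka.connect.data.Schema;

      final class CacheKeySketch {
          // Hypothetical replacement for ExtractNewRecordState.buildCacheKey:
          // name + version + operation, so a DDL-driven schema version bump
          // yields a new key and the cached updated schema is recomputed
          static String buildCacheKey(Schema valueSchema, Envelope.Operation op) {
              return valueSchema.name() + "|" + valueSchema.version() + "|" + op.code();
          }
      }

      Keying on the schema name alone keeps serving the pre-DDL entry, which is exactly the stale-cache behavior reported above.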

            Assignee: Harvey Yue
            Reporter: Wang j2gg0s
            Votes: 0
            Watchers: 5
