-
Bug
-
Resolution: Done
-
Major
-
2.3.0.Final
-
None
-
False
-
None
-
False
-
Important
Version: 2.3.0.Final
We use debezium source mysql data to kafka and use kafka-connect-jdbc with ExtractNewRecord sinkg data to other system.
When we add field `started_at` to MySQL table, error occured.
Log:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156) at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:542) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.DataException: started_at is not a valid field name at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) at org.apache.kafka.connect.data.Struct.put(Struct.java:202) at io.debezium.transforms.ExtractNewRecordState.addFields(ExtractNewRecordState.java:294) at io.debezium.transforms.ExtractNewRecordState.apply(ExtractNewRecordState.java:231) at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214) ... 15 more
I think we should use schema.name+shcema.version+operation as cache key, in ExtractNewRecordState.buildCacheKey
- links to
-
RHEA-2023:120698 Red Hat build of Debezium 2.3.4 release