-
Bug
-
Resolution: Done
-
Major
-
2.3.0.Final
-
None
-
False
-
-
False
-
Important
Version: 2.3.0.Final
We use debezium source mysql data to kafka and use kafka-connect-jdbc with ExtractNewRecord sinkg data to other system.
When we add field `started_at` to MySQL table, error occured.
Log:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:542)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: started_at is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
at io.debezium.transforms.ExtractNewRecordState.addFields(ExtractNewRecordState.java:294)
at io.debezium.transforms.ExtractNewRecordState.apply(ExtractNewRecordState.java:231)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 15 more
I think we should use schema.name+shcema.version+operation as cache key, in ExtractNewRecordState.buildCacheKey
- links to
-
RHEA-2023:120698
Red Hat build of Debezium 2.3.4 release