- Bug
- Resolution: Can't Do
- Major
- None
- 2.6.1.Final
- None
- False
- None
- False
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
I'm using Debezium 2.6.1 with the outbox event router. When I send an event with this payload:
[{"code":"mycode", "name":"myname"}]
Debezium crashes with the following error:
WARN || JSON expansion failed [io.debezium.transforms.outbox.EventRouterDelegate]
org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
    at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
    at io.debezium.transforms.outbox.JsonSchemaData.jsonNodeToStructInternal(JsonSchemaData.java:200)
    at io.debezium.transforms.outbox.JsonSchemaData.toConnectData(JsonSchemaData.java:196)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:173)
    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:28)
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
It seems that a JSON array payload is not supported when
"transforms.outbox.table.expand.json.payload": "true"
is set.
What Debezium connector do you use and what version?
Version 2.6.1, with io.debezium.connector.mysql.MySqlConnector.
What is the connector configuration?
{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "transforms.outbox.table.fields.additional.error.on.missing": "false", "predicates.IsOutboxTable.pattern": "outbox\\..*\\.event_outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "topic.creation.default.partitions": "10", "transforms.outbox.predicate": "IsOutboxTable", "transforms": "outbox", "retriable.restart.connector.wait.ms": "10000", "topic.creation.events.cleanup.policy": "delete", "topic.prefix": "outbox", "transforms.outbox.table.expand.json.payload": "true", "schema.history.internal.kafka.topic": "outbox.dbhistory", "transforms.outbox.route.topic.replacement": "event.${routedByValue}", "topic.creation.default.replication.factor": "1", "predicates.IsOutboxTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "topic.creation.events.include": "event\\..*", "errors.retry.timeout": "300000", "database.user": "debezium", "database.server.id": "5110", "topic.creation.default.cleanup.policy": "compact", "transforms.outbox.table.fields.additional.placement": "routing:header:x-routing,id:header:x-event-id,type:header:x-event-type,aggregatetype:header:x-aggr-type,aggregateid:header:x-aggr-id,created_by:header:x-auth-token", "schema.history.internal.kafka.bootstrap.servers": "127.0.0.1:9092", "event.processing.failure.handling.mode": "warn", "schema.history.internal.skip.unparseable.ddl": "true", "database.port": "3001", "topic.creation.events.replication.factor": "1", "topic.creation.groups": "events", "topic.creation.enable": "true", "key.converter.schemas.enable": "false", "predicates": "IsOutboxTable", "errors.max.retries": "-1", "database.hostname": "127.0.0.1", "database.password": "password", "name": "outbox", "value.converter.schemas.enable": "false", "table.include.list": ".*\\.event_outbox", "topic.creation.events.partitions": "20", "snapshot.mode": "no_data"}
What is the captured database version and mode of deployment?
MySQL 8.0.1
What behavior do you expect?
Parse the JSON array, or at least do not crash when a JSON parsing error is raised.
Do you see the same behaviour using the latest released Debezium version?
Yes.
Do you have the connector logs, ideally from start till finish?
2024-07-25 08:31:34,548 WARN || JSON expansion failed [io.debezium.transforms.outbox.EventRouterDelegate]
org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
    at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
    at io.debezium.transforms.outbox.JsonSchemaData.jsonNodeToStructInternal(JsonSchemaData.java:200)
    at io.debezium.transforms.outbox.JsonSchemaData.toConnectData(JsonSchemaData.java:196)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:173)
    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:28)
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-07-25 08:31:34,553 ERROR || WorkerSourceTask{id=ucmp-outbox-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:680)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:563)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:313)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more
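The "Caused by" part can be reproduced in isolation: once the record's value schema says ARRAY while the value itself is still the raw payload String, Kafka Connect's JsonConverter fails in exactly this way. A minimal sketch (the class name is made up; the converter is configured like the value converter above, with schemas.enable=false):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import java.util.Map;

public class InvalidTypeForArrayDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // Mirrors "value.converter.schemas.enable": "false" from the configuration above
        converter.configure(Map.of("schemas.enable", "false"), false);

        // The schema claims ARRAY, but the value is still the raw JSON String payload,
        // so fromConnectData throws
        // org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String
        Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
        converter.fromConnectData("event.example", arraySchema, "[{\"code\":\"mycode\", \"name\":\"myname\"}]");
    }
}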
How to reproduce the issue using our tutorial deployment?
Run any test case in the io.debezium.transforms.outbox.EventRouterTest class and set the record payload to a JSON array like the one shown above.
Implementation ideas (optional)
I found the error is thrown at io.debezium.transforms.outbox.JsonSchemaData.jsonNodeToStructInternal (JsonSchemaData.java:200 in the stack trace above).
Also, when this issue arises, the payloadSchema has already been altered (in this example it becomes an ARRAY schema type), so when AbstractWorkerSourceTask later processes the record, it throws a second exception (org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String), just like in the logs above.
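One possible direction, sketched as a standalone helper rather than Debezium's actual code (the class and method names below are hypothetical): check the parsed payload before attempting the Struct-based expansion, and leave the payload unexpanded when the top-level node is not a JSON object.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper illustrating the guard; not Debezium's actual API.
final class SafeJsonExpansion {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns true only for payloads the Struct-based expansion can represent,
    // i.e. top-level JSON objects; arrays and scalars would stay unexpanded.
    static boolean isExpandable(String payload) {
        try {
            JsonNode node = MAPPER.readTree(payload);
            return node != null && node.isObject();
        } catch (Exception e) {
            return false; // unparseable JSON: skip expansion instead of crashing
        }
    }
}

With a guard like this in place, the payloadSchema would never be switched to ARRAY, so the downstream JsonConverter failure shown above could not occur either.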