Debezium / DBZ-8096

DataException: Not a struct schema: Schema{ARRAY} when sending a JSON array payload with the outbox transform


    • Type: Bug
    • Resolution: Can't Do
    • Priority: Major
    • Affects Version: 2.6.1.Final
    • Component: outbox

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      I'm using Debezium 2.6.1 with the outbox event router. When I send an event whose payload is

      [{"code":"mycode", "name":"myname"}]

      Debezium crashes with the following error:

      WARN   ||  JSON expansion failed   [io.debezium.transforms.outbox.EventRouterDelegate]
      org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
          at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
          at io.debezium.transforms.outbox.JsonSchemaData.jsonNodeToStructInternal(JsonSchemaData.java:200)
          at io.debezium.transforms.outbox.JsonSchemaData.toConnectData(JsonSchemaData.java:196)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:173)
          at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:28)
          at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      

      It seems that a top-level JSON array is not supported when

      "transforms.outbox.table.expand.json.payload": "true"

      is set; a minimal sketch of the failing code path follows below.


      What Debezium connector do you use and what version?

      Debezium 2.6.1 with io.debezium.connector.mysql.MySqlConnector

      What is the connector configuration?

      {    "connector.class": "io.debezium.connector.mysql.MySqlConnector",    "transforms.outbox.table.fields.additional.error.on.missing": "false",    "predicates.IsOutboxTable.pattern": "outbox\\..*\\.event_outbox",    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",    "topic.creation.default.partitions": "10",    "transforms.outbox.predicate": "IsOutboxTable",    "transforms": "outbox",    "retriable.restart.connector.wait.ms": "10000",    "topic.creation.events.cleanup.policy": "delete",    "topic.prefix": "outbox",    "transforms.outbox.table.expand.json.payload": "true",    "schema.history.internal.kafka.topic": "outbox.dbhistory",    "transforms.outbox.route.topic.replacement": "event.${routedByValue}",    "topic.creation.default.replication.factor": "1",    "predicates.IsOutboxTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",    "value.converter": "org.apache.kafka.connect.json.JsonConverter",    "key.converter": "org.apache.kafka.connect.json.JsonConverter",    "topic.creation.events.include": "event\\..*",    "errors.retry.timeout": "300000",    "database.user": "debezium",    "database.server.id": "5110",    "topic.creation.default.cleanup.policy": "compact",    "transforms.outbox.table.fields.additional.placement": "routing:header:x-routing,id:header:x-event-id,type:header:x-event-type,aggregatetype:header:x-aggr-type,aggregateid:header:x-aggr-id,created_by:header:x-auth-token",    "schema.history.internal.kafka.bootstrap.servers": "127.0.0.1:9092",    "event.processing.failure.handling.mode": "warn",    "schema.history.internal.skip.unparseable.ddl": "true",    "database.port": "3001",    "topic.creation.events.replication.factor": "1",    "topic.creation.groups": "events",    "topic.creation.enable": "true",    "key.converter.schemas.enable": "false",    "predicates": "IsOutboxTable",    "errors.max.retries": "-1",    "database.hostname": "127.0.0.1",    "database.password": "password",    "name": "outbox",    "value.converter.schemas.enable": "false",    "table.include.list": ".*\\.event_outbox",    "topic.creation.events.partitions": "20",    "snapshot.mode": "no_data"} 

      What is the captured database version and mode of deployment?

      MySQL 8.0.1

      What behavior do you expect?

      Parse the JSON array, or at least do not crash the task when a JSON parsing/expansion error is raised. (A possible workaround is sketched below.)
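      As a stopgap, one workaround (my own assumption, not a documented outbox-router feature; the field name "items" is arbitrary) is to wrap the array in a single-field JSON object, so that the top-level node is an object the expansion can map to a STRUCT schema:

      {"items": [{"code": "mycode", "name": "myname"}]}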

      Do you see the same behaviour using the latest released Debezium version?

      yes

      Do you have the connector logs, ideally from start till finish?

      2024-07-25 08:31:34,548 WARN   ||  JSON expansion failed   [io.debezium.transforms.outbox.EventRouterDelegate]
      org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
          [stack trace identical to the one shown above]
      2024-07-25 08:31:34,553 ERROR  ||  WorkerSourceTask{id=ucmp-outbox-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String
          at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:680)
          at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:563)
          at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:313)
          at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          ... 13 more 
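      This second exception can be reproduced in isolation; a minimal sketch (class name hypothetical, assuming connect-api and connect-json on the classpath) of what happens when the record's schema says ARRAY but the value is still the raw payload string:

      import java.util.Map;

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;
      import org.apache.kafka.connect.json.JsonConverter;

      public class SchemaValueMismatchRepro {
          public static void main(String[] args) {
              JsonConverter converter = new JsonConverter();
              // Same converter settings as the connector config above.
              converter.configure(Map.of("schemas.enable", "false"), false);

              // The failed expansion left an ARRAY schema behind ...
              Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();

              // ... but the value is still the unexpanded payload string, so this throws
              // org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String
              converter.fromConnectData("event.test", arraySchema, "[{\"code\":\"mycode\", \"name\":\"myname\"}]");
          }
      }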

      How to reproduce the issue using our tutorial deployment?

      Run any test case in the io.debezium.transforms.outbox.EventRouterTest class and set the record payload to the JSON array shown above, e.g. [{"code":"mycode", "name":"myname"}].


      Implementation ideas (optional)

      I found the error is thrown from io.debezium.transforms.outbox.JsonSchemaData.jsonNodeToStructInternal (reached via toConnectData; see the stack trace above), which assumes the top-level JSON node is an object and builds a Connect Struct from it.

      Additionally, when this issue arises, the payloadSchema has already been altered (in this example it became an ARRAY schema type), so when AbstractWorkerSourceTask later converts the record it throws a second exception (org.apache.kafka.connect.errors.DataException: Invalid type for ARRAY: class java.lang.String), just like the logs above. A guard along the following lines could avoid both failures.
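      A minimal sketch of that guard (the class and method names are hypothetical, not Debezium's actual API): attempt Struct expansion only when the payload parses to a JSON object, and otherwise keep the original string payload and its string schema:

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;

      // Hypothetical helper illustrating the proposed guard; not Debezium code.
      public class PayloadExpansionGuard {

          private static final ObjectMapper MAPPER = new ObjectMapper();

          // True only when the payload parses to a JSON object, the only shape
          // that can be mapped onto a Connect STRUCT schema. Arrays, scalars,
          // and unparseable strings keep the plain string payload (and its
          // original string schema) instead of killing the task.
          public static boolean isExpandable(String payload) {
              try {
                  JsonNode node = MAPPER.readTree(payload);
                  return node != null && node.isObject();
              } catch (Exception e) {
                  return false;
              }
          }
      }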

              Assignee: Unassigned
              Reporter: daz zhang