Debezium / DBZ-8572

MongoDataConverter does not recognize nested empty array


      What Debezium connector do you use and what version?

      io.debezium.connector.mongodb.MongoDbConnector, 2.1.4.Final

      What is the connector configuration?

       

      {
          "collection.include.list": "<replace_me>",
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "errors.deadletterqueue.context.headers.enable": "true",
          "errors.deadletterqueue.topic.name": "<replace_me>",
          "errors.log.enable": "true",
          "errors.log.include.messages": "true",
          "errors.tolerance": "all",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "key.converter.schemas.enable": "false",
          "mongodb.hosts": "<replace_me>",
          "mongodb.password": "<replace_me>",
          "mongodb.ssl.enabled": "true",
          "mongodb.user": "<replace_me>",
          "name": "<replace_me>",
          "tasks.max": "1",
          "topic.prefix": "<replace_me>",
          "transforms": "outbox",
          "transforms.outbox.collection.expand.json.payload": "true",
          "transforms.outbox.collection.field.event.key": "aggregateRootId",
          "transforms.outbox.route.by.field": "aggregateRootType",
          "transforms.outbox.route.topic.replacement": "<replace_me>.${routedByValue}",
          "transforms.outbox.type": "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false"
      }

      What is the captured database version and mode of deployment?

      MongoDB 6.0, provided by MongoDB Atlas

      What behavior do you expect?

      The event with the following payload is sent to Kafka:

       

      {
        "parentarray": [
          {
            "stringfield": "stringvalue",
            "nestedarray": []
          }
        ]
      } 
      

       

      What behavior do you see?

      The connector logs an error and skips the record, so the event does not appear in Kafka.

      Do you see the same behaviour using the latest released Debezium version?

      3.0.6.Final

      Do you have the connector logs, ideally from start till finish?

       

      [2025-01-13 14:16:09,786] WARN [array|task-0] Failed to expand after field: nestedarray is not a valid field name (io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter:67)
      org.apache.kafka.connect.errors.DataException: nestedarray is not a valid field name
          at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:214)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.buildNewAfterStruct(MongoEventRouter.java:195)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.expandAfterField(MongoEventRouter.java:124)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.lambda$apply$0(MongoEventRouter.java:64)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:113)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
          at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      [2025-01-13 14:16:09,788] ERROR [array|task-0] Error encountered in task array-0. Executing stage 'TRANSFORMATION' with class 'io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter', where source record is = SourceRecord{sourcePartition={rs=rs0, server_id=dev.private.app.fct}, sourceOffset={sec=1736777769, ord=1, transaction_id=null, resume_token=8267852029000000012B042C0100296E5A1004ABF65439FDCB46AD9CB53C276DBA0096463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F6964006467852029EED003C8505A256D000004}} ConnectRecord{topic='dev.private.app.fct.app.eventStore', kafkaPartition=null, key=Struct{id={"$oid": "67852029eed003c8505a256d"}}, keySchema=Schema{dev.private.app.fct.app.eventStore.Key:STRUCT}, value=Struct{after={"_id": {"$oid": "67852029eed003c8505a256d"},"aggregateRootId": "4991ece5-b06a-40d9-996e-a76110db8c51","aggregateRootType": "type","version": 0,"payload": {"parentarray": [{"stringfield": "stringvalue","nestedarray": []}]}},source=Struct{version=2.1.4.Final,connector=mongodb,name=dev.private.app.fct,ts_ms=1736777769000,db=app,rs=rs0,collection=eventStore,ord=1},op=c,ts_ms=1736777769545}, valueSchema=Schema{dev.private.app.fct.app.eventStore.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
      org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [Read Outbox Event], found: java.lang.String
          at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:120)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
          at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
       

       

      How to reproduce the issue using our tutorial deployment?

      A similar issue was closed earlier (https://issues.redhat.com/browse/DBZ-5434), but its fix does not cover this scenario.

      The failure occurs when the parent array contains an object that has both a string field and an empty array.

      The issue can be reproduced with the existing ExtractNewDocumentStateTestIT#shouldSupportNestedArrays test by updating its input accordingly (see attachment).
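      The reproduction condition above can be modeled with a small, self-contained Java sketch. It is a hypothetical simplification, not Debezium's MongoDataConverter: inferSchema and convert are illustrative names, and the assumption that the schema step drops empty-array fields inside array elements is inferred from the stack trace, not confirmed from the source. What the trace does show is that Struct.put rejects nestedarray because that field is absent from the element struct's schema. The sketch reproduces that mismatch on the "parentarray" element from this report, and shows that registering the field with a placeholder type avoids it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of "infer a schema, then convert the value"
 * for one document. It is NOT Debezium's MongoDataConverter and does not recurse;
 * it only illustrates how a schema that omits a field leads to
 * "<field> is not a valid field name".
 */
public class NestedEmptyArrayModel {

    /** Stand-in for a Connect struct schema: field name -> type label. */
    static Map<String, String> inferSchema(Map<String, Object> doc, boolean skipEmptyArrays) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object v = e.getValue();
            if (v instanceof List) {
                List<?> list = (List<?>) v;
                if (list.isEmpty()) {
                    if (skipEmptyArrays) {
                        // ASSUMED buggy path: element type unknown, so the field is dropped
                        continue;
                    }
                    // Hypothetical fix: register the field with a placeholder element type
                    schema.put(e.getKey(), "array<string>");
                } else {
                    schema.put(e.getKey(), "array");
                }
            } else {
                schema.put(e.getKey(), v == null ? "null" : v.getClass().getSimpleName());
            }
        }
        return schema;
    }

    /** Mimics Struct.put: writing a field that is absent from the schema is an error. */
    static Map<String, Object> convert(Map<String, Object> doc, Map<String, String> schema) {
        Map<String, Object> struct = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!schema.containsKey(e.getKey())) {
                throw new IllegalArgumentException(e.getKey() + " is not a valid field name");
            }
            struct.put(e.getKey(), e.getValue());
        }
        return struct;
    }

    public static void main(String[] args) {
        // The element of "parentarray" from the expected payload in this report
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("stringfield", "stringvalue");
        element.put("nestedarray", List.of());

        try {
            convert(element, inferSchema(element, true));
        } catch (IllegalArgumentException ex) {
            // prints "buggy: nestedarray is not a valid field name"
            System.out.println("buggy: " + ex.getMessage());
        }
        // prints "fixed: {stringfield=stringvalue, nestedarray=[]}"
        System.out.println("fixed: " + convert(element, inferSchema(element, false)));
    }
}
```

      Whether an actual fix in Debezium would use a placeholder element type (for example an optional array of strings) or some other strategy is an assumption of this sketch.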

       

      Assignee: Unassigned
      Reporter: Sergey Seroshtan
      Votes: 0
      Watchers: 3