Debezium / DBZ-5434

Translation from MongoDB document to Kafka Connect schema fails when nested arrays contain no elements


      What Debezium connector do you use and what version?

      Debezium MongoDb Connector, 1.9.5.Final

      What is the connector configuration?

      key.converter.schemas.enable: false
      value.converter.schemas.enable: false
      offset.flush.interval.ms: 5000
      mongodb.hosts: localhost:27017
      mongodb.name: xxxx
      mongodb.user: xxxx
      mongodb.password: xxxx
      mongodb.ssl.enabled: false
      connector.class: io.debezium.connector.mongodb.MongoDbConnector
      database.include.list: xxxx
      collection.include.list: xxxx
      connect.backoff.initial.delay.ms: 1000
      connect.backoff.max.delay.ms: 10000
      connect.max.attempts: 3
      snapshot.fetch.size: 2000
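
For the embedded engine that appears in the stack traces, the configuration above could be expressed as `java.util.Properties`. This is only a sketch, not the reporter's code: the class and method names are hypothetical, the `xxxx` placeholders are kept as posted, and the two `transforms.*` entries (including the alias `unwrap`) are an assumption. They are not part of the posted configuration, but the stack traces pass through `ExtractNewDocumentState`, so some such transform was presumably configured. Engine settings such as `name` and offset storage are left out because the report does not list them.

```java
import java.util.Properties;

public class ConnectorConfigSketch {

    // Builds the posted connector configuration; values are copied from the report.
    static Properties connectorProperties() {
        Properties props = new Properties();
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        props.setProperty("offset.flush.interval.ms", "5000");
        props.setProperty("mongodb.hosts", "localhost:27017");
        props.setProperty("mongodb.name", "xxxx");
        props.setProperty("mongodb.user", "xxxx");
        props.setProperty("mongodb.password", "xxxx");
        props.setProperty("mongodb.ssl.enabled", "false");
        props.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
        props.setProperty("database.include.list", "xxxx");
        props.setProperty("collection.include.list", "xxxx");
        props.setProperty("connect.backoff.initial.delay.ms", "1000");
        props.setProperty("connect.backoff.max.delay.ms", "10000");
        props.setProperty("connect.max.attempts", "3");
        props.setProperty("snapshot.fetch.size", "2000");

        // ASSUMPTION: not in the posted configuration. Added only because the
        // stack traces go through ExtractNewDocumentState; "unwrap" is a made-up alias.
        props.setProperty("transforms", "unwrap");
        props.setProperty("transforms.unwrap.type",
                "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        connectorProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```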

      What is the captured database version and mode of deployment?

      mongo:4.2

      What behaviour do you expect?

      The MongoDB change event is converted correctly and reaches the handleEvent method of the Debezium consumer.

      What behaviour do you see?

      Converting the MongoDB document to a Kafka Connect schema fails with a DataException.

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      For this document:
      {
          "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"),
          "planExecutionId" : "MJXINSMZ",
          "f1" : {
              "f2" : [
                  {
                      "f3" : {}
                  },
                  {
                      "f3" : {"f5": 5}
                  }
              ]
          }
      }

      I get the following error:
      
      ERROR io.debezium.embedded.EmbeddedEngine - Stopping connector after error in the application's handler method: f5 is not a valid field name  
      org.apache.kafka.connect.errors.DataException: f5 is not a valid field name
          at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:214)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:324)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:264)
          at io.debezium.embedded.Transformations.transform(Transformations.java:74)
          at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
          at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
          at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
          at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
          at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:814)
          at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      2022-07-29 11:24:57,094 [debezium-controller-thread-0] INFO  io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine   
      
      For this document:
      {
          "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"),
          "planExecutionId" : "MJXINSMZ",
          "f1" : {
              "f2" : [
                  {
                      "f3" : []
                  },
                  {
                      "f3" : [{"f5": 5}]
                  }
              ]
          }
      }

      I get the following error:
      ERROR io.debezium.embedded.EmbeddedEngine - Stopping connector after error in the application's handler method: Not a struct schema: Schema{STRING}  
      org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}
          at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:263)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:324)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:264)
          at io.debezium.embedded.Transformations.transform(Transformations.java:74)
          at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
          at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
          at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
          at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
          at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
          at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:814)
          at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
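
One reading of the first trace is that the struct schema for `f3` is derived from the first array element, where `f3` is an empty document, so the later `f5` has no field to land in. The report does not confirm this; the sketch below only models that suspected mechanism with plain Java collections. `Map` stands in for a BSON document, a `Set` of field names stands in for a Kafka Connect struct schema, and every class and method name is hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ArraySchemaSketch {

    // Field names of a struct schema derived from the first array element only.
    static Set<String> fieldsFromFirstElement(List<Map<String, Object>> elements) {
        return elements.isEmpty() ? Set.of() : new LinkedHashSet<>(elements.get(0).keySet());
    }

    // Field names derived from the union of all array elements.
    static Set<String> fieldsFromAllElements(List<Map<String, Object>> elements) {
        Set<String> fields = new LinkedHashSet<>();
        for (Map<String, Object> element : elements) {
            fields.addAll(element.keySet());
        }
        return fields;
    }

    // Mimics a strict struct: rejects any field the schema does not declare.
    static void validate(Set<String> schemaFields, List<Map<String, Object>> elements) {
        for (Map<String, Object> element : elements) {
            for (String name : element.keySet()) {
                if (!schemaFields.contains(name)) {
                    throw new IllegalArgumentException(name + " is not a valid field name");
                }
            }
        }
    }

    public static void main(String[] args) {
        // The two "f3" values found in the elements of "f2" in the first document.
        List<Map<String, Object>> f3Values =
                List.of(Map.<String, Object>of(), Map.<String, Object>of("f5", 5));

        try {
            validate(fieldsFromFirstElement(f3Values), f3Values);
        } catch (IllegalArgumentException e) {
            System.out.println("first-element schema: " + e.getMessage());
        }

        validate(fieldsFromAllElements(f3Values), f3Values);
        System.out.println("merged schema: accepted");
    }
}
```

With the first-element schema the check rejects `f5`, which matches the message in the first trace; with the merged schema both elements pass. Whether the converter should merge schemas this way is a design question for the maintainers, not something this sketch settles.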

       

      How to reproduce the issue using our tutorial deployment?

      Use the following MongoDB documents:
      1. {
          "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"),
          "f1" : {
              "f2" : [
                  {
                      "f3" : {}
                  },
                  {
                      "f3" : {"f5": 5}
                  }
              ]
          }
      }

      2. {
          "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"),
          "f1" : {
              "f2" : [
                  {
                      "f3" : []
                  },
                  {
                      "f3" : [{"f5": 5}]
                  }
              ]
          }
      }
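
Both documents share one shape: inside an array, the same field (`f3`) is an empty document or array in one element and a populated one in a sibling element. To find other affected documents, a check along the following lines might help. It is a hypothetical helper, not part of Debezium; plain `Map` and `List` values stand in for BSON documents and arrays, and the assumption that this shape is what triggers the failure comes only from the two examples above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TriggerShapeSketch {

    static boolean isContainer(Object value) {
        return value instanceof Map || value instanceof List;
    }

    static boolean isEmptyContainer(Object value) {
        return (value instanceof Map && ((Map<?, ?>) value).isEmpty())
                || (value instanceof List && ((List<?>) value).isEmpty());
    }

    // True if some array holds sibling documents in which the same field is an
    // empty container in one element and a non-empty container in another.
    static boolean hasTriggerShape(Object value) {
        if (value instanceof Map) {
            for (Object child : ((Map<?, ?>) value).values()) {
                if (hasTriggerShape(child)) {
                    return true;
                }
            }
        } else if (value instanceof List) {
            Set<Object> emptyKeys = new HashSet<>();
            Set<Object> nonEmptyKeys = new HashSet<>();
            for (Object element : (List<?>) value) {
                if (element instanceof Map) {
                    for (Map.Entry<?, ?> entry : ((Map<?, ?>) element).entrySet()) {
                        if (isEmptyContainer(entry.getValue())) {
                            emptyKeys.add(entry.getKey());
                        } else if (isContainer(entry.getValue())) {
                            nonEmptyKeys.add(entry.getKey());
                        }
                    }
                }
            }
            emptyKeys.retainAll(nonEmptyKeys); // keys seen both empty and populated
            if (!emptyKeys.isEmpty()) {
                return true;
            }
            for (Object element : (List<?>) value) {
                if (hasTriggerShape(element)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Document 1 from the report, without "_id".
        Object emptyDoc = Map.of("f3", Map.of());
        Object fullDoc = Map.of("f3", Map.of("f5", 5));
        Object doc1 = Map.of("f1", Map.of("f2", List.of(emptyDoc, fullDoc)));

        // Document 2 from the report, without "_id".
        Object emptyArr = Map.of("f3", List.of());
        Object fullArr = Map.of("f3", List.of(Map.of("f5", 5)));
        Object doc2 = Map.of("f1", Map.of("f2", List.of(emptyArr, fullArr)));

        System.out.println("document 1 affected: " + hasTriggerShape(doc1));
        System.out.println("document 2 affected: " + hasTriggerShape(doc2));
    }
}
```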

      Implementation Ideas (Optional)

       

              Assignee: Unassigned
              Reporter: Shalini Agrawal (shalini.agrawal, Inactive)