Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1896

Debezium "MongoDB New Document State Extraction": org.apache.kafka.connect.errors.DataException not a field name

    XMLWordPrintable

Details

    • Bug
    • Resolution: Not a Bug
    • Major
    • None
    • None
    • mongodb-connector
    • Hide

      Please see the attached source and sink configurations

      Show
      Please see the attached source and sink configurations

    Description

      Hi, I am working on getting changes from MongoDB to S3. I've been using
      Mongo->Debezium_Connector>Kafka>S3_Sink_Connector->S3. I wanted to use debezium suggested "MongoDB New Document State Extraction" https://debezium.io/documentation/reference/1.0/configuration/mongodb-event-flattening.html to get a valid JSON format on S3. The configuration I have below works for some of the documents but for a slightly nested JSON, I get the below error.

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
      	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
      	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      	at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:231)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:129)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:203)
      

      Attachments

        1. source-connector.json
          0.9 kB
        2. sync-connector.json
          4 kB
        3. source-connector-v1.json
          0.6 kB
        4. sync-connector-v1.json
          3 kB
        5. source-connector-v1.json
          0.6 kB
        6. sync-connector-v1.json
          3 kB
        7. sample_data.json
          8 kB

        Issue Links

          Activity

            People

              Unassigned Unassigned
              sabarigandhi Sabari Gandhi (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: