Debezium / DBZ-1896

Debezium "MongoDB New Document State Extraction": org.apache.kafka.connect.errors.DataException not a field name

    Details

    • Steps to Reproduce:
      Please see the attached source and sink configurations

      Description

      Hi, I am working on getting changes from MongoDB to S3. The pipeline is
      MongoDB -> Debezium MongoDB Connector -> Kafka -> S3 Sink Connector -> S3. I wanted to use the Debezium-suggested "MongoDB New Document State Extraction" transform (https://debezium.io/documentation/reference/1.0/configuration/mongodb-event-flattening.html) to get a valid JSON format on S3. The configuration I have (attached) works for some of the documents, but for a slightly nested JSON document I get the error below.

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
      	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
      	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      	at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:231)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:129)
      	at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:203)
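
      For reference, this is roughly how the flattening transform is attached to the sink connector (a simplified sketch in properties form, assuming the Confluent S3 sink connector; it is not the attached sync-connector.json, and the connector name, topic, bucket, and region values are placeholders):

      # Sketch of an S3 sink connector with the Debezium MongoDB event-flattening SMT.
      # Placeholder values only; see the attached sync-connector.json for the real config.
      name=s3-sink
      connector.class=io.confluent.connect.s3.S3SinkConnector
      topics=dbserver1.mydb.mycollection
      s3.bucket.name=my-bucket
      s3.region=us-east-1
      storage.class=io.confluent.connect.s3.storage.S3Storage
      format.class=io.confluent.connect.s3.format.json.JsonFormat
      flush.size=1000
      # MongoDB new document state extraction (runs in the sink task's transformation
      # chain, which is where the stack trace above originates)
      transforms=unwrap
      transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

      The same transform lines can instead be applied on the Debezium source connector; per the stack trace, the failure happens while MongoDataConverter converts the nested document's fields into the Connect Struct.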
      

          Attachments

          1. sample_data.json (8 kB)
          2. source-connector.json (0.9 kB)
          3. source-connector-v1.json (0.6 kB)
          4. source-connector-v1.json (0.6 kB)
          5. sync-connector.json (4 kB)
          6. sync-connector-v1.json (3 kB)
          7. sync-connector-v1.json (3 kB)

                People

                • Assignee: Unassigned
                • Reporter: Sabari Gandhi (sabarigandhi)
                • Votes: 0
                • Watchers: 2
