DBZ-6774: MongoDB New Document State Extraction: nonexistent field for add.headers


      Bug report

      What Debezium connector do you use and what version?

      Debezium MongoDB connector: 2.3.0.Final

      What is the connector configuration?

      "transforms": "unwrap"
      "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
      "transforms.unwrap.drop.tombstones": true
      "transforms.unwrap.delete.handling.mode": "none"
      "transforms.unwrap.add.headers.prefix": ""
      "transforms.unwrap.add.headers": "nonexistentField"

      What is the captured database version and mode of deployment?

      MongoDB Atlas: 4.4.19

      What behaviour do you expect?

      According to the MongoDB New Document State Extraction documentation for add.headers:
      "If you specify a field that is not in the change event original message, the SMT does not add the field to the header."

      What behaviour do you see?

      Based on the documentation, it is expected that a nonexistent field will simply be skipped (not added to the header), but instead the connector failed with the error:

      "Caused by: org.apache.kafka.connect.errors.DataException: nonexistentField is not a valid field name"

      It would be better to skip such fields, as the documentation describes, because the set of fields can vary from one JSON document to another, and also within the metadata "source" field (for instance, "lsid" and "txnNumber" may be present or absent); a schema-aware lookup that tolerates missing fields is sketched below.
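
      For illustration, the documented skip behavior corresponds to a schema-aware lookup like the following (a minimal sketch of the expected semantics, not the actual Debezium patch; SafeFieldLookup and getOrNull are hypothetical names):

          import org.apache.kafka.connect.data.Field;
          import org.apache.kafka.connect.data.Schema;
          import org.apache.kafka.connect.data.SchemaBuilder;
          import org.apache.kafka.connect.data.Struct;

          public class SafeFieldLookup {

              // Return the value of fieldName, or null when the value's schema has no
              // such field, instead of letting Struct.get(String) throw a DataException
              // as seen in FieldReference.getValue.
              static Object getOrNull(Struct struct, String fieldName) {
                  Field field = struct.schema().field(fieldName);
                  return field == null ? null : struct.get(field);
              }

              public static void main(String[] args) {
                  Schema schema = SchemaBuilder.struct()
                          .field("lsid", Schema.OPTIONAL_STRING_SCHEMA)
                          .build();
                  Struct source = new Struct(schema).put("lsid", "abc");

                  System.out.println(getOrNull(source, "lsid"));             // abc
                  System.out.println(getOrNull(source, "nonexistentField")); // null, no exception
              }
          }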

      Do you see the same behaviour using the latest released Debezium version?

      Verified only with Debezium MongoDB connector 2.3.0.Final.

      Do you have the connector logs, ideally from start till finish?

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:391)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: org.apache.kafka.connect.errors.DataException: nonexistentField is not a valid field name
          at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
          at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState$FieldReference.getValue(ExtractNewDocumentState.java:521)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.makeHeaders(ExtractNewDocumentState.java:355)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:199)
          at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          ... 13 more 

      How to reproduce the issue using our tutorial deployment?

      Using docker-compose-mongodb.yaml from the Debezium tutorial; a reproduction sketch follows.
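
      A minimal sketch, assuming the Debezium tutorial layout (https://github.com/debezium/debezium-examples/tree/main/tutorial); register-mongodb-unwrap.json is a hypothetical file holding a registration payload like the one sketched above:

          # Start the tutorial topology with MongoDB
          export DEBEZIUM_VERSION=2.3
          docker-compose -f docker-compose-mongodb.yaml up -d

          # Register the connector whose add.headers names a field that is not in the event
          curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
              http://localhost:8083/connectors/ -d @register-mongodb-unwrap.json

          # Any change captured from a monitored collection now fails the task with the
          # DataException shown in the logs above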
