  Debezium / DBZ-7058

Field exclusion does not work with events of removed fields

Details

    Description

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-mongodb version 2.3.0.Final

      What is the connector configuration?

      {
        "name": "mongodb-source-connector",
        "config": {
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
          "topic.prefix": "dbserver1",
          "database.include.list": "inventory",
          "field.exclude.list": "inventory.customers.contact.email"
        }
      }
      

      What is the captured database version and mode of deployment?

      MongoDB 5.0, Kubernetes

      What behaviour do you expect?

      I am able to create a connector using the configuration above with field exclusion, and the connector handles events with field deletion successfully.

      What behaviour do you see?

      The connector fails with the error:

      Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
      

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      2023-10-19 18:22:09,316 ERROR  MongoDB|dbserver1|streaming  Producer failure   [io.debezium.pipeline.ErrorHandler]
      org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {sec=1697739719, ord=1, transaction_id=null, resume_token=82653173C7000000012B022C0100296E5A1004FF782D6EB16844C0A4C9E8F58FF791BA46645F69640064653173C22D4485764AB295F50004}
        at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:339)
        at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:190)
        at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:99)
        at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$3(MongoDbConnection.java:104)
        at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:120)
        at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:103)
        at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:98)
        at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:82)
        at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:41)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:205)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
        at io.debezium.connector.mongodb.FieldSelector$Path.matchesPath(FieldSelector.java:561)
        at io.debezium.connector.mongodb.FieldSelector$1.apply(FieldSelector.java:203)
        at io.debezium.connector.mongodb.MongoDbCollectionSchema.lambda$valueFromDocumentChangeStream$3(MongoDbCollectionSchema.java:117)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.debezium.connector.mongodb.MongoDbCollectionSchema.valueFromDocumentChangeStream(MongoDbCollectionSchema.java:119)
        at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeRecordEmitter.java:86)
        at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:74)
        at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:33)
        at io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:49)
        at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:296)
        ... 16 more
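
      The failing frame above is FieldSelector$Path.matchesPath, and the message "Index 1 out of bounds for length 1" is consistent with a two-segment exclusion path (contact.email) being compared segment by segment against a one-segment removed-field path (contact). The following is a minimal sketch of that failure mode, not the actual Debezium code: the class RemovedFieldMatchSketch and both methods are hypothetical names introduced only for illustration.

```java
public class RemovedFieldMatchSketch {

    // Hypothetical unguarded matcher: it assumes the event path has at least
    // as many segments as the exclusion path, which does not hold when a
    // parent field such as "contact" is removed.
    public static boolean matchesUnguarded(String[] excluded, String[] eventPath) {
        for (int i = 0; i < excluded.length; i++) {
            // Throws ArrayIndexOutOfBoundsException when eventPath is shorter.
            if (!excluded[i].equals(eventPath[i])) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical guarded variant: a shorter event path is treated as
    // "no match" instead of being indexed past its end.
    public static boolean matchesGuarded(String[] excluded, String[] eventPath) {
        if (eventPath.length < excluded.length) {
            return false;
        }
        for (int i = 0; i < excluded.length; i++) {
            if (!excluded[i].equals(eventPath[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Field part of "field.exclude.list": inventory.customers.contact.email
        String[] excluded = "contact.email".split("\\.");
        // Field removed by {"$unset": {"contact": ""}}
        String[] removed = "contact".split("\\.");

        System.out.println("guarded match: " + matchesGuarded(excluded, removed));
        try {
            matchesUnguarded(excluded, removed);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("unguarded match failed: " + e.getMessage());
        }
    }
}
```

      In this sketch the guarded variant simply reports no match, so the removal of the parent field would be kept in the event. Whether that is the desired behaviour for the connector is a design decision this sketch does not settle.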
      

      How to reproduce the issue using our tutorial deployment?

      1. Create a new connector:

      curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d'
      {
        "name": "mongodb-source-connector",
        "config": {
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
          "topic.prefix": "dbserver1",
          "database.include.list": "inventory",
          "field.exclude.list": "inventory.customers.contact.email"
        }
      }'
      

      2. Insert MongoDB document:

      db.customers.insert([{"first_name": "Bob", "last_name": "Hopper", "contact": {"email": "thebob@example.com"}}])
      

      3. Update MongoDB document (remove field):

      db.customers.updateOne({"first_name": "Bob"}, {"$unset": {"contact": ""}})
      

      4. Check connector status:

      Expected result (ER):

      {
        "name": "mongodb-source-connector",
        "connector": {
          "state": "RUNNING",
          "worker_id": "10.128.7.40:8083"
        },
        "tasks": [
          {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "10.128.7.40:8083"
          }
        ],
        "type": "source"
      }
      

      Actual result (AR):

      {
        "name": "mongodb-source-connector",
        "connector": {
          "state": "RUNNING",
          "worker_id": "192.168.128.5:8083"
        },
        "tasks": [
          {
            "id": 0,
            "state": "FAILED",
            "worker_id": "192.168.128.5:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:200)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:99)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$3(MongoDbConnection.java:104)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:120)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:103)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:98)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:82)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:41)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:205)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {sec=1697739719, ord=1, transaction_id=null, resume_token=82653173C7000000012B022C0100296E5A1004FF782D6EB16844C0A4C9E8F58FF791BA46645F69640064653173C22D4485764AB295F50004}\n\tat io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:339)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:190)\n\t... 15 more\nCaused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1\n\tat io.debezium.connector.mongodb.FieldSelector$Path.matchesPath(FieldSelector.java:561)\n\tat io.debezium.connector.mongodb.FieldSelector$1.apply(FieldSelector.java:203)\n\tat io.debezium.connector.mongodb.MongoDbCollectionSchema.lambda$valueFromDocumentChangeStream$3(MongoDbCollectionSchema.java:117)\n\tat java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)\n\tat java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)\n\tat io.debezium.connector.mongodb.MongoDbCollectionSchema.valueFromDocumentChangeStream(MongoDbCollectionSchema.java:119)\n\tat io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeRecordEmitter.java:86)\n\tat io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:74)\n\tat io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:33)\n\tat io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:49)\n\tat io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:296)\n\t... 16 more\n"
          }
        ],
        "type": "source"
      }
      

          People

            jcechace@redhat.com Jakub Čecháček
            andrey.pustovetov@gmail.com Andrey Pustovetov