Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5015

Triggering Incremental Snapshot on MongoDB connector throws json parsing error

XMLWordPrintable

    • False
    • None
    • False
    • Hide

      Create a mongo debezium connector pointed at a mongo db. 

      Set up a signaling table and name it whatever you choose.

      Point your configuration of the mongo debezium connector to that table.

      Create a collection in your signaling table formatted like so:

      {     
          "_id": "test-1",
          "type": "execute-snapshot",
          "data": {
              "data-collections": ["database.collection"]
          }
      }

      Once committed it should throw the error in the logs and not snapshot.

      Show
      Create a mongo debezium connector pointed at a mongo db.  Set up a signaling table and name it whatever you choose. Point your configuration of the mongo debezium connector to that table. Create a collection in your signaling table formatted like so: {     "_id" : "test-1" ,   "type" : "execute-snapshot" ,     "data" : {         "data-collections" : [ "database.collection" ]    } } Once committed it should throw the error in the logs and not snapshot.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      io.debezium.connector.mongodb.MongoDbConnector v1.9.0

      What is the connector configuration?

       

      connector.class=io.debezium.connector.mongodb.MongoDbConnector
      database.include.list=db1,debezium
      key.converter=org.apache.kafka.connect.json.JsonConverter
      mongodb.hosts=mongodb://mongo1,mongo2,mongo3
      mongodb.name=db1
      mongodb.password=<password>
      mongodb.user=<user>
      signal.data.collection=debezium.signal
      snapshot.mode=initial
      tasks.max=1
      value.converter=org.apache.kafka.connect.json.JsonConverter

       

      What is the captured database version and mode of depoyment?

      Running in local docker container test environment. MongoDB v5.0.6

      What behaviour do you expect?

      An incremental snapshot taken of the tables specified

      What behaviour do you see?

      Throws error when a record is inserted into my signaling collection table debezium.signal . My collection looks like: 

       

      {
          "_id": "test-22",  
          "type": "execute-snapshot",
          "data": {
              "data-collections": ["db1.test_table"]
          }
      }

       

      Error thrown:

       

      {
          "exception":
          {
              "stacktrace": "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Document': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)\"Document{{data-collections=[db1.test_collection]}}\"; line: 1, column: 9]\n\tat com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2903)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1949)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)\n\tat io.debezium.document.JacksonReader.parseDocument(JacksonReader.java:115)\n\tat io.debezium.document.JacksonReader.parse(JacksonReader.java:102)\n\tat io.debezium.document.JacksonReader.read(JacksonReader.java:57)\n\tat io.debezium.pipeline.signal.Signal.process(Signal.java:136)\n\tat io.debezium.pipeline.signal.Signal.process(Signal.java:175)\n\tat io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:226)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeStreamChangeRecordEmitter.java:89)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.emitCreateRecord(MongoDbChangeStreamChangeRecordEmitter.java:67)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.emitCreateRecord(MongoDbChangeStreamChangeRecordEmitter.java:31)\n\tat io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:38)\n\tat io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:215)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:356)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:123)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:121)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:102)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)",
              "exception_class": "com.fasterxml.jackson.core.JsonParseException",
              "exception_message": "Unrecognized token 'Document': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)\"Document{{data-collections=[db1.test_collection]}}\"; line: 1, column: 9]"
          },
          "source_host": "a4df4ad12e21",
          "method": "process",
          "level": "WARN",
          "message": "Signal 'test-22' has been received but the data 'Document{{data-collections=[db1.test_collection]}}' cannot be parsed",
          "mdc":
          {
              "dbz.connectorContext": "streaming",
              "dbz.connectorType": "MongoDB",
              "connector.context": "[db1-mongo-debezium-connector|task-0] ",
              "dbz.connectorName": "db1"
          },
          "@timestamp": "2022-04-20T20:46:05.885Z",
          "file": "Signal.java",
          "line_number": "140",
          "thread_name": "debezium-mongodbconnector-db1-change-event-source-coordinator",
          "@version": 1,
          "logger_name": "io.debezium.pipeline.signal.Signal",
          "class": "io.debezium.pipeline.signal.Signal"
      } 

       

       

      Do you see the same behaviour using the latest released Debezium version?

      yes 1.9.0

      Description

      The issue seems to be that we are passing what is not json to a json parser. This stems from when we get the collection value json and and parse out the three fields we are looking for (id, type, data) using Document.parse. However for the data value when calling an "execute-snapshot" signal the data contains a string of json. This causes issues later on as we are looping though the fields and calling .toString() as the string representation of Document is not json. 

      https://github.com/debezium/debezium/blob/main/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java#L836

       Instead for Object's that are of type Document we should be calling the .toJson() to format the string correctly because later on we are expecting to parse json where the error is thrown:

      https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/pipeline/signal/Signal.java#L134

       

            Unassigned Unassigned
            chadthman Chad Marmon (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: