-
Bug
-
Resolution: Done
-
Major
-
1.9.0.Final
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
io.debezium.connector.mongodb.MongoDbConnector v1.9.0
What is the connector configuration?
connector.class=io.debezium.connector.mongodb.MongoDbConnector
database.include.list=db1,debezium
key.converter=org.apache.kafka.connect.json.JsonConverter
mongodb.hosts=mongodb://mongo1,mongo2,mongo3
mongodb.name=db1
mongodb.password=<password>
mongodb.user=<user>
signal.data.collection=debezium.signal
snapshot.mode=initial
tasks.max=1
value.converter=org.apache.kafka.connect.json.JsonConverter
What is the captured database version and mode of deployment?
Running in local docker container test environment. MongoDB v5.0.6
What behaviour do you expect?
An incremental snapshot is taken of the specified collections.
What behaviour do you see?
An error is thrown when a record is inserted into my signaling collection, debezium.signal. The inserted document looks like:
{ "_id": "test-22", "type": "execute-snapshot", "data": { "data-collections": ["db1.test_table"] } }
Error thrown:
{ "exception": { "stacktrace": "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Document': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)\"Document{{data-collections=[db1.test_collection]}}\"; line: 1, column: 9]\n\tat com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2903)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1949)\n\tat com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)\n\tat io.debezium.document.JacksonReader.parseDocument(JacksonReader.java:115)\n\tat io.debezium.document.JacksonReader.parse(JacksonReader.java:102)\n\tat io.debezium.document.JacksonReader.read(JacksonReader.java:57)\n\tat io.debezium.pipeline.signal.Signal.process(Signal.java:136)\n\tat io.debezium.pipeline.signal.Signal.process(Signal.java:175)\n\tat io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:226)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeStreamChangeRecordEmitter.java:89)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.emitCreateRecord(MongoDbChangeStreamChangeRecordEmitter.java:67)\n\tat io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter.emitCreateRecord(MongoDbChangeStreamChangeRecordEmitter.java:31)\n\tat io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:38)\n\tat io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:215)\n\tat 
io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:356)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:123)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:121)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:102)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)", "exception_class": "com.fasterxml.jackson.core.JsonParseException", "exception_message": "Unrecognized token 'Document': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)\"Document{{data-collections=[db1.test_collection]}}\"; line: 1, column: 9]" }, "source_host": "a4df4ad12e21", "method": "process", "level": "WARN", "message": "Signal 'test-22' has been received but the data 
'Document{{data-collections=[db1.test_collection]}}' cannot be parsed", "mdc": { "dbz.connectorContext": "streaming", "dbz.connectorType": "MongoDB", "connector.context": "[db1-mongo-debezium-connector|task-0] ", "dbz.connectorName": "db1" }, "@timestamp": "2022-04-20T20:46:05.885Z", "file": "Signal.java", "line_number": "140", "thread_name": "debezium-mongodbconnector-db1-change-event-source-coordinator", "@version": 1, "logger_name": "io.debezium.pipeline.signal.Signal", "class": "io.debezium.pipeline.signal.Signal" }
Do you see the same behaviour using the latest released Debezium version?
Yes, 1.9.0.
Description
The issue seems to be that we are passing a string that is not JSON to a JSON parser. This stems from how we read the signal document: we parse the collection value with Document.parse and extract the three fields we are looking for (id, type, data). For an "execute-snapshot" signal, however, the data value is itself a nested document rather than a plain string. This causes problems later on, when we loop through the fields and call .toString() on each one, because the string representation of a Document (for example, Document{{data-collections=[db1.test_collection]}}) is not JSON.
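The failure mode described above can be sketched without a MongoDB dependency. This is only an illustration under stated assumptions: a plain java.util.Map stands in for org.bson.Document (whose real toString() prints the Document{{...}} form seen in the log), and the class and method names (SignalDataDemo, stringifyNaively, stringifyForSignal, toJson) are hypothetical, not Debezium or driver APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SignalDataDemo {

    // Hypothetical stand-in for a JSON renderer such as Document#toJson():
    // renders nested maps, lists and strings as JSON text.
    static String toJson(Object value) {
        if (value instanceof Map) {
            return ((Map<?, ?>) value).entrySet().stream()
                    .map(e -> quote(String.valueOf(e.getKey())) + ": " + toJson(e.getValue()))
                    .collect(Collectors.joining(", ", "{", "}"));
        }
        if (value instanceof List) {
            return ((List<?>) value).stream()
                    .map(SignalDataDemo::toJson)
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        if (value instanceof String) {
            return quote((String) value);
        }
        return String.valueOf(value); // numbers, booleans, null
    }

    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Mirrors the problematic path: every field is stringified with toString().
    static String stringifyNaively(Object value) {
        return String.valueOf(value);
    }

    // Sketch of the proposed direction: structured values are rendered as JSON,
    // plain values keep their string form.
    static String stringifyForSignal(Object value) {
        return (value instanceof Map) ? toJson(value) : String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("data-collections", List.of("db1.test_table"));

        // Prints {data-collections=[db1.test_table]} which a JSON parser rejects.
        System.out.println(stringifyNaively(data));
        // Prints {"data-collections": ["db1.test_table"]} which is valid JSON.
        System.out.println(stringifyForSignal(data));
    }
}
```

The first output has unquoted keys and values, so a JSON parser fails on it in the same way Jackson fails on the Document{{...}} string in the log; the second output can be parsed.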
Instead, for objects of type Document we should call .toJson() so that the string is formatted correctly, because later on we expect to parse JSON, which is where the error is thrown: