Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6522

Unable to use resume token of some documents with composite IDs

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 2.7.0.Final
    • 2.2.1.Final
    • mongodb-connector
    • None
    • False
    • None
    • False

      MongoDB connector is not able to use resume token of an document with NumberLong data type in composite ID.
       
      DBZ version 2.2.1.Final
      MongoDB version 4.2.24 (replica set)

      Observed behaviour of DBZ connector

      DBZ triggers unnecessary snapshot while restarted

      1. Connector is watching a collection where the ID contains attribute of type NumberLong.
      2. Documents are inserted and a resume token is stored in the offsets topic.
      3. Connector is restarted.
      4. Connector tries to use resume token, but fails with log Invalid resume token present for replica set 'rs27017, snapshot will be performed' and performs unnecessary snapshot.

       
      When there is a permanent traffic in the replica set manipulating with documents having NumberLong in the ID, then the connector is unable to start and keeps performing snapshots again and again.

      1. The connector starts, it gets resume token before performing a snapshot.
      2. Performs snapshot
      3. Tries to use the resume token to start consuming changes and fails with following error.
        2023-06-02 19:42:10,919 ERROR Error while attempting to read from change stream on 'rs27017': Command failed with error 280 (ChangeStreamFatalError): 'cannot resume stream; the resume token was not found. {_data: "82647A46120000000A2B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E69002B5E1E666F6F002F0100000000000004", _typeBits: BinData(0, "824002")}' on server mongo.com:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"[], "operationTime": {"$timestamp": {"t": 1685734930, "i": 11}}, "ok": 0.0, "errmsg": "cannot resume stream; the resume token was not found. {_data: \"82647A46120000000A2B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E69002B5E1E666F6F002F0100000000000004\", _typeBits: BinData(0, \"824002\")}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685734930, "i": 12}}, "signature": {"hash": {"$binary": {"base64": "SU5wpY4BeMs0Ywhz2iuYowY+izc=", "subType": "00"}}, "keyId": 7217851605041807362}}} (io.debezium.connector.mongodb.connection.MongoDbConnection) [debezium-mongodbconnector-mongodb_device_vulnerability_events_003-change-event-source-coordinator[]
        com.mongodb.MongoCommandException: Command failed with error 280 (ChangeStreamFatalError): 'cannot resume stream; the resume token was not found. {_data: "82647A46120000000A2B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E69002B5E1E666F6F002F0100000000000004", _typeBits: BinData(0, "824002")}' on server mdb-001-fi.eu-west-1a.fi2.wandera.cz:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"[], "operationTime": {"$timestamp": {"t": 1685734930, "i": 11}}, "ok": 0.0, "errmsg": "cannot resume stream; the resume token was not found. {_data: \"82647A46120000000A2B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E69002B5E1E666F6F002F0100000000000004\", _typeBits: BinData(0, \"824002\")}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685734930, "i": 12}}, "signature": {"hash": {"$binary": {"base64": "SU5wpY4BeMs0Ywhz2iuYowY+izc=", "subType": "00"}}, "keyId": 7217851605041807362}}}
            at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
            at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
            at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
            at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
            at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
            at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
            at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
            at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
            at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
            at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
            at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
            at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
            at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
            at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:566)
            at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
            at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:565)
            at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
            at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:564)
            at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
            at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
            at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
            at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
            at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
            at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:379)
            at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:375)
            at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:529)
            at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:375)
            at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:59)
            at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
            at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:237)
            at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:189)
            at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:158)
            at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:91)
            at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$3(MongoDbConnection.java:104)
            at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:120)
            at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:103)
            at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:90)
            at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:79)
            at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:41)
            at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
            at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
            at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
        
      4. The connector is restarted and repeats from step 1

      Details

      For some IDs, MongoDB will produce resume tokens with _typeBits attribute next to the _data attribute. The typeBits "Encodes info needed to restore the original BSONTypes from a KeyString" (link).

      • For example for this update (mongosh)
        db.getCollection('dummy').insertOne(
            {
                "_id" : {
                    "foo": 1,
                    "bar": 2
                }
            }
        )
        
      • The resume token will look like this
        {
            "_data": "82647A41A6000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002B021E626172002B04000004",
            "_typeBits":
            {
                "$binary":
                {
                    "base64": "gkAB",
                    "subType": "00"
                }
            }
        }
        
      • The DBZ is using just the _data attribute when resuming the stream. It works in most cases and it would also work for the example above (no exception thrown):
        String uri = "...";
        try (MongoClient mongoClient = MongoClients.create(uri)) {
        	ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
        
        	var data = "82647A41A6000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002B021E626172002B04000004";
        	var resumeToken = new BsonDocument("_data", new BsonString(data));
        	stream.resumeAfter(resumeToken);
        
        	var cursor = stream.cursor();
        }
        

      But when the ID contains some "complex" data type, like NumberLong, then the resume token fails:

      • For this update
        db.getCollection('dummy').insertOne(
            {
                "_id" : {
                    "foo": NumberLong(2147483648)
                }
            }
        )
        
      • having this resume token
        {
            "_data": "82647A4319000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002F0100000000000004",
            "_typeBits":
            {
                "$binary":
                {
                    "base64": "gYA=",
                    "subType": "00"
                }
            }
        }
        
      • the following code fails
        String uri = "...";
        try (MongoClient mongoClient = MongoClients.create(uri)) {
        	ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
        
        	var data = "82647A4319000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002F0100000000000004";
        	var resumeToken = new BsonDocument("_data", new BsonString(data));
        	stream.resumeAfter(resumeToken);
        
        	var cursor = stream.cursor();
        }
        
      • with an error
        Command failed with error 280 (ChangeStreamFatalError): 'cannot resume stream; the resume token was not found. {_data: "82647A4319000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002F0100000000000004", _typeBits: BinData(0, "8180")}' on server mongo.com:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "operationTime": {"$timestamp": {"t": 1685734252, "i": 1}}, "ok": 0.0, "errmsg": "cannot resume stream; the resume token was not found. {_data: \"82647A4319000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002F0100000000000004\", _typeBits: BinData(0, \"8180\")}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685734252, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "8/tZ+m+MtRw+yxnXN5shVg8udeY=", "subType": "00"}}, "keyId": 7217851605041807362}}}
        
      • When trying to resume from the same resume token with _typeBites attribute, it works without throwing error:
        String uri = "...";
        try (MongoClient mongoClient = MongoClients.create(uri)) {
        	ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
        	
        	var data = "82647A4319000000012B022C0100296E5A10042409C0859BCF45ABBE0E0BD72AB4040346465F696400461E666F6F002F0100000000000004";
        	var resumeToken = new BsonDocument("_data", new BsonString(data));
        	var typeBits = new BsonBinary(HexFormat.of().parseHex("8180"));
        	resumeToken.append("_typeBits", typeBits);
        	stream.resumeAfter(resumeToken);
        	
        	var cursor = stream.cursor();
        }
        

      (Tested with mongodb-driver-sync version 4.7.1 which should be the same as used by DBZ.)

              anmohant Anisha Mohanty
              jaroslav.kaspar@jamf.com Jaroslav Kaspar (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: