Details
-
Bug
-
Resolution: Obsolete
-
Major
-
None
-
1.8.1.Final
-
None
-
False
-
None
-
False
-
Description
It is necessary to monitor a non-sharded collection in a two-shards MongoDB 4.0.23 sharded cluster.
Config Server Replica Set
db.createRole( { "role" : "kafkaConnectorRole", "privileges" : [ { "resource" : { db: "", collection: "" }, "actions" : [ "find", "changeStream" ] } ], "roles" : [ { "role": "read", "db": "local" }, { "role": "read", "db": "config" } ] }); db.runCommand(\{createRole:"listDatabases",privileges:[{resource:{cluster:true}, actions:["listDatabases"]}],roles:[]})
Assign both roles to the user.
Shards
db.createRole( { "role" : "kafkaConnectorRole", "privileges" : [ { "resource" : { db: "", collection: "" }, "actions" : [ "find", "changeStream" ] } ], "roles" : [ { "role": "read", "db": "local" } ] });
Assign role to monitoring user.
The following error appears because the connector tries to monitor a sharded collection which has not been "whitelisted" with collection.include.list=myDB.myColl and database.include.list=myDB:
Database Primary shard
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:133) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:102) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59) at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rsPrimary/host:port' at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:181) at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:121) ... 10 more Caused by: com.mongodb.MongoCommandException: Command failed with error 4567 (Location4567): 'from collection (myOtherDB.myShardedColl) cannot be sharded' on server host:port. The full response is {"operationTime": {"$timestamp": {"t": 1648558162, "i": 6}}, "ok": 0.0, "errmsg": "from collection (myOtherDB.myShardedColl) cannot be sharded", "code": 4567, "codeName": "Location4567", "$gleStats": {"lastOpTime": {"$timestamp": {"t": 0, "i": 0}}, "electionId": {"$oid": "7fffffff0000000000000015"}}, "lastCommittedOpTime": {"$timestamp": {"t": 1648558162, "i": 6}}, "$configServerState": {"opTime": {"ts": {"$timestamp": {"t": 1648558159, "i": 1}}, "t": 26}}, "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1648558162, "i": 7}}, "signature": {"hash": {"$binary": {"base64": "GAw0dWdxX/bztwHPIbEgtfb3N+o=", "subType": "00"}}, "keyId": 7031255913348464643}}} at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:195) at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:400) at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:324) at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114) at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:603) at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:81) at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:252) at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:214) at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123) at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:113) at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:328) at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:318) at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:201) at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeCommand$4(CommandOperationHelper.java:189) at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583) at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:189) at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195) at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:323) at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:319) at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583) at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:319) at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184) at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204) at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158) at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:331) at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:123) at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)
Configuration used:
connector.class=io.debezium.connector.mongodb.MongoDbConnector collection.include.list=myDB.myColl sanitize.field.names=true mongodb.password=pass mongodb.user=user mongodb.name=mytopic field.exclude.list=myDB.myColl._id value.converter=org.apache.kafka.connect.json.JsonConverter mongodb.hosts=host:port key.converter=org.apache.kafka.connect.json.JsonConverter database.include.list=myDB snapshot.mode=never