Details
- Type: Bug
- Resolution: Obsolete
- Priority: Critical
- Affects Version: 1.8.1.Final
Description
1.8.1.Final
Occasionally, the MongoDB connector throws an error. I found the same issue reported here: https://jira.mongodb.org/browse/KAFKA-219
When I use the configuration from that link, I still get the error — it is not ignored, because the error is raised inside MongoDB itself. The connector cannot be restarted, but if I change the `mongo.name` setting, no error occurs.
A MongoDB change event cannot be larger than 16 MB, yet I still hit this error.
The exception stack is as follows:
[2022-04-08 15:29:14,732] ERROR WorkerSourceTask{id=dbzcompact_source_mongo_mss_json-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:133)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:102)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:59)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rplset/xxxx:27017,xxxx:27017'
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:181)
    at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:121)
    ... 10 more
Caused by: com.mongodb.MongoQueryException: Query failed with error code 10334 and error message 'BSONObj size: 16862627 (0x1014DA3) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "82624FC0760000000B2B022C0100296E5A1004ECE333084BE04540879771E9F3A49E1E46645F69640064624FBF416F26E32ADE1B56050004" }' on server 10.17.34.216:27017
    at com.mongodb.internal.operation.QueryHelper.translateCommandException(QueryHelper.java:29)
    at com.mongodb.internal.operation.QueryBatchCursor.lambda$getMore$1(QueryBatchCursor.java:282)
    at com.mongodb.internal.operation.QueryBatchCursor$ResourceManager.executeWithConnection(QueryBatchCursor.java:512)
    at com.mongodb.internal.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:270)
    at com.mongodb.internal.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:223)
    at com.mongodb.internal.operation.QueryBatchCursor.lambda$tryNext$0(QueryBatchCursor.java:206)
    at com.mongodb.internal.operation.QueryBatchCursor$ResourceManager.execute(QueryBatchCursor.java:397)
    at com.mongodb.internal.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:205)
    at com.mongodb.internal.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:102)
    at com.mongodb.internal.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:98)
    at com.mongodb.internal.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:195)
    at com.mongodb.internal.operation.ChangeStreamBatchCursor.tryNext(ChangeStreamBatchCursor.java:98)
    at com.mongodb.client.internal.MongoChangeStreamCursorImpl.tryNext(MongoChangeStreamCursorImpl.java:78)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:339)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:123)
    at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)
    ... 11 more
[2022-04-08 15:29:14,732] ERROR WorkerSourceTask{id=dbzcompact_source_mongo_mss_json-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2022-04-08 15:29:14,733] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask)
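For reference, a quick sanity check of the numbers in the `MongoQueryException` above — the 16793600-byte cap MongoDB reports is 16 MiB plus 16 KiB of internal slack, and the rejected document exceeds it by roughly 67 KiB:

```python
# Numbers taken verbatim from the MongoQueryException above.
limit = 16793600      # reported cap: 16 * 1024 * 1024 + 16 * 1024 bytes
reported = 16862627   # BSONObj size of the rejected change stream document

assert limit == 16 * 1024 * 1024 + 16 * 1024
print(reported - limit)  # bytes over the cap: 69027 (~67 KiB)
```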
- The connector cannot restart; I must change `mongo.name`.
- After changing the connector config "mongo.name", it re-snapshots and works well.
- After two days had passed, the connector restarted normally with the same `mongo.name`: because the MongoDB resume token was no longer valid, the connector started with a new snapshot. I know this because I saw the messages in Kafka.
- I cannot recover normally even with max.batch.size = 1, so there may be a bug???
- I'm looking for a way to restart the connector normally.
- Alternatively, could you provide a way to restart the connector with the same `mongo.name` (even if that way triggers a new snapshot, that would be acceptable)?
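In the meantime, the rename workaround described above can at least be scripted against the standard Kafka Connect REST endpoint `PUT /connectors/{name}/config`. A minimal sketch — the host, connector name, and new logical name are hypothetical, and changing the logical name makes Debezium treat the connector as brand-new (discarding the stored resume token and taking a fresh snapshot, as observed in the bullets above):

```python
import json
from urllib import request


def resnapshot_config(current_config: dict, new_logical_name: str) -> dict:
    """Return a copy of the connector config with a new logical name.

    Changing this name causes Debezium to ignore the previously stored
    offsets/resume token and start over with a new snapshot.
    """
    updated = dict(current_config)
    updated["mongo.name"] = new_logical_name  # property name as used in this report
    return updated


def apply_config(connect_url: str, connector: str, config: dict) -> None:
    # PUT /connectors/{name}/config updates a connector's configuration
    # in place (standard Kafka Connect REST API endpoint).
    req = request.Request(
        f"{connect_url}/connectors/{connector}/config",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    request.urlopen(req)


# Hypothetical usage:
# cfg = {...current connector config...}
# apply_config("http://localhost:8083",
#              "dbzcompact_source_mongo_mss_json",
#              resnapshot_config(cfg, "mss_json_v2"))
```

This is only a stopgap: every rename forces a full re-snapshot, which is exactly the cost the report is asking to avoid.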