-
Bug
-
Resolution: Done
-
Major
-
1.1.1.Final
-
None
We are seeing intermittent failures of connector tasks when MongoDB connection exceptions occur. The task does not retry and stays stopped until it is manually restarted via the Connector REST API. Here is an example stack trace (this might be from the older version 1.1.0.Final):
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rsAST17=rsAST17/sfast17m1-*******:27017'
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:139)
    at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)
    ... 5 more
Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
    at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260)
    at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
    at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
    at com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:190)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96)
    at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)
    ... 6 more
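Looking at the code in ConnectionContext.java, the loop in MongoPrimary.execute() is written to pause and try again after an error, but no retry ever happens: the errorHandler supplied by MongoDbStreamingChangeEventSource.java throws a ConnectException from errorHandler.accept(), so the exception leaves the loop before the pause is reached: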
public void execute(String desc, Consumer<MongoClient> operation) {
    final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
    while (true) {
        MongoClient primary = primaryConnectionSupplier.get();
        try {
            operation.accept(primary);
            return;
        }
        catch (Throwable t) {
            errorHandler.accept(desc, t);
            if (!isRunning()) {
                throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
            }
            try {
                errorMetronome.pause();
            }
            catch (InterruptedException e) {
                // Interruption is not propagated
            }
        }
    }
}
We are testing out a fix for this that will do retries.
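To make the control flow concrete, here is a minimal, self-contained sketch of the same loop shape. It is not the Debezium code and not the fix under test: the class `RetryLoopSketch`, the `flaky` helper, the `Runnable` operation and the `pauseMillis` parameter are all hypothetical stand-ins, and the `isRunning()` check is left out. It only shows that a handler which throws ends the loop on the first failure, while a handler which merely reports the error lets the loop retry.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class RetryLoopSketch {

    // Hypothetical stand-in for the execute() loop quoted above: same
    // try / handle / pause / repeat shape, but without the MongoDB types.
    static void execute(String desc, Runnable operation,
                        BiConsumer<String, Throwable> errorHandler,
                        long pauseMillis) {
        while (true) {
            try {
                operation.run();
                return;
            } catch (RuntimeException e) {
                // If the handler throws, control leaves the method here and
                // the pause-and-retry code below is never reached.
                errorHandler.accept(desc, e);
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while pausing", ie);
                }
            }
        }
    }

    // Operation that fails the first `failures` times it runs, then succeeds.
    static Runnable flaky(AtomicInteger attempts, int failures) {
        return () -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("Prematurely reached end of stream");
            }
        };
    }

    // Handler that rethrows, like the behaviour seen in the stack trace.
    static int attemptsWithThrowingHandler() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            execute("read from oplog", flaky(attempts, 2), (desc, t) -> {
                throw new RuntimeException("Error while attempting to " + desc, t);
            }, 1);
        } catch (RuntimeException expected) {
            // The first failure propagates to the caller; nothing is retried.
        }
        return attempts.get();
    }

    // Handler that only reports the error, so the loop pauses and retries.
    static int attemptsWithLoggingHandler() {
        AtomicInteger attempts = new AtomicInteger();
        execute("read from oplog", flaky(attempts, 2),
                (desc, t) -> System.err.println("Will retry '" + desc + "': " + t.getMessage()),
                1);
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("throwing handler attempts: " + attemptsWithThrowingHandler());
        System.out.println("logging handler attempts: " + attemptsWithLoggingHandler());
    }
}
```

With an operation that fails twice before succeeding, the throwing handler results in 1 attempt and the logging handler in 3. A real fix would presumably also need to decide which exceptions are worth retrying and how many times, which this sketch does not attempt.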
relates to: DBZ-2506 Debezium mongodb connector stop pulling change events after the change of the PRIMARY (Closed)