Debezium / DBZ-2141

MongoDB connector is not resilient to Mongo connection errors


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 1.2.1.Final
    • Affects Version/s: 1.1.1.Final
    • Component/s: mongodb-connector
    • Labels: None

      We are seeing intermittent failures of connector tasks caused by MongoDB connection exceptions. The connector does not retry: the failed task stays stopped until it is restarted manually through the Kafka Connect REST API. Here is an example (this trace might be from the older version 1.1.0.Final):

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
      at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rsAST17=rsAST17/sfast17m1-*******:27017'
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:139)
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282)
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)
      ... 5 more
      Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
      at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
      at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260)
      at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
      at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
      at com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74)
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:190)
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96)
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)
      ... 6 more
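The trace wraps the real failure twice: the MongoDB driver's MongoSocketReadException is the root cause, and both ConnectException layers are Debezium's wrapping. A retry decision would therefore have to look at the root cause rather than the outermost exception. Below is a minimal sketch of walking the cause chain, assuming a socket read error should count as transient. FakeSocketReadException is a hypothetical stand-in for com.mongodb.MongoSocketReadException, and RuntimeException stands in for ConnectException, so the sketch compiles with only the JDK.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class CauseChain {
    // Hypothetical stand-in for com.mongodb.MongoSocketReadException so the
    // sketch compiles without the MongoDB driver on the classpath.
    static class FakeSocketReadException extends RuntimeException {
        FakeSocketReadException(String message) { super(message); }
    }

    // Follows getCause() to the innermost throwable; the identity set stops
    // the walk if a malformed chain loops back on itself.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    // Assumption: a socket read failure at the root is transient and worth retrying.
    static boolean isTransientNetworkError(Throwable t) {
        return rootCause(t) instanceof FakeSocketReadException;
    }

    public static void main(String[] args) {
        // Rebuilds the shape of the trace; RuntimeException stands in for ConnectException.
        Throwable socket = new FakeSocketReadException("Prematurely reached end of stream");
        Throwable oplog = new RuntimeException("Error while attempting to read from oplog", socket);
        Throwable producer = new RuntimeException("An exception occurred in the change event producer", oplog);
        System.out.println(rootCause(producer).getMessage()); // Prematurely reached end of stream
        System.out.println(isTransientNetworkError(producer)); // true
    }
}
```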
      

      The retry loop in ConnectionContext.java (MongoPrimary.execute()) never actually retries, because the errorHandler supplied by MongoDbStreamingChangeEventSource.java throws an exception from errorHandler.accept() on the first failure:

              public void execute(String desc, Consumer<MongoClient> operation) {
                  final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
                  while (true) {
                      MongoClient primary = primaryConnectionSupplier.get();
                      try {
                          operation.accept(primary);
                          return;
                      }
                      catch (Throwable t) {
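                          // The handler supplied by MongoDbStreamingChangeEventSource throws a ConnectException here,
                          // so the loop exits on the first error and never reaches the pause/retry below.
                          errorHandler.accept(desc, t);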
                          if (!isRunning()) {
                              throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
                          }
                          try {
                              errorMetronome.pause();
                          }
                          catch (InterruptedException e) {
                              // Interruption is not propagated
                          }
                      }
                  }
              }
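To make that control flow concrete, here is a simplified, hypothetical model of the loop above (not Debezium code): a String replaces the MongoClient and Thread.sleep replaces Metronome, while the try/catch/pause structure is kept. With a handler that throws, as the streaming source's handler does, the operation runs exactly once; with a handler that only logs, the same loop retries until the operation succeeds.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class ExecuteLoopModel {
    // Simplified model of the loop: a String stands in for MongoClient and
    // Thread.sleep for Metronome. The try/catch/pause structure is kept.
    static void execute(String desc, Consumer<String> operation,
                        BiConsumer<String, Throwable> errorHandler,
                        BooleanSupplier isRunning, long pauseMillis) {
        while (true) {
            try {
                operation.accept("primary");
                return;
            } catch (Throwable t) {
                // If the handler throws, that exception leaves the loop: no retry.
                errorHandler.accept(desc, t);
                if (!isRunning.getAsBoolean()) {
                    throw new IllegalStateException("Operation failed and termination requested", t);
                }
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // Mirrors the original: interruption is not propagated.
                }
            }
        }
    }

    // Counts how many times an operation that fails `failures` times before
    // succeeding is invoked before execute() returns or throws.
    static int attempts(BiConsumer<String, Throwable> handler, int failures) {
        AtomicInteger calls = new AtomicInteger();
        Consumer<String> flakyOperation = client -> {
            if (calls.incrementAndGet() <= failures) {
                throw new RuntimeException("Prematurely reached end of stream");
            }
        };
        try {
            execute("read oplog", flakyOperation, handler, () -> true, 0);
        } catch (RuntimeException stopped) {
            // A throwing handler ends up here after the first failure.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        BiConsumer<String, Throwable> throwingHandler = (desc, t) -> {
            throw new RuntimeException("Error while attempting to " + desc, t);
        };
        BiConsumer<String, Throwable> loggingHandler = (desc, t) ->
                System.out.println("retrying " + desc + " after: " + t.getMessage());
        System.out.println("throwing handler: " + attempts(throwingHandler, 2)); // 1
        System.out.println("logging handler: " + attempts(loggingHandler, 2));   // 3
    }
}
```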
      

      We are testing a fix that adds retries.
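One possible shape for such a fix, sketched under assumptions and not the change that was actually merged: retry a bounded number of times with exponential backoff, and only for failures that a caller-supplied predicate classifies as retriable. The method name retry and its parameters are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class BoundedRetry {
    // Calls `action` up to maxAttempts times. A failure is retried only if
    // `retriable` accepts it; the pause doubles after each failure, capped at
    // maxBackoffMillis. When attempts run out, the last failure is rethrown.
    static <T> T retry(Callable<T> action, int maxAttempts, long initialBackoffMillis,
                       long maxBackoffMillis, Predicate<Exception> retriable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Unlike the original loop, an interrupt is propagated, never retried.
                throw e;
            } catch (Exception e) {
                if (attempt >= maxAttempts || !retriable.test(e)) {
                    throw e;
                }
                Thread.sleep(backoff); // an InterruptedException here also propagates
                backoff = Math.min(backoff * 2, maxBackoffMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a transient I/O error, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection reset");
            }
            return "ok";
        }, 5, 100, 2_000, e -> e instanceof IOException);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

Keeping the retriable predicate separate lets a caller treat MongoDB socket errors as transient while still failing fast on errors that a retry cannot fix, and the attempt cap keeps a prolonged outage from turning into an endless loop.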

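Until a retrying fix is available, a stopped task has to be restarted by hand. Kafka Connect exposes POST /connectors/{name}/tasks/{taskId}/restart for this. The sketch below builds that request with java.net.http (Java 11+). The worker URL http://localhost:8083 (the default REST port) and the connector name inventory-mongo are assumptions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartTask {
    // Builds POST {baseUrl}/connectors/{connector}/tasks/{taskId}/restart.
    // Assumes the connector name is already URL-safe (no encoding is done here).
    static HttpRequest restartRequest(String baseUrl, String connector, int taskId) {
        URI uri = URI.create(baseUrl + "/connectors/" + connector + "/tasks/" + taskId + "/restart");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request to a running Connect worker and returns the HTTP status code.
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) {
        // Hypothetical worker URL and connector name; task 0 is the first task.
        HttpRequest request = restartRequest("http://localhost:8083", "inventory-mongo", 0);
        System.out.println(request.method() + " " + request.uri());
        // Against a live worker: System.out.println(send(request));
    }
}
```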
            Assignee: Unassigned
            Reporter: John Graf (jgraf50, inactive)
            Votes: 0
            Watchers: 3
