Debezium / DBZ-2141

MongoDB connector is not resilient to Mongo connection errors

    Description

      We are seeing connector tasks fail intermittently when MongoDB connection exceptions occur. The connector does not retry, and the task stays stopped until it is manually restarted via the Kafka Connect REST API. Here is an example (this might be from the older version 1.1.0.Final):

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. 
      at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102) 
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
      at java.base/java.lang.Thread.run(Thread.java:834) 
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rsAST17=rsAST17/sfast17m1-*******:27017' 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:139) 
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)
      ... 5 more
      Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
      at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580) 
      at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445) 
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299) 
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259) 
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99) 
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450) 
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72) 
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226) 
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269) 
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131) 
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123) 
      at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260) 
      at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216) 
      at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200) 
      at com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:190) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96) 
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)
      ... 6 more
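
      Until the connector retries on its own, the manual restart can be scripted. The following is a minimal sketch, assuming Java 11+ and a reachable Kafka Connect worker; it uses Kafka Connect's task restart endpoint (POST /connectors/{name}/tasks/{taskId}/restart). The class name, method names, base URL, connector name and task id are all hypothetical placeholders.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Debezium or Kafka Connect.
final class TaskRestartSketch {

    // Builds POST {baseUrl}/connectors/{connector}/tasks/{taskId}/restart.
    // Assumption: the connector name contains only URL-safe characters.
    static HttpRequest restartRequest(String baseUrl, String connector, int taskId) {
        URI uri = URI.create(baseUrl + "/connectors/" + connector + "/tasks/" + taskId + "/restart");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the restart request and returns the HTTP status code of the response.
    static int restartTask(String baseUrl, String connector, int taskId)
            throws IOException, InterruptedException {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(restartRequest(baseUrl, connector, taskId), HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

      For example, TaskRestartSketch.restartTask("http://localhost:8083", "mongo-connector", 0) would target task 0 of a connector registered under that (assumed) name.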
      

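      In ConnectionContext.java, execute() is written as a retry loop, but it never actually retries: errorHandler.accept() calls the handler supplied by MongoDbStreamingChangeEventSource.java, which throws an exception, so control leaves the loop before the pause is reached: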

              public void execute(String desc, Consumer<MongoClient> operation) {
                  final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
                  while (true) {
                      MongoClient primary = primaryConnectionSupplier.get();
                      try {
                          operation.accept(primary);
                          return;
                      }
                      catch (Throwable t) {
                          errorHandler.accept(desc, t);
                          if (!isRunning()) {
                              throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
                          }
                          try {
                              errorMetronome.pause();
                          }
                          catch (InterruptedException e) {
                              // Interruption is not propagated
                          }
                      }
                  }
              }
      

      We are testing a fix for this that adds retries.
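
      To illustrate the kind of behaviour meant by retries here, the following is a rough sketch, not the fix under test and not Debezium code. It assumes a bounded number of attempts with a fixed pause, and an error callback that only observes the failure (for example by logging it) instead of throwing. The class RetrySketch, the method executeWithRetry and its parameters are hypothetical names.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of Debezium.
final class RetrySketch {

    // Runs the operation up to maxAttempts times, pausing pauseMillis between attempts.
    // onError receives the attempt number and the failure; it is expected not to throw,
    // because a throwing handler would exit the loop on the first failure.
    static <T> T executeWithRetry(Supplier<T> operation, int maxAttempts, long pauseMillis,
                                  BiConsumer<Integer, Throwable> onError) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            }
            catch (RuntimeException e) {
                last = e;
                onError.accept(attempt, e);
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis);
                    }
                    catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        // All attempts failed: surface the last failure as the cause.
        throw new IllegalStateException("Operation failed after " + maxAttempts + " attempts", last);
    }
}
```

      Unlike the loop quoted above, this sketch catches only RuntimeException rather than Throwable, gives up after a fixed number of attempts, and stops on interruption; whether those choices suit the connector is an open question.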

            People

              Assignee: Unassigned
              Reporter: John Graf (jgraf50)