Debezium / DBZ-2141

MongoDB connector is not resilient to Mongo connection errors

    Details

      Description

      We are seeing intermittent failures of connector tasks when MongoDB connection exceptions occur. The failed operation is not retried; the task stays stopped until it is manually restarted through the Kafka Connect REST API. Here is an example stack trace (it may be from the older version 1.1.0.Final):

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. 
      at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102) 
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
      at java.base/java.lang.Thread.run(Thread.java:834) 
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rsAST17=rsAST17/sfast17m1-*******:27017' 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:139) 
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)
      ... 5 more
      Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
      at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580) 
      at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445) 
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299) 
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259) 
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99) 
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450) 
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72) 
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226) 
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269) 
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131) 
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123) 
      at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260) 
      at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216) 
      at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200) 
      at com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:190) 
      at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96) 
      at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)
      ... 6 more
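The root cause in this trace, com.mongodb.MongoSocketReadException, sits two "Caused by" levels below the ConnectException that stops the task, so any logic deciding whether an error is worth retrying has to look past the top-level exception. The following is a minimal, stdlib-only sketch of walking a cause chain. CauseChain and findCause are hypothetical names, and RuntimeException and IOException stand in for the Kafka and MongoDB driver exception types so the example runs without dependencies. Against the real driver one would presumably pass com.mongodb.MongoSocketException.class (the parent class of MongoSocketReadException), but that is an assumption about how a fix might classify errors.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

class CauseChain {
    // Walks getCause() from the outermost exception inward and returns the first
    // throwable that is an instance of `type`. The identity set stops the walk if
    // a chain ever loops back on itself.
    static <T extends Throwable> Optional<T> findCause(Throwable top, Class<T> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = top; t != null && seen.add(t); t = t.getCause()) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-ins for the trace above: RuntimeException plays the two Kafka
        // ConnectExceptions, IOException plays MongoSocketReadException.
        Throwable trace = new RuntimeException("An exception occurred in the change event producer",
                new RuntimeException("Error while attempting to read from oplog",
                        new IOException("Prematurely reached end of stream")));
        System.out.println(findCause(trace, IOException.class)
                .map(Throwable::getMessage)
                .orElse("no socket error in chain"));
    }
}
```

Searching by class rather than by message text keeps the check independent of how each layer words its error.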
      

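      Looking at MongoPrimary.execute() in ConnectionContext.java, no retries are done: the error handler supplied by MongoDbStreamingChangeEventSource.java (the establishConnectionToPrimary lambda in the trace above) throws a ConnectException from errorHandler.accept(), so the exception leaves the loop before errorMetronome.pause() and the next attempt are ever reached: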

              public void execute(String desc, Consumer<MongoClient> operation) {
                  final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
                  while (true) {
                      MongoClient primary = primaryConnectionSupplier.get();
                      try {
                          operation.accept(primary);
                          return;
                      }
                      catch (Throwable t) {
                          errorHandler.accept(desc, t);
                          if (!isRunning()) {
                              throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
                          }
                          try {
                              errorMetronome.pause();
                          }
                          catch (InterruptedException e) {
                              // Interruption is not propagated
                          }
                      }
                  }
              }
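To show why the loop never retries, here is a simplified, dependency-free Java copy of the loop above. A String stands in for MongoClient, and the isRunning() check and Metronome pause are left out; RetryLoopDemo and flakyOperation are hypothetical names. With a handler that only logs, an operation that fails twice succeeds on its third attempt. With a handler that throws, as the streaming source's handler does according to the stack trace, the exception escapes after the first attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class RetryLoopDemo {
    // Same control flow as MongoPrimary.execute(), minus isRunning() and the pause.
    static void execute(String desc, Consumer<String> operation,
                        BiConsumer<String, Throwable> errorHandler) {
        while (true) {
            try {
                operation.accept("primary");
                return;
            }
            catch (Throwable t) {
                // If the handler throws, control leaves the loop right here.
                errorHandler.accept(desc, t);
            }
        }
    }

    // Fails on the first two calls, succeeds on the third.
    static Consumer<String> flakyOperation(AtomicInteger attempts) {
        return client -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("Prematurely reached end of stream");
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger logged = new AtomicInteger();
        execute("read oplog", flakyOperation(logged),
                (desc, t) -> System.out.println("retrying " + desc + ": " + t.getMessage()));
        System.out.println("logging handler, attempts = " + logged.get());

        AtomicInteger thrown = new AtomicInteger();
        try {
            execute("read oplog", flakyOperation(thrown),
                    (desc, t) -> { throw new RuntimeException("Error while attempting to " + desc, t); });
        }
        catch (RuntimeException e) {
            System.out.println("throwing handler, attempts = " + thrown.get());
        }
    }
}
```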
      

      We are testing a fix that retries the failed operation instead of stopping the task.
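The fix itself is not included in this issue. As one possible shape for it, the Java sketch below retries an operation a bounded number of times with exponential backoff. It retries only errors that a caller-supplied predicate classifies as transient and rethrows everything else, so a genuinely broken task still stops. RetryingExecutor, executeWithRetries, maxAttempts and initialBackoff are hypothetical names, not Debezium APIs or configuration options, and IOException stands in for the MongoDB driver's socket exceptions.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class RetryingExecutor {
    // An operation that may fail with a checked exception (e.g. reading the oplog).
    interface Operation {
        void run() throws Exception;
    }

    // Runs op up to maxAttempts times. Failures accepted by isTransient are retried
    // after a pause that doubles each time; any other failure, or the last one, is
    // rethrown wrapped in a RuntimeException so the task still stops eventually.
    static void executeWithRetries(String desc, Operation op, Predicate<Throwable> isTransient,
                                   int maxAttempts, Duration initialBackoff) {
        Duration backoff = initialBackoff;
        for (int attempt = 1; ; attempt++) {
            try {
                op.run();
                return;
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                }
                boolean retry = !(e instanceof InterruptedException)
                        && isTransient.test(e) && attempt < maxAttempts;
                if (!retry) {
                    throw new RuntimeException(desc + " failed after " + attempt + " attempt(s)", e);
                }
                System.err.printf("%s failed (attempt %d), retrying in %d ms: %s%n",
                        desc, attempt, backoff.toMillis(), e);
                try {
                    Thread.sleep(backoff.toMillis());
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(desc + " interrupted while waiting to retry", ie);
                }
                backoff = backoff.multipliedBy(2);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with a stand-in socket error, then succeeds.
        executeWithRetries("read oplog", () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("Prematurely reached end of stream");
            }
        }, e -> e instanceof IOException, 5, Duration.ofMillis(100));
        System.out.println("succeeded after " + calls.get() + " attempts");
    }
}
```

Bounding the attempts keeps the original behaviour (the task stops) for outages that outlast the retry budget; in practice the pause would probably also need an upper cap.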


      People

      • Assignee: Unassigned
      • Reporter: jgraf50 John Graf
      • Votes: 0
      • Watchers: 3
