Debezium / DBZ-8763

MongoDB connector: MongoDB connection is not retried



      What Debezium connector do you use and what version?

      3.0.7.Final

      What is the connector configuration?

       

      mongodb.connection.string: ...
      topic.prefix: "my-connector"
      database.include.list: "my-db"
      collection.include.list: ".*outbox"
      errors.tolerance: "none"
      errors.log.enable: "true"
      errors.retry.timeout: "30000"
      errors.retry.delay.max.ms: "10000"
      key.converter: "io.debezium.converters.ByteArrayConverter"
      key.converter.delegate.converter.type: "org.apache.kafka.connect.storage.StringConverter"
      value.converter: "io.debezium.converters.ByteArrayConverter"
      value.converter.delegate.converter.type: "org.apache.kafka.connect.json.JsonConverter"
      value.converter.delegate.converter.type.schemas.enable: "false"
      transforms: "outbox"
      transforms.outbox.type: "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter"
      transforms.outbox.route.topic.replacement: "${routedByValue}"
      transforms.outbox.collection.field.event.key: "key"
      transforms.outbox.collection.field.event.timestamp: "timestamp"
      transforms.outbox.route.by.field: "topic"
      transforms.outbox.route.tombstone.on.empty.payload: "true"
      transforms.outbox.table.op.invalid.behavior: "error"
      capture.scope: database
      capture.target: my-db 
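      The expected behaviour relies on errors.retry.timeout (30000) and errors.retry.delay.max.ms (10000) from the configuration above. Kafka Connect documents the first as the maximum duration for which a failed operation is reattempted and the second as the maximum delay between consecutive attempts. The Java sketch below estimates the retry schedule those two values would allow, under assumptions that are not taken from this issue: an initial delay of 300 ms that doubles after every attempt, no jitter, and retries stopping once the next full wait would exceed the timeout. It only illustrates the timing; it is not Kafka Connect's or Debezium's implementation, and it does not claim that these properties cover failures during task start.

```java
import java.util.ArrayList;
import java.util.List;

class RetryBackoffSketch {

    // ASSUMPTION: initial delay before the first retry; not taken from this issue.
    static final long INITIAL_DELAY_MS = 300;

    // Returns the waits (in ms) between attempts. Each wait doubles and is capped at
    // maxDelayMs; retries stop once the next full wait would push the total past retryTimeoutMs.
    static List<Long> delays(long retryTimeoutMs, long maxDelayMs) {
        List<Long> result = new ArrayList<>();
        if (retryTimeoutMs <= 0 || maxDelayMs <= 0) {
            return result; // treat non-positive values as "no retries" (a timeout of 0 disables retries)
        }
        long elapsed = 0;
        long delay = Math.min(INITIAL_DELAY_MS, maxDelayMs);
        while (elapsed + delay <= retryTimeoutMs) {
            result.add(delay);
            elapsed += delay;
            delay = Math.min(delay * 2, maxDelayMs); // exponential growth, capped at the maximum
        }
        return result;
    }

    public static void main(String[] args) {
        // errors.retry.timeout and errors.retry.delay.max.ms from the reported configuration
        List<Long> schedule = delays(30_000, 10_000);
        long total = schedule.stream().mapToLong(Long::longValue).sum();
        System.out.println("Retry delays (ms): " + schedule);
        System.out.println("Retries: " + schedule.size() + ", total wait: " + total + " ms");
    }
}
```

      With the reported values, the sketch waits 300, 600, 1200, 2400, 4800, 9600 and 10000 ms, i.e. seven retries within about 29 seconds before giving up.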

       

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      On-premises, replica set, MongoDB version 7

      What behavior do you expect?

      Debezium should reconnect after the database becomes unavailable, according to the configured retry parameters.

      What behavior do you see?

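      Debezium throws an exception after the database becomes unavailable and never tries to reconnect. Kafka Connect then kills the task, which does not recover until it is manually restarted (see the WorkerTask error below).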

      2025-03-07 07:59:34,574 ERROR  MongoDB|dummy|task0  Error while attempting to Checking change stream: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]   [io.debezium.connector.mongodb.connection.MongoDbConnections]
      com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]
          at com.mongodb.internal.connection.BaseCluster.logAndThrowTimeoutException(BaseCluster.java:427)
          at com.mongodb.internal.connection.BaseCluster.lambda$selectServer$0(BaseCluster.java:154)
          at com.mongodb.internal.time.Timeout.lambda$onExistsAndExpired$16(Timeout.java:236)
          at com.mongodb.internal.time.Timeout.lambda$run$10(Timeout.java:201)
          at com.mongodb.internal.time.TimePoint.checkedCall(TimePoint.java:98)
          at com.mongodb.internal.time.Timeout.call(Timeout.java:174)
          at com.mongodb.internal.time.Timeout.run(Timeout.java:194)
          at com.mongodb.internal.time.Timeout.onExistsAndExpired(Timeout.java:233)
          at com.mongodb.internal.time.Timeout.onExpired(Timeout.java:226)
          at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:153)
          at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:59)
          at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:82)
          at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:108)
          at com.mongodb.client.internal.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:88)
          at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:148)
          at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:129)
          at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:195)
          at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$13(SyncOperationHelper.java:317)
          at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
          at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:201)
          at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:180)
          at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:190)
          at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:198)
          at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
          at com.mongodb.client.internal.MongoClusterImpl$OperationExecutorImpl.execute(MongoClusterImpl.java:358)
          at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:223)
          at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:193)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$isValidResumeToken$10(MongoDbConnection.java:219)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.isValidResumeToken(MongoDbConnection.java:215)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.validateLogPosition(MongoDbConnection.java:205)
          at io.debezium.connector.mongodb.MongoDbConnectorTask.validate(MongoDbConnectorTask.java:292)
          at io.debezium.connector.mongodb.MongoDbConnectorTask.start(MongoDbConnectorTask.java:137)
          at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:403)
          at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:313)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
          at java.base/java.lang.Thread.run(Thread.java:1583)
      2025-03-07 07:59:34,575 ERROR  ||  WorkerSourceTask{id=mongo-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      io.debezium.DebeziumException: Error while attempting to Checking change stream
          at io.debezium.connector.mongodb.connection.MongoDbConnections.lambda$eventSourcingErrorHandler$1(MongoDbConnections.java:53)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:111)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.isValidResumeToken(MongoDbConnection.java:215)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.validateLogPosition(MongoDbConnection.java:205)
          at io.debezium.connector.mongodb.MongoDbConnectorTask.validate(MongoDbConnectorTask.java:292)
          at io.debezium.connector.mongodb.MongoDbConnectorTask.start(MongoDbConnectorTask.java:137)
          at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:403)
          at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:313)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
          at java.base/java.lang.Thread.run(Thread.java:1583)
      Caused by: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]
          at com.mongodb.internal.connection.BaseCluster.logAndThrowTimeoutException(BaseCluster.java:427)
          at com.mongodb.internal.connection.BaseCluster.lambda$selectServer$0(BaseCluster.java:154)
          at com.mongodb.internal.time.Timeout.lambda$onExistsAndExpired$16(Timeout.java:236)
          at com.mongodb.internal.time.Timeout.lambda$run$10(Timeout.java:201)
          at com.mongodb.internal.time.TimePoint.checkedCall(TimePoint.java:98)
          at com.mongodb.internal.time.Timeout.call(Timeout.java:174)
          at com.mongodb.internal.time.Timeout.run(Timeout.java:194)
          at com.mongodb.internal.time.Timeout.onExistsAndExpired(Timeout.java:233)
          at com.mongodb.internal.time.Timeout.onExpired(Timeout.java:226)
          at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:153)
          at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:59)
          at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:82)
          at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:108)
          at com.mongodb.client.internal.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:88)
          at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:148)
          at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:129)
          at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:195)
          at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$13(SyncOperationHelper.java:317)
          at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
          at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:201)
          at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:180)
          at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:190)
          at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:198)
          at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
          at com.mongodb.client.internal.MongoClusterImpl$OperationExecutorImpl.execute(MongoClusterImpl.java:358)
          at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:223)
          at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:193)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$isValidResumeToken$10(MongoDbConnection.java:219)
          at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)
          ... 17 more 

      After looking at the source code, it seems that DBZ-7461 introduced this faulty behaviour. The problem appears to be in BaseSourceTask.startIfNeededAndPossible: there is no exception handling for retries when MongoDbConnectorTask.start throws an exception, so the exception propagates out of BaseSourceTask.poll and the Kafka Connect worker kills the task.

      DBZ-7461 seems to have been introduced in 2.6. After switching back to 2.5, the exception handling seems to work again.
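      The missing retry around the start-up step described above can be illustrated with a minimal Java sketch: a hypothetical startWithRetries helper runs the start-up action and, on failure, waits and tries again instead of letting the first exception escape from poll. StartAction, startWithRetries, maxRetries and delay are illustrative names, not Debezium's API, and the convention that a negative maxRetries means "retry forever" is an assumption. This is not a proposal for how BaseSourceTask should actually be changed.

```java
import java.time.Duration;

class StartRetrySketch {

    // HYPOTHETICAL start-up action, e.g. connecting and validating the resume token.
    interface StartAction {
        void run() throws Exception;
    }

    // Runs the action, retrying up to maxRetries times after a failure (negative = retry forever),
    // sleeping `delay` between attempts. Rethrows the last failure once retries are exhausted.
    // Returns the number of attempts it took to start.
    static int startWithRetries(StartAction start, int maxRetries, Duration delay) throws Exception {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                start.run();
                return attempt;
            } catch (Exception e) {
                boolean exhausted = maxRetries >= 0 && attempt > maxRetries;
                if (exhausted) {
                    throw e; // give up: the worker would then fail the task, as it does today
                }
                System.err.println("Start attempt " + attempt + " failed: " + e.getMessage()
                        + "; retrying in " + delay.toMillis() + " ms");
                Thread.sleep(delay.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a database that is unreachable for the first two attempts.
        int[] calls = {0};
        StartAction flaky = () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("Connection refused (simulated)");
            }
        };
        int attempts = startWithRetries(flaky, 5, Duration.ofMillis(10));
        System.out.println("Started after " + attempts + " attempts");
    }
}
```

      A real fix would also have to decide which exceptions count as retriable (for example, the MongoTimeoutException in the stack trace above) instead of retrying every failure.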

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      See the stack traces above.

      How to reproduce the issue using our tutorial deployment?

      Set up the MongoDB connector, then shut down the database.

       

Assignee: Unassigned
Reporter: Simon Wick (wicksim)
Votes: 0
Watchers: 3