Type: Bug
Resolution: Unresolved
Priority: Major
What Debezium connector do you use and what version?
3.0.7.Final
What is the connector configuration?
mongodb.connection.string: ...
topic.prefix: "my-connector"
database.include.list: "my-db"
collection.include.list: ".*outbox"
errors.tolerance: "none"
errors.log.enable: "true"
errors.retry.timeout: "30000"
errors.retry.delay.max.ms: "10000"
key.converter: "io.debezium.converters.ByteArrayConverter"
key.converter.delegate.converter.type: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "io.debezium.converters.ByteArrayConverter"
value.converter.delegate.converter.type: "org.apache.kafka.connect.json.JsonConverter"
value.converter.delegate.converter.type.schemas.enable: "false"
transforms: "outbox"
transforms.outbox.type: "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter"
transforms.outbox.route.topic.replacement: "${routedByValue}"
transforms.outbox.collection.field.event.key: "key"
transforms.outbox.collection.field.event.timestamp: "timestamp"
transforms.outbox.route.by.field: "topic"
transforms.outbox.route.tombstone.on.empty.payload: "true"
transforms.outbox.table.op.invalid.behavior: "error"
capture.scope: database
capture.target: my-db
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
On-premises, replica set, MongoDB-version 7
What behavior do you expect?
Debezium should reconnect after the database becomes unavailable, according to the configured retry parameters.
What behavior do you see?
Debezium throws an exception after the database becomes unavailable and never attempts to reconnect.
2025-03-07 07:59:34,574 ERROR MongoDB|dummy|task0 Error while attempting to Checking change stream: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}] [io.debezium.connector.mongodb.connection.MongoDbConnections]
com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]
    at com.mongodb.internal.connection.BaseCluster.logAndThrowTimeoutException(BaseCluster.java:427)
    at com.mongodb.internal.connection.BaseCluster.lambda$selectServer$0(BaseCluster.java:154)
    at com.mongodb.internal.time.Timeout.lambda$onExistsAndExpired$16(Timeout.java:236)
    at com.mongodb.internal.time.Timeout.lambda$run$10(Timeout.java:201)
    at com.mongodb.internal.time.TimePoint.checkedCall(TimePoint.java:98)
    at com.mongodb.internal.time.Timeout.call(Timeout.java:174)
    at com.mongodb.internal.time.Timeout.run(Timeout.java:194)
    at com.mongodb.internal.time.Timeout.onExistsAndExpired(Timeout.java:233)
    at com.mongodb.internal.time.Timeout.onExpired(Timeout.java:226)
    at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:153)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:59)
    at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:82)
    at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:108)
    at com.mongodb.client.internal.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:88)
    at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:148)
    at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:129)
    at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:195)
    at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$13(SyncOperationHelper.java:317)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
    at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:201)
    at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:180)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:190)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:198)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
    at com.mongodb.client.internal.MongoClusterImpl$OperationExecutorImpl.execute(MongoClusterImpl.java:358)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:223)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:193)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$isValidResumeToken$10(MongoDbConnection.java:219)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.isValidResumeToken(MongoDbConnection.java:215)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.validateLogPosition(MongoDbConnection.java:205)
    at io.debezium.connector.mongodb.MongoDbConnectorTask.validate(MongoDbConnectorTask.java:292)
    at io.debezium.connector.mongodb.MongoDbConnectorTask.start(MongoDbConnectorTask.java:137)
    at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:403)
    at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:313)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
2025-03-07 07:59:34,575 ERROR || WorkerSourceTask{id=mongo-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
io.debezium.DebeziumException: Error while attempting to Checking change stream
    at io.debezium.connector.mongodb.connection.MongoDbConnections.lambda$eventSourcingErrorHandler$1(MongoDbConnections.java:53)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:111)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.isValidResumeToken(MongoDbConnection.java:215)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.validateLogPosition(MongoDbConnection.java:205)
    at io.debezium.connector.mongodb.MongoDbConnectorTask.validate(MongoDbConnectorTask.java:292)
    at io.debezium.connector.mongodb.MongoDbConnectorTask.start(MongoDbConnectorTask.java:137)
    at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:403)
    at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:313)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:466)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]
    at com.mongodb.internal.connection.BaseCluster.logAndThrowTimeoutException(BaseCluster.java:427)
    at com.mongodb.internal.connection.BaseCluster.lambda$selectServer$0(BaseCluster.java:154)
    at com.mongodb.internal.time.Timeout.lambda$onExistsAndExpired$16(Timeout.java:236)
    at com.mongodb.internal.time.Timeout.lambda$run$10(Timeout.java:201)
    at com.mongodb.internal.time.TimePoint.checkedCall(TimePoint.java:98)
    at com.mongodb.internal.time.Timeout.call(Timeout.java:174)
    at com.mongodb.internal.time.Timeout.run(Timeout.java:194)
    at com.mongodb.internal.time.Timeout.onExistsAndExpired(Timeout.java:233)
    at com.mongodb.internal.time.Timeout.onExpired(Timeout.java:226)
    at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:153)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:59)
    at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:82)
    at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:108)
    at com.mongodb.client.internal.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:88)
    at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:148)
    at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:129)
    at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:195)
    at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$13(SyncOperationHelper.java:317)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
    at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:201)
    at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:180)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:190)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:198)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
    at com.mongodb.client.internal.MongoClusterImpl$OperationExecutorImpl.execute(MongoClusterImpl.java:358)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:223)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:193)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$isValidResumeToken$10(MongoDbConnection.java:219)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)
    ... 17 more
After looking at the source code, it seems that DBZ-7461 introduced this faulty behaviour. The problem appears to be in BaseSourceTask.startIfNeededAndPossible: it has no exception handling that would trigger a retry when MongoDbConnectorTask.start throws an exception.
DBZ-7461 seems to have been introduced in 2.6. If I switch to 2.5, the exception handling seems to work again.
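To illustrate the kind of handling that appears to be missing around the start call, here is a minimal, self-contained sketch of a bounded retry with exponential backoff. It is not Debezium's code: the class StartRetrySketch, the exception TransientStartException, and the method startWithRetry are all hypothetical names made up for this example, and the delay values are arbitrary.

```java
public class StartRetrySketch {

    /** Hypothetical marker for a failure worth retrying, e.g. a connection timeout. */
    static class TransientStartException extends RuntimeException {
        TransientStartException(String message) {
            super(message);
        }
    }

    /**
     * Runs startAction until it succeeds or maxAttempts is reached.
     * The wait between attempts doubles each time, capped at maxDelayMs.
     * Returns the number of attempts that were needed.
     */
    static int startWithRetry(Runnable startAction, int maxAttempts,
                              long initialDelayMs, long maxDelayMs) {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                startAction.run();
                return attempt;
            } catch (TransientStartException e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted: surface the original failure
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated start that fails twice (database down) and then succeeds.
        int[] calls = {0};
        int attempts = startWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TransientStartException("Connection refused");
            }
        }, 5, 10, 100);
        System.out.println("started after " + attempts + " attempts");
    }
}
```

In this sketch only the hypothetical TransientStartException is retried; any other exception propagates immediately, which keeps genuine configuration errors from being retried pointlessly.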
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Yes
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
See stacktrace
How to reproduce the issue using our tutorial deployment?
Set up the MongoDB connector and then shut down the database.
- is related to DBZ-7461 Align snapshot modes for PostgreSQL, MySQL, Oracle (Closed)