Type: Bug
Resolution: Cannot Reproduce
Priority: Major
Affects Versions: 2.3.4.Final, 2.4.0.Final
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
The MongoDB connector; the problem occurs with both 2.3.4 and 2.4.0.
What is the connector configuration?
{
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "transforms.outbox.collection.field.event.key": "key",
  "transforms.outbox.type": "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
  "mongodb.connection.string": "mongodb://REDACTED",
  "value.converter.delegate.converter.type.schemas.enable": "false",
  "tasks.max": "1",
  "transforms": "outbox",
  "capture.mode": "change_streams",
  "transforms.outbox.collection.field.event.timestamp": "timestamp",
  "tombstones.on.delete": "false",
  "topic.prefix": "test.",
  "transforms.outbox.route.topic.replacement": "test.${routedByValue}",
  "poll.interval.ms": "1",
  "transforms.outbox.collection.expand.json.payload": "true",
  "value.converter": "io.debezium.converters.ByteArrayConverter",
  "transforms.outbox.route.by.field": "topic",
  "event.processing.failure.handling.mode": "warn", # the connector stops otherwise
  "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
  "transforms.outbox.collection.fields.additional.placement": "source:header,ctx:header,timestamp:header",
  "tracing.span.context.field": "ctx",
  "value.converter.schemas.enable": "false",
  "name": "outbox",
  "errors.tolerance": "all",
  "database.include.list": "dbname"
}
What is the captured database version and mode of deployment?
Self-hosted MongoDB 6.0.8 Community, deployed as a sharded cluster (3 mongos routers, 3 replica sets, 3 config servers).
What behaviour do you expect?
The error should not occur.
What behaviour do you see?
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:200)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:99)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$3(MongoDbConnection.java:104)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:120)
    at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:103)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:98)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSets$1(MongoDbStreamingChangeEventSource.java:119)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {sec=1698749605, ord=11, transaction_id=null, resume_token=826540DCA50000000B2B022C0100296E5A1004726401CBEBF74D1989E42F10B5ACFF69463C5F6964003C61383131646366662D363835352D346365362D613263392D633565346636396466356466000004}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:339)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:190)
    ... 11 more
Caused by: java.lang.NullPointerException
    at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:41)
    at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
    at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
    at io.debezium.pipeline.CommonOffsetContext.getSourceInfo(CommonOffsetContext.java:24)
    at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeRecordEmitter.java:87)
    at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:74)
    at io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter.emitUpdateRecord(MongoDbChangeRecordEmitter.java:33)
    at io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:49)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:296)
    ... 12 more
From time to time the connector stops with the above error. Setting `event.processing.failure.handling.mode` to `warn` keeps the connector running: the error is then only logged, as shown below, and the connector continues without losing any data.
Error while processing event at offset {sec=1702550260, ord=17, transaction_id=null, resume_token=82657ADAF4000000112B022C0100296E5A10046BF76AF7B4434174811DF0C273DEC9A2463C6B6579003C33393063663533342D333961312D346562632D396530342D66323231306335633165343500645F69640064657ADAF42781DA39A2C443960004} (io.debezium.pipeline.EventDispatcher:312)
I was not able to reproduce this error deterministically, but it happens quite often: about twice a week in our development environment.
I would guess it is a concurrency issue related to the `tasks.max` setting, as it seems to happen only when `tasks.max` is smaller than the number of replica sets.
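To test this hypothesis, one sketch of a workaround is to raise `tasks.max` to match the number of replica sets (3 in this deployment) so that each replica set gets its own task. This is an assumption based on the guess above, not a confirmed fix. Kafka Connect's `PUT /connectors/<name>/config` endpoint expects the full configuration map, so the snippet fetches the current config, patches the one key, and resubmits it (it assumes the Connect REST API is reachable at `localhost:8083` and that `jq` is installed):

```shell
# Fetch the current config, set tasks.max to the replica-set count,
# and resubmit the full config map to the Connect REST API.
curl -s http://localhost:8083/connectors/outbox/config \
  | jq '."tasks.max" = "3"' \
  | curl -s -X PUT -H "Content-Type: application/json" \
      -d @- http://localhost:8083/connectors/outbox/config
```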
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
Yes. Just before the error there are a number of log lines like:
[2023-12-22 14:20:24,562] INFO WorkerSourceTask{id=outbox-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:510)
[2023-12-22 14:20:24,015] INFO [Producer clientId=connector-producer-outbox-0] Resetting the last seen epoch of partition topicname-10 to 17 since the associated topicId changed from null to ho0-HK6gSoW9lN-2PrT9-w (org.apache.kafka.clients.Metadata:401)
Just after the error there are logs about reconnecting to MongoDB and cluster discovery.
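If TRACE-level logs are needed for the next occurrence, they can be enabled at runtime through Kafka Connect's dynamic logging endpoint (available since Connect 2.4) without restarting the worker. The worker address is an assumption:

```shell
# Raise the log level for the Debezium MongoDB connector packages only,
# assuming the Connect worker's REST API listens on localhost:8083.
curl -s -X PUT -H "Content-Type: application/json" \
  -d '{"level": "TRACE"}' \
  http://localhost:8083/admin/loggers/io.debezium.connector.mongodb
```

Note that this change is not persisted; it lasts until the worker restarts.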