Type: Bug
Resolution: Done
Priority: Blocker
Bug report
What Debezium connector do you use and what version?
Debezium MongoDB connector: 2.3.0.Final
What is the connector configuration?
"mongodb.connection.string": "${secrets:debezium-mongo-credentials}" "mongodb.user": "${secrets:debezium-mongo-credentials}" "mongodb.password": "${secrets:debezium-mongo-credentials}" "mongodb.members.auto.discover": true "mongodb.ssl.enabled": true # collections "collection.include.list": "debezium.signal,db.collection" # configuration "topic.prefix": "tst.dbz.mongo" "schema.name.adjustment.mode": "avro" "field.name.adjustment.mode": "avro" "snapshot.mode": "never" "skipped.operations": "t" "tombstones.on.delete": false # converters "key.converter": "org.apache.kafka.connect.json.JsonConverter" "key.converter.schemas.enable": false "value.converter": "io.confluent.connect.avro.AvroConverter" "value.converter.schemas.enable": false "value.converter.schema.registry.url": "http://schema-registry" # max size "max.batch.size": 20480 "max.queue.size": 81920 # debezium signaling - ad-hoc snapshot "signal.data.collection": "debezium.signal" "incremental.snapshot.chunk.size": 10000 # heartbeat "topic.heartbeat.prefix": "debezium-heartbeat" "heartbeat.interval.ms": "600000" # transforms "transforms": "unwrap" "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState" "transforms.unwrap.drop.tombstones": true "transforms.unwrap.delete.handling.mode": "none" "transforms.unwrap.add.headers.prefix": "" "transforms.unwrap.add.headers": "op,ts_ms,source.version,source.connector,source.name,source.ts_ms,source.snapshot,source.db,source.collection,source.ord"
What is the captured database version and mode of deployment?
MongoDB Atlas: 4.4.19
What behaviour do you expect?
Debezium CDC with the MongoDB New Document State Extraction transformation should work correctly without connector errors.
The error originally occurred during an incremental initial load with the MongoDB New Document State Extraction transformation, but it was reproduced for both incremental initial loads and regular CDC.
What behaviour do you see?
Both the regular CDC flow and the incremental initial load with the MongoDB New Document State Extraction transformation failed with `org.apache.avro.SchemaParseException` (see logs below).
Do you see the same behaviour using the latest released Debezium version?
Verified with Debezium MongoDB connector: 2.3.0.Final only.
Do you have the connector logs, ideally from start till finish?
The MongoDB connector with the MongoDB New Document State Extraction transformation can fail with similar exceptions related to `org.apache.avro.SchemaParseException`.
Example of an exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:482)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:392)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.avro.SchemaParseException: Illegal initial character: 0
    at org.apache.avro.Schema.validateName(Schema.java:1567)
    at org.apache.avro.Schema.access$400(Schema.java:92)
    at org.apache.avro.Schema$Name.<init>(Schema.java:704)
    at org.apache.avro.Schema.createRecord(Schema.java:219)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:965)
    at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
    at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
    at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:897)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:780)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:765)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:482)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more
Another example of an exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:482)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:392)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.avro.SchemaParseException: Illegal character in: api-version
    at org.apache.avro.Schema.validateName(Schema.java:1571)
    at org.apache.avro.Schema.access$400(Schema.java:92)
    at org.apache.avro.Schema$Field.<init>(Schema.java:549)
    at org.apache.avro.Schema$Field.<init>(Schema.java:588)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1202)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
    at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:780)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:765)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:482)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more
After a restart, the connector then fails with:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "Object.hashCode()" because "key" is null
    at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)
    at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5683)
    at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)
    at io.debezium.connector.mongodb.SourceInfo.snapshot(SourceInfo.java:473)
    at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:64)
    at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:39)
    at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
    at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
    at io.debezium.pipeline.CommonOffsetContext.getSourceInfo(CommonOffsetContext.java:24)
    at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.buildNotificationWith(IncrementalSnapshotNotificationService.java:154)
    at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.notifyInProgress(IncrementalSnapshotNotificationService.java:130)
    at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$readChunk$8(MongoDbIncrementalSnapshotChangeEventSource.java:344)
    at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.notifyReplicaSets(MongoDbIncrementalSnapshotChangeEventSource.java:439)
    at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:342)
    at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:263)
    at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:57)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$3(ChangeEventSourceCoordinator.java:221)
    at java.base/java.util.Optional.ifPresent(Optional.java:178)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:221)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:203)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
    ... 5 more
Based on the exceptions above, Avro serialization can fail at any nesting level of the JSON document structure because of characters that are illegal in Avro names. Note that both parameters related to Avro name adjustment are already set:
"schema.name.adjustment.mode": "avro"
"field.name.adjustment.mode": "avro"
but they do not help.
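For reference, Avro restricts record and field names to `[A-Za-z_][A-Za-z0-9_]*`. A minimal sketch (plain Avro, nothing Debezium-specific; the class name is made up) that reproduces both validation messages seen in the traces above:

```java
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public class AvroNameRules {
    public static void main(String[] args) {
        Schema str = Schema.create(Schema.Type.STRING);

        // "Illegal character in: api-version" -- '-' is not allowed in Avro names.
        try {
            new Field("api-version", str, null, (Object) null);
        } catch (org.apache.avro.SchemaParseException e) {
            System.out.println(e.getMessage());
        }

        // "Illegal initial character: 0" -- Avro names must not start with a digit.
        try {
            Schema.createRecord("0", null, "example", false, List.of());
        } catch (org.apache.avro.SchemaParseException e) {
            System.out.println(e.getMessage());
        }
    }
}
```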
This error does not occur if the MongoDB New Document State Extraction transformation is disabled, because the whole JSON document is then placed inside the "after" field as a string (no schema autogeneration, just the default Envelope schema).
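To illustrate that contrast, here is a hypothetical sketch using Confluent's `AvroData` (the class that appears in the stack traces); the schema names are invented for the example. With the document kept as an opaque string, the converter only ever sees the field name "after"; once document keys become Connect field names, Avro name validation fails:

```java
import io.confluent.connect.avro.AvroData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class EnvelopeVsUnwrapped {
    public static void main(String[] args) {
        AvroData avroData = new AvroData(100);

        // Without the SMT: the whole document is an opaque JSON string in
        // "after", so the only field name Avro ever validates is "after".
        Schema envelope = SchemaBuilder.struct().name("tst.dbz.mongo.Envelope")
                .field("after", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        System.out.println(avroData.fromConnectSchema(envelope)); // converts fine

        // With the SMT: document keys become Connect field names, so a key
        // such as "api-version" reaches Avro name validation and fails with
        // SchemaParseException: Illegal character in: api-version
        Schema unwrapped = SchemaBuilder.struct().name("tst.dbz.mongo.Value")
                .field("api-version", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        avroData.fromConnectSchema(unwrapped); // throws
    }
}
```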
How to reproduce the issue using our tutorial deployment?
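A possible reproduction path (an assumption, not verified against the tutorial deployment): insert a document whose keys violate Avro naming rules into a captured collection and let the connector stream it through the unwrap SMT into the Avro converter. The connection string, database, and collection names below are placeholders:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class InsertIllegalAvroNames {
    public static void main(String[] args) {
        // Placeholder URI; point this at the tutorial deployment's MongoDB.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            Document doc = new Document()
                    .append("api-version", "v1")              // '-' is illegal in Avro field names
                    .append("payload", new Document("0", 1)); // nested key starting with a digit
            client.getDatabase("db").getCollection("collection").insertOne(doc);
        }
    }
}
```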
relates to:
- DBZ-8084 Naming adjustment mode documentation incorrect (Open)