  Debezium / DBZ-6809

Test Avro adjustment for MongoDb connector and ExtractNewDocumentState SMT


    • Type: Bug
    • Resolution: Done
    • Priority: Blocker
    • Fix Version: 2.5.0.Beta1
    • Component: mongodb-connector

      Bug report

      What Debezium connector do you use and what version?

      Debezium MongoDB connector: 2.3.0.Final

      What is the connector configuration?

       

      "mongodb.connection.string": "${secrets:debezium-mongo-credentials}"
      "mongodb.user": "${secrets:debezium-mongo-credentials}"
      "mongodb.password": "${secrets:debezium-mongo-credentials}"
      "mongodb.members.auto.discover": true
      "mongodb.ssl.enabled": true
      
      # collections
      "collection.include.list": "debezium.signal,db.collection"
      
      # configuration
      "topic.prefix": "tst.dbz.mongo"
      "schema.name.adjustment.mode": "avro"
      "field.name.adjustment.mode": "avro"
      "snapshot.mode": "never"
      "skipped.operations": "t"
      "tombstones.on.delete": false
      
      # converters
      "key.converter": "org.apache.kafka.connect.json.JsonConverter"
      "key.converter.schemas.enable": false
      "value.converter": "io.confluent.connect.avro.AvroConverter"
      "value.converter.schemas.enable": false
      "value.converter.schema.registry.url": "http://schema-registry"
      
      # max size
      "max.batch.size": 20480
      "max.queue.size": 81920
      
      # debezium signaling - ad-hoc snapshot
      "signal.data.collection": "debezium.signal"
      "incremental.snapshot.chunk.size": 10000
      
      # heartbeat
      "topic.heartbeat.prefix": "debezium-heartbeat"
      "heartbeat.interval.ms": "600000"
      
      # transforms
      "transforms": "unwrap"
      "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
      "transforms.unwrap.drop.tombstones": true
      "transforms.unwrap.delete.handling.mode": "none"
      "transforms.unwrap.add.headers.prefix": ""
      "transforms.unwrap.add.headers": "op,ts_ms,source.version,source.connector,source.name,source.ts_ms,source.snapshot,source.db,source.collection,source.ord" 

       

      What is the captured database version and mode of deployment?

      Atlas MongoDB: 4.4.19

      What behaviour do you expect?

      Debezium CDC with the MongoDB New Document State Extraction transformation should work correctly, without connector errors.

      The error was first observed during an incremental initial load with the MongoDB New Document State Extraction transformation, but it was reproduced for both the incremental initial load and regular CDC.

      What behaviour do you see?

      Both the regular CDC flow and the incremental initial load with the MongoDB New Document State Extraction transformation fail with `org.apache.avro.SchemaParseException` (see the logs below).

      Do you see the same behaviour using the latest released Debezium version?

      Verified with Debezium MongoDB connector: 2.3.0.Final only.

      Do you have the connector logs, ideally from start till finish?

      The MongoDB connector with the MongoDB New Document State Extraction transformation can fail with several similar exceptions related to `org.apache.avro.SchemaParseException`.
      Example of an exception:

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:482)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:392)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: org.apache.avro.SchemaParseException: Illegal initial character: 0
          at org.apache.avro.Schema.validateName(Schema.java:1567)
          at org.apache.avro.Schema.access$400(Schema.java:92)
          at org.apache.avro.Schema$Name.<init>(Schema.java:704)
          at org.apache.avro.Schema.createRecord(Schema.java:219)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:965)
          at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
          at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
          at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
          at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:897)
          at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:780)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:765)
          at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:482)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          ... 13 more 

      Another example of an exception:

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:482)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:392)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: org.apache.avro.SchemaParseException: Illegal character in: api-version
          at org.apache.avro.Schema.validateName(Schema.java:1571)
          at org.apache.avro.Schema.access$400(Schema.java:92)
          at org.apache.avro.Schema$Field.<init>(Schema.java:549)
          at org.apache.avro.Schema$Field.<init>(Schema.java:588)
          at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1202)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
          at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1178)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:955)
          at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1197)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:976)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:780)
          at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:765)
          at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:482)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
          ... 13 more 

      After a restart, it then fails with:

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: java.lang.NullPointerException: Cannot invoke "Object.hashCode()" because "key" is null
          at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
          at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)
          at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5683)
          at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)
          at io.debezium.connector.mongodb.SourceInfo.snapshot(SourceInfo.java:473)
          at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:64)
          at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:39)
          at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
          at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
          at io.debezium.pipeline.CommonOffsetContext.getSourceInfo(CommonOffsetContext.java:24)
          at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.buildNotificationWith(IncrementalSnapshotNotificationService.java:154)
          at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.notifyInProgress(IncrementalSnapshotNotificationService.java:130)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$readChunk$8(MongoDbIncrementalSnapshotChangeEventSource.java:344)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.notifyReplicaSets(MongoDbIncrementalSnapshotChangeEventSource.java:439)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:342)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:263)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:57)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$3(ChangeEventSourceCoordinator.java:221)
          at java.base/java.util.Optional.ifPresent(Optional.java:178)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:221)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:203)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
          ... 5 more 

      Based on the exception examples above, Avro serialization can fail at any level of the JSON document structure because of illegal characters in field names. Note that both parameters related to Avro name adjustment are already set:

      "schema.name.adjustment.mode": "avro"
      "field.name.adjustment.mode": "avro" 

      But they do not help.
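
      For reference, both validation failures shown in the stack traces can be reproduced directly with the Avro library. The following is only a minimal sketch, not taken from the connector code; the names "api-version" and "0followers" are hypothetical stand-ins for keys present in the captured documents:

      import org.apache.avro.Schema;
      import org.apache.avro.SchemaParseException;

      public class AvroNameValidation {
          public static void main(String[] args) {
              try {
                  // Same code path as the second stack trace: Schema$Field.<init> -> validateName
                  new Schema.Field("api-version", Schema.create(Schema.Type.STRING), null, (Object) null);
              } catch (SchemaParseException e) {
                  System.out.println(e.getMessage()); // "Illegal character in: api-version"
              }
              try {
                  // Same code path as the first stack trace: Schema.createRecord -> validateName
                  Schema.createRecord("0followers", null, "io.debezium.example", false);
              } catch (SchemaParseException e) {
                  System.out.println(e.getMessage()); // "Illegal initial character: 0followers"
              }
          }
      }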

      This error does not occur if the MongoDB New Document State Extraction transformation is disabled, because then the whole JSON document is placed inside the "after" field as a string (no schema auto-generation, just the default Envelope schema).

      How to reproduce the issue using our tutorial deployment?

      Using docker-compose-mongodb.yaml
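
      One way to trigger the failure with the tutorial deployment is to insert a document whose keys are not valid Avro names and let the connector capture it with the ExtractNewDocumentState SMT enabled. This is a minimal sketch only; the connection string, database, collection, and field keys below are assumptions for illustration:

      import org.bson.Document;
      import com.mongodb.client.MongoClient;
      import com.mongodb.client.MongoClients;

      public class InsertProblematicDocument {
          public static void main(String[] args) {
              try (MongoClient client = MongoClients.create("mongodb://localhost:27017/?replicaSet=rs0")) {
                  Document doc = new Document("api-version", "v1")      // hyphen is illegal in Avro names
                          .append("stats", new Document("0", 42));      // a key starting with a digit is illegal too
                  client.getDatabase("inventory").getCollection("customers").insertOne(doc);
                  // Converting the resulting change event with ExtractNewDocumentState + AvroConverter
                  // is then expected to fail with org.apache.avro.SchemaParseException.
              }
          }
      }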

            Assignee: jcechace@redhat.com Jakub Čecháček
            Reporter: arttaraskin Artem Taraskin (Inactive)
            Votes: 1
            Watchers: 5
