Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5973

MongoDB Incremental Snapshot not Working

    XMLWordPrintable

Details

    • Critical

    Description

      _emphasized text_In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-mongodb-2.0.0.Final

      What is the connector configuration? 

      class: io.debezium.connector.mongodb.MongoDbConnector config: mongodb.hosts: > psmdb-db-entity-mongos-0.psmdb-db-entity.svc.cluster.local, psmdb-db-entity-mongos-1.psmdb-db-entity.svc.cluster.local mongodb.name: entity topic.prefix: entity mongodb.user: <user> mongodb.password: <password> mongodb.authsource: admin database.include.list: > PaymentEntities, debezium signal.data.collection: debezium.debeziumSignal collection.exclude.list: > .+.system.profile snapshot.max.threads: 2 schema.name.adjustment.mode: avro key.converter: io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url: <URL> value.converter: io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url:<URL>

       

      What is the captured database version and mode of depoyment?

      The connector is deployed on Kubernetes using KafkaConnector CRD from strimzi operator version 0.32.0

       

      What behaviour do you expect?

      I'm trying to enable incremental snapshots, and I'm expecting the connector to start the ad-hoc snapshot

      What behaviour do you see?

      I'm receiving an error once I send the signal to start the ad-hoc snapshot, 

      Do you see the same behaviour using the latest relesead Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      I have verified this error on 2.0.0 and I will try to test on latest Alpha

      Do you have the connector logs, ideally from start till finish?

      2023-01-02 23:43:13,102 INFO [source-connector-mongodb-entity|task-0] Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator) [debezium-mongodbconnector-entity-change-event-source-coordinator]
      2023-01-02 23:43:13,270 ERROR [source-connector-mongodb-entity|task-0] WorkerSourceTask{id=source-connector-mongodb-entity-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-source-connector-mongodb-entity-0]
      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:53)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:270)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$streamChangesForReplicaSet$0(MongoDbStreamingChangeEventSource.java:115)
          at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:317)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.streamChangesForReplicaSet(MongoDbStreamingChangeEventSource.java:114)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:95)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:51)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {sec=1672672322, ord=1, transaction_id=null, resume_token=8263B2F442000000012B022C0100296E5A10044DDCEC313E9B4262B9718D3DB9067BEA46645F6964006463B2F4415C801E2812A60AA90004, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000017372000e6a6176612e6c616e672e4c6f6e673b8be490cc8f23df0200014a000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000000087cd5, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":"rs0.PaymentEntities.Entities","incremental_snapshot_collections_additional_condition":null}], incremental_snapshot_primary_key=aced000570}
          at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:261)
          at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:260)
          ... 13 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to chunk query key for 'rs0.PaymentEntities.Entities'
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$establishConnectionToPrimary$7(MongoDbIncrementalSnapshotChangeEventSource.java:542)
          at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.executeBlocking(ConnectionContext.java:385)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.createDataEventsForDataCollection(MongoDbIncrementalSnapshotChangeEventSource.java:413)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:284)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(MongoDbIncrementalSnapshotChangeEventSource.java:350)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(MongoDbIncrementalSnapshotChangeEventSource.java:49)
          at io.debezium.pipeline.signal.ExecuteSnapshot.arrived(ExecuteSnapshot.java:54)
          at io.debezium.pipeline.signal.Signal.process(Signal.java:138)
          at io.debezium.pipeline.signal.Signal.process(Signal.java:176)
          at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:235)
          at io.debezium.connector.mongodb.MongoDbChangeRecordEmitter.createAndEmitChangeRecord(MongoDbChangeRecordEmitter.java:89)
          at io.debezium.connector.mongodb.MongoDbChangeRecordEmitter.emitCreateRecord(MongoDbChangeRecordEmitter.java:67)
          at io.debezium.connector.mongodb.MongoDbChangeRecordEmitter.emitCreateRecord(MongoDbChangeRecordEmitter.java:30)
          at io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:39)
          at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:224)
          ... 14 more
      Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type INT32 is of unexpected type INT64
          at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
          at org.bson.BsonValue.asInt32(BsonValue.java:94)
          at org.bson.BsonDocument.getInt32(BsonDocument.java:191)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.keyFromRow(MongoDbIncrementalSnapshotChangeEventSource.java:500)
          at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$createDataEventsForDataCollection$6(MongoDbIncrementalSnapshotChangeEventSource.java:460)
          at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.executeBlocking(ConnectionContext.java:378)
          ... 27 more

       

      How to reproduce the issue using our tutorial deployment?

      Use this command to trigger an ad-hoc snapshot

      db.debeziumSignal.insertOne({
          "type" : "execute-snapshot",
          "data" : {
          "data-collections": ["PaymentEntities.Entities"],
          "type": "incremental"}
          }); 

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      <Your answer>

      Implementation ideas (optional)

      <Your answer>

      Attachments

        Issue Links

          Activity

            People

              jcechace@redhat.com Jakub Čecháček
              malhomaid Mohammad Alhomaid (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: