Debezium / DBZ-182

MongoDB connector does not redo initial sync after restart


    • Type: Bug
    • Resolution: Done
    • Priority: Optional
    • 0.4
    • 0.4
    • mongodb-connector
    • None

      Create a new connector in the same group as a MongoDB connector that is still performing its initial sync. Kafka Connect will trigger a rebalance. After the restart, the MongoDB connector does not redo the initial sync.

      The MongoDB connector does not redo the initial sync after an interruption. Here is a finding that may help locate the problem. The MongoDB replicator records the last oplog timestamp before the initial sync has finished: in run(), recordCurrentOplogPosition() is called before performInitialSync(). After a restart or rebalance, isInitialSyncExpected() retrieves that recorded timestamp. If it is not earlier than the first timestamp still present in the oplog, the replicator skips the initial sync, regardless of whether the previous initial sync finished or not.

          public void run() {
              if (this.running.compareAndSet(false, true)) {
                  try {
                      if (establishConnectionToPrimary()) {
                          if (isInitialSyncExpected()) {
                              recordCurrentOplogPosition();
                              if (!performInitialSync()) {
                                  return;
                              }
                          }
                          readOplog();
                      }
                  } finally {
                      this.running.set(false);
                  }
              }
          }
      
          protected boolean isInitialSyncExpected() {
              boolean performSnapshot = true;
              if (source.hasOffset(rsName)) {
                  logger.info("Found existing offset for replica set '{}' at {}", rsName, source.lastOffset(rsName));
                  performSnapshot = false;
                  if (context.performSnapshotEvenIfNotNeeded()) {
                      logger.info("Configured to performing initial sync of replica set '{}'", rsName);
                      performSnapshot = true;
                  } else {
                      // Look to see if our last recorded offset still exists in the oplog.
                      BsonTimestamp lastRecordedTs = source.lastOffsetTimestamp(rsName);
                      AtomicReference<BsonTimestamp> firstExistingTs = new AtomicReference<>();
                      primaryClient.execute("get oplog position", primary -> {
                          MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
                          Document firstEvent = oplog.find().sort(new Document("$natural", 1)).limit(1).first(); // may be null
                          firstExistingTs.set(SourceInfo.extractEventTimestamp(firstEvent));
                      });
                      BsonTimestamp firstAvailableTs = firstExistingTs.get();
                      if ( firstAvailableTs == null ) {
                          logger.info("The oplog contains no entries, so performing initial sync of replica set '{}'", rsName);
                          performSnapshot = true;
                      } else if ( lastRecordedTs.compareTo(firstAvailableTs) < 0 ) {
                          // The last recorded timestamp is *before* the first existing oplog event, which means there is
                          // almost certainly some history lost since we last processed the oplog ...
                          logger.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}",
                                      rsName, firstAvailableTs, lastRecordedTs);
                          performSnapshot = true;
                      }
                  }
              } else {
                  logger.info("No existing offset found for replica set '{}', starting initial sync", rsName);
                  performSnapshot = true;
              }
              return performSnapshot;
          }
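
      The decision above can be modelled in isolation to show why an interrupted sync is skipped. The following is a minimal sketch, not Debezium code: the class, the offset keys, and the "in progress" flag are all hypothetical names introduced for illustration, and timestamps are reduced to plain long values. It contrasts the reported logic with one possible remedy, which assumes the offset could also record that an initial sync was started but not completed.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model of the snapshot decision described in this issue.
 * All names are hypothetical; none of them belong to the Debezium API.
 */
class InitialSyncDecision {

    // Hypothetical offset keys used only in this sketch.
    static final String TIMESTAMP = "ts";
    static final String INITIAL_SYNC_IN_PROGRESS = "initialSyncInProgress";

    /** Mirrors the reported logic: only the recorded timestamp is considered. */
    static boolean reportedDecision(Map<String, Object> offset, Long firstOplogTs) {
        if (offset == null) return true;        // no offset: sync is needed
        if (firstOplogTs == null) return true;  // empty oplog: sync is needed
        long lastRecordedTs = (Long) offset.get(TIMESTAMP);
        return lastRecordedTs < firstOplogTs;   // history lost: sync is needed
    }

    /** Assumed remedy: an unfinished sync always forces a new initial sync. */
    static boolean decisionWithFlag(Map<String, Object> offset, Long firstOplogTs) {
        if (offset != null && Boolean.TRUE.equals(offset.get(INITIAL_SYNC_IN_PROGRESS))) {
            return true;
        }
        return reportedDecision(offset, firstOplogTs);
    }

    /** Offset as it might be stored when the initial sync starts. */
    static Map<String, Object> offsetAtSnapshotStart(long oplogTs) {
        Map<String, Object> offset = new HashMap<>();
        offset.put(TIMESTAMP, oplogTs);
        offset.put(INITIAL_SYNC_IN_PROGRESS, Boolean.TRUE);
        return offset;
    }

    /** Offset as it might be stored once the initial sync has completed. */
    static Map<String, Object> offsetAfterSnapshot(long oplogTs) {
        Map<String, Object> offset = new HashMap<>();
        offset.put(TIMESTAMP, oplogTs);
        offset.put(INITIAL_SYNC_IN_PROGRESS, Boolean.FALSE);
        return offset;
    }

    public static void main(String[] args) {
        long firstOplogTs = 50L;
        // The connector was interrupted in the middle of its initial sync.
        Map<String, Object> interrupted = offsetAtSnapshotStart(100L);

        // false: the reported logic skips the sync because 100 >= 50
        System.out.println(reportedDecision(interrupted, firstOplogTs));
        // true: the flag forces the sync to be redone
        System.out.println(decisionWithFlag(interrupted, firstOplogTs));
        // false: a completed sync is not repeated
        System.out.println(decisionWithFlag(offsetAfterSnapshot(100L), firstOplogTs));
    }
}
```

      In this sketch the flag is checked before the timestamp comparison, so an interrupted sync is redone even when the recorded position is still available in the oplog. Whether the connector's offset format can carry such a flag is an assumption of the sketch.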
      

              rhauch Randall Hauch (Inactive)
              wangshao_jira barten barten (Inactive)