diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java index 7b731b05..fcb62bdb 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.kafka.connect.errors.ConnectException; @@ -322,16 +323,25 @@ public void executeBlocking(String desc, BlockingConsumer operation * @return the collection identifiers; never null */ public List collections() { + return collections(s->true, collectionId -> true); + } + + public List collections(Predicate databaseFilter, Predicate collectionFilter) { String replicaSetName = replicaSet.replicaSetName(); // For each database, get the list of collections ... List collections = new ArrayList<>(); execute("get collections in databases", primary -> { collections.clear(); // in case we restarted Set databaseNames = databaseNames(); - MongoUtil.forEachDatabaseName(primary, databaseNames::add); + MongoUtil.forEachDatabaseName(primary, dbName -> { + if(databaseFilter.test(dbName)) databaseNames.add(dbName); + }); databaseNames.forEach(dbName -> { MongoUtil.forEachCollectionNameInDatabase(primary, dbName, collectionName -> { - collections.add(new CollectionId(replicaSetName, dbName, collectionName)); + CollectionId id = new CollectionId(replicaSetName, dbName, collectionName); + if(collectionFilter.test(id)){ + collections.add(id); + } }); }); }); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index bb768e7b..85e3f8f2 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.mongodb; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Queue; @@ -261,10 +260,7 @@ protected boolean performInitialSync() { final long syncStart = clock.currentTimeInMillis(); // We need to copy each collection, so put the collection IDs into a queue ... - final List collections = new ArrayList<>(); - primaryClient.collections().forEach(id -> { - if (databaseFilter.test(id.dbName()) && collectionFilter.test(id))collections.add(id); - }); + final List collections = primaryClient.collections(databaseFilter, collectionFilter); final Queue collectionsToCopy = new ConcurrentLinkedQueue<>(collections); final int numThreads = Math.min(collections.size(), context.getConnectionContext().maxNumberOfCopyThreads()); final CountDownLatch latch = new CountDownLatch(numThreads);