DBZ-5102: Server-side database and collection filtering on MongoDB change stream


    • Type: Enhancement
    • Resolution: Done
    • Priority: Major
    • Fix Version: 2.2.0.Alpha3
    • Component: mongodb-connector

      Feature request or enhancement

      Which use case/requirement will be addressed by the proposed feature?

      Currently, the Debezium MongoDB source connector performs no server-side database or collection filtering (only operation-type filtering). Every change document in MongoDB is therefore transferred over the network to the instance where Debezium runs, and only once it has arrived does Debezium apply its database and collection filter logic.

      The proposed enhancement is to add database and collection filters to the change stream itself, avoiding unnecessary network transfer between MongoDB and the Debezium instance and saving the resources Debezium currently spends discarding filtered messages.
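      For context, MongoDB lets a client pass an aggregation pipeline when opening a change stream, and a $match stage in that pipeline is evaluated on the server, so unwanted events never leave MongoDB. The following standalone sketch illustrates the mechanism with the plain MongoDB Java driver; the connection string and the inventory.orders / inventory.customers namespaces are placeholders, not values taken from the connector:

      import com.mongodb.client.ChangeStreamIterable;
      import com.mongodb.client.MongoClient;
      import com.mongodb.client.MongoClients;
      import com.mongodb.client.model.Aggregates;
      import com.mongodb.client.model.Filters;
      import com.mongodb.client.model.changestream.ChangeStreamDocument;
      import org.bson.Document;
      import org.bson.conversions.Bson;

      import java.util.Arrays;
      import java.util.List;

      public class ServerSideNsFilterExample {
          public static void main(String[] args) {
              try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                  // The $match stage runs on the server: only events whose "ns"
                  // (namespace) document is in the list are sent to the client.
                  List<Bson> pipeline = Arrays.asList(
                          Aggregates.match(Filters.in("ns", Arrays.asList(
                                  new Document("db", "inventory").append("coll", "orders"),
                                  new Document("db", "inventory").append("coll", "customers")))));

                  ChangeStreamIterable<Document> stream = client.watch(pipeline);
                  for (ChangeStreamDocument<Document> event : stream) {
                      System.out.println(event.getNamespace() + " -> " + event.getOperationType());
                  }
              }
          }
      }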

      Implementation ideas (optional)

      In MongoDbStreamingChangeEventSource.java line 308:

      filters = Filters.and(filters, Filters.ne("clusterTime", oplogStart));

      can be modified to:

      Bson collectionFilters = Filters.in("ns", nsList);
      filters = Filters.and(filters, collectionFilters, Filters.ne("clusterTime", oplogStart));

      To build "nsList", I wrote the following code, which iterates over the existing databases and collections and tests whether each one is included by the database and collection filters:

      // Collect the namespaces ("ns" documents with "db" and "coll" fields)
      // that pass the connector's database and collection filters.
      // Requires java.util.ArrayList and java.util.List in the imports.
      List<Document> nsList = new ArrayList<>();
      for (Document db : primary.listDatabases()) {
          String dbName = db.get("name").toString();
          if (taskContext.filters().databaseFilter().test(dbName)) {
              for (Document coll : primary.getDatabase(dbName).listCollections()) {
                  String collName = coll.get("name").toString();
                  CollectionId collectionId = new CollectionId(
                          replicaSet.replicaSetName(),
                          dbName,
                          collName);
                  if (taskContext.filters().collectionFilter().test(collectionId)) {
                      nsList.add(new Document().append("db", dbName).append("coll", collName));
                  }
              }
          }
      }

      I don't think this is the best approach, though, because "nsList" is generated only once, when the task is submitted to Kafka Connect. If a new database or collection matching the include list is created after that, it would not be picked up by the stream filters. A regex-based approach built from the collection (and database) include/exclude lists might therefore be a better option, as sketched below.
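      A rough sketch of what such a regex-based $match could look like with the MongoDB Java driver follows. Note that Debezium's collection.include.list patterns have the form databaseName.collectionName, so they would have to be split or rewritten before being matched against ns.db and ns.coll separately; the method name and patterns here are purely illustrative:

      import com.mongodb.client.model.Aggregates;
      import com.mongodb.client.model.Filters;
      import org.bson.conversions.Bson;

      public class RegexNsFilterExample {

          // Build a server-side $match stage from include-list regular expressions.
          // A change stream event's "ns" field has the shape { db: "...", coll: "..." },
          // so the database and collection patterns are applied to its sub-fields.
          static Bson nsMatchStage(String dbIncludeRegex, String collIncludeRegex) {
              return Aggregates.match(Filters.and(
                      Filters.regex("ns.db", dbIncludeRegex),
                      Filters.regex("ns.coll", collIncludeRegex)));
          }

          public static void main(String[] args) {
              // Hypothetical, anchored patterns: databases named inventory<N> and
              // collections whose name starts with "orders".
              Bson stage = nsMatchStage("^inventory\\d+$", "^orders");
              System.out.println(stage);
          }
      }

      Unlike a static "nsList", such a regex is evaluated against each event's namespace on the server, so databases and collections created after the connector starts are still matched.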

    • Assignee: Jakub Čecháček (jcechace@redhat.com)
    • Reporter: Lucio Catinelli (lucio.catinelli) (Inactive)