Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6973

Add MongoDB Connector support for `filtering.match.mode=regex|literal` property

XMLWordPrintable

    • Icon: Feature Request Feature Request
    • Resolution: Done
    • Icon: Major Major
    • 2.4.0.Final
    • None
    • mongodb-connector
    • None
    • False
    • None
    • False
    • 0
    • 0% 0%

      After performing deeper profiling of MongoDB Debezium change stream queries, we've discovered that the aggregation pipeline operators are problematic in high-throughput scenarios, adding a large load to MongoDB Atlas. We ran a 4-hour load test at a scale of 5k rps on our application platform. In analyzing the 108k slow queries reported for that load test, 98,668 of them were getMores on the associated change stream with the following structure:

      {
        "_id": "ObjectId(\"64fb212051c100d0523a8666\")",
        "t": "ISODate(\"2023-08-31T00:29:13.553Z\")",
        "s": "I",
        "c": "COMMAND",
        "id": 51803,
        "ctx": "conn273193",
        "msg": "Slow query",
        "attr": {
          "type": "command",
          "ns": "admin.$cmd",
          "appName": "<snip>",
          "command": {
            "getMore": 2531982790501019123,
            "collection": "$cmd.aggregate",
            "$db": "admin",
            "$clusterTime": {
              "clusterTime": "Timestamp(1693441752, 1033)",
              "signature": {
                "hash": "",
                "keyId": 7272676609172176900
              }
            },
            "lsid": {
              "id": "c5f68fc8-ac60-4219-9274-475b8fb93762"
            }
          },
          "originatingCommand": {
            "aggregate": 1,
            "pipeline": {
              "0": {
                "$changeStream": {
                  "fullDocument": "updateLookup",
                  "allChangesForCluster": true,
                  "resumeAfter": {
                    "_data": "8264EFA9EC000015D02B0229296E04"
                  }
                }
              },
              "1": {
                "$replaceRoot": {
                  "newRoot": {
                    "event": "$$ROOT",
                    "namespace": {
                      "$concat": {
                        "0": "$ns.db",
                        "1": ".",
                        "2": "$ns.coll"
                      }
                    }
                  }
                }
              },
              "2": {
                "$match": {
                  "$and": {
                    "0": {
                      "$and": {
                        "0": {
                          "event.ns.db": {}
                        },
                        "1": {
                          "namespace": {}
                        }
                      }
                    },
                    "1": {
                      "event.operationType": {
                        "$in": {
                          "0": "insert",
                          "1": "update",
                          "2": "replace",
                          "3": "delete"
                        }
                      }
                    }
                  }
                }
              },
              "3": {
                "$replaceRoot": {
                  "newRoot": "$event"
                }
              }
            },
            "cursor": {},
            "$db": "admin",
            "lsid": {
              "id": "c5f68fc8-ac60-4219-9274-475b8fb93762"
            },
            "$readPreference": {
              "mode": "secondaryPreferred"
            }
          },
          "planSummary": "COLLSCAN",
          "cursorid": 2531982790501019123,
          "keysExamined": 0,
          "docsExamined": 1840,
          "numYields": 4622,
          "nreturned": 0,
          "reslen": 302,
          "locks": {
            "FeatureCompatibilityVersion": {
              "acquireCount": {
                "r": 7861
              }
            },
            "ReplicationStateTransition": {
              "acquireCount": {
                "w": 7861
              }
            },
            "Global": {
              "acquireCount": {
                "r": 7861
              }
            },
            "Database": {
              "acquireCount": {
                "r": 7861
              }
            },
            "Collection": {
              "acquireCount": {
                "r": 1398
              }
            },
            "Mutex": {
              "acquireCount": {
                "r": 3239
              }
            },
            "oplog": {
              "acquireCount": {
                "r": 6463
              }
            }
          },
          "readConcern": {
            "level": "majority"
          },
          "storage": {
            "data": {
              "bytesRead": 209
            }
          },
          "protocol": "op_msg",
          "durationMillis": 213
        },
        "startedAt": "ISODate(\"2023-08-31T00:29:13.340Z\")"
      }
      

      The originating pipeline:

      [
        {
          '$changeStream': {
            fullDocument: 'updateLookup',
            allChangesForCluster: true,
            resumeAfter: { _data: '8264EFA9EC000015D02B0229296E04' }
          }
        },
        {
          '$replaceRoot': {
            newRoot: {
              event: '$$ROOT',
              namespace: { '$concat': [ '$ns.db', '.', '$ns.coll' ] }
            }
          }
        },
        {
          '$match': {
            '$and': [
              {
                '$and': [
                  { 'event.ns.db': /mydb/i },
                  { namespace: /mydb.mycollection/i }
                ]
              },
              {
                'event.operationType': { '$in': [ 'insert', 'update', 'replace', 'delete' ] }
              }
            ]
          }
        },
        { '$replaceRoot': { newRoot: '$event' } }
      ]
      

      In order to remove the need to perform regex, transform, concat and match processing in change streams on the server side, this feature is proposing a new configuration property to change the behavior of collection.include.list and database.include.list matching. This will further allow pushing down the match expression into the cursor prior to transformation.

      The idea is to introduce a filtering.match.mode=regex|literal configuration property with the following semantics:

      • regex (default) : matches database names and fully qualified collection identifiers as regular expressions
      • literal: matches database names and simple collection names exactly. collection.include.list must be qualified (i.e. with the dotted db name). Multiple values are supported as a comma-separated list; whitespace is stripped and matching is case sensitive. Note that db names are not allowed to contain dots in MongoDB so there is no ambiguity in parsing out the db name from the collection name.

      Thus, with this we can convert:

      "database.include.list": "mydb",
      "collection.include.list": "mydb.users",
      

      to

      "collection.include.list": "mydb.users",
      "filtering.match.mode": "literal",
      

      Which would roughly translate to the following MongoDB pipeline expression:

      {
          $match: {
              $or: [
                  { 'ns.db': 'mydb', 'ns.coll': 'users' }
              ]
          }
      }
      

      This would not be applied to other connectors (e.g., PostgreSQL).

            Unassigned Unassigned
            btiernay Bob Tiernay
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: