Debezium / DBZ-6521

MongoDB change stream pipeline not respecting hard coded `readPreference=secondaryPreferred`

    Description

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      2.2.0.Alpha3 nightly build 20230227.180322-243 on commit
      https://github.com/debezium/debezium/commit/7384a1695e20fcd6f826205bb9b15e2fb5446589

      What is the connector configuration?

      "mongodb.connection.string": "mongodb+srv://<snip>.mongodb.net/?readPreference=secondary&compressors=zstd,snappy",
      "mongodb.hosts": "mongodb:/<snip>.mongodb.net:1024,<snip>mongodb.net:1025,<snip>.mongodb.net:1026/?ssl=true&authSource=admin&replicaSet=atlas-<snip>-shard-0",
      

      What is the captured database version and mode of deployment?

      Atlas MongoDB 4.4.22

      What behaviour do you expect?

      Based on:

      To use readPreference=secondaryPreferred

      What behaviour do you see?

      To use readPreference=primaryPreferred
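
The read preference requested in the connection string from the configuration can be checked independently of the connector. The following is a minimal sketch that only parses the URI text and says nothing about what the driver or Debezium ends up applying. The helper name `read_preference_from_uri` and the host `cluster0.example.mongodb.net` are hypothetical, since the real host is redacted in this report.

```python
from urllib.parse import parse_qs, urlsplit


def read_preference_from_uri(uri: str, default: str = "primary") -> str:
    """Return the readPreference option of a MongoDB URI, or the default.

    Hypothetical helper for illustration; it is not part of Debezium or
    the MongoDB driver. "primary" is the driver's default read preference.
    """
    # URI option names are case-insensitive, so normalise the keys.
    options = {k.lower(): v for k, v in parse_qs(urlsplit(uri).query).items()}
    values = options.get("readpreference")
    # parse_qs maps each key to a list; take the last occurrence if repeated.
    return values[-1] if values else default


# Hypothetical host standing in for the redacted one.
uri = (
    "mongodb+srv://cluster0.example.mongodb.net/"
    "?readPreference=secondary&compressors=zstd,snappy"
)
print(read_preference_from_uri(uri))
```

For the connection string in this report the helper returns `secondary`, which is neither the `secondaryPreferred` shown in the streaming log line nor the `primaryPreferred` seen on the wire.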

      Do you see the same behaviour using the latest released Debezium version?

      Unknown

      Do you have the connector logs, ideally from start till finish?

      [2023-06-02 16:57:26,923] INFO No incremental snapshot in progress, no action needed on start (io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource)
      [2023-06-02 16:57:26,923] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator)
      [2023-06-02 16:57:26,923] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster)
      [2023-06-02 16:57:26,923] INFO Reading change stream for 'atlas-<snip>.mongodb.net:1024,<snip>.mongodb.net:1025,<snip>.mongodb.net:1026'/secondaryPreferred from <snip>.mongodb.net:1025 starting at Timestamp{value=7239996233081683971, seconds=1685692983, inc=3} (io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource)
      [2023-06-02 16:57:26,923] INFO Adding discovered server <snip>.mongodb.net:1024 to client view of cluster (org.mongodb.driver.cluster)
      [2023-06-02 16:57:26,924] INFO Change stream pipeline: {"pipeline": [{"$addFields": {"namespace": {"$concat": ["$ns.db", ".", "$ns.coll"]}}}, {"$match": {"$and": [{"$and": [{"ns.db": {"$regularExpression": {"pattern": "<snip>", "options": "i"}}}, {"namespace": {"$regularExpression": {"pattern": "<snip>.<snip>", "options": "i"}}}]}, {"operationType": {"$in": ["insert", "update", "replace", "delete"]}}]}}, {"$addFields": {"namespace": "$$REMOVE"}}]} (io.debezium.connector.mongodb.ChangeStreamPipelineFactory)
      [2023-06-02 16:57:26,924] INFO Adding discovered server <snip>.mongodb.net:1025 to client view of cluster (org.mongodb.driver.cluster)
      [2023-06-02 16:57:26,924] INFO Adding discovered server <snip>.mongodb.net:1026 to client view of cluster (org.mongodb.driver.cluster)
      [2023-06-02 16:57:26,924] INFO MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync", "version": "4.7.1"}, "os": {"type": "Linux", "name": "Linux", "architecture": "amd64", "version": "5.15.108"}, "platform": "Java/Azul Systems, Inc./11.0.18+10-LTS"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=MongoCredential{mechanism=null, userName='default-vortex', source='admin', password=<hidden>, mechanismProperties=<hidden>}, streamFactoryFactory=null, commandListeners=[], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@326d7bf]}, clusterSettings={hosts=[<snip>.mongodb.net:1024, <snip>.mongodb.net:1025, <snip>.mongodb.net:1026], srvServiceName=mongodb, mode=MULTIPLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', 
serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=true, invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=UNSPECIFIED, serverApi=null, autoEncryptionSettings=null, contextProvider=null} (org.mongodb.driver.client)
      [2023-06-02 16:57:26,924] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster)
      [2023-06-02 16:57:26,925] INFO Resuming streaming from token '826479A237000000032B022C0100296E5A10049AFD063A611A4C51A9097F3365C9987246645F696400646479A21DB7EEE8E5DD0FBB830004' (io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource)
      [2023-06-02 16:57:26,935] INFO Opened connection [connectionId{localValue:447, serverValue:161424}] to <snip>.mongodb.net:1026 (org.mongodb.driver.connection)
      [2023-06-02 16:57:26,936] INFO Opened connection [connectionId{localValue:448, serverValue:161427}] to <snip>.mongodb.net:1026 (org.mongodb.driver.connection)
      [2023-06-02 16:57:26,936] INFO The oplog contains the last entry previously read for 'atlas-<snip>-shard-0', so no snapshot will be performed (io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource)
      [2023-06-02 16:57:26,936] INFO Opened connection [connectionId{localValue:443, serverValue:161423}] to <snip>.mongodb.net:1026 (org.mongodb.driver.connection)
      [2023-06-02 16:57:26,939] INFO Closing all connections to mongodb+srv://<snip>.mongodb.net/?readPreference=secondary&compressors=zstd,snappy (io.debezium.connector.mongodb.ConnectionContext)
      

      Looking at the active queries we see:

      {"aggregate":1,"pipeline":[{"$changeStream":{"fullDocument":"updateLookup","allChangesForCluster":true,"resumeAfter":{"_data":"<snip>"}}},{"$addFields":{"namespace":{"$concat":["$ns.db",".","$ns.coll"]}}},{"$match":{"$and":[{"$and":[{"ns.db":{"$regularExpression":{"pattern":"auth0","options":"i"}}},{"namespace":{"$regularExpression":{"pattern":"<snip>.<snip>","options":"i"}}}]},{"operationType":{"$in":["insert","update","replace","delete"]}}]}},{"$addFields":{"namespace":"$$REMOVE"}}],"cursor":{},"$db":"admin","$clusterTime":{"clusterTime":{"$timestamp":{"t":1685692530,"i":1}},"signature":{"hash":{"$binary":{"base64":"OAbNSnJefsRJJK0FpKomkd+Y0ww=","subType":"0"}},"keyId":7195250379028693000}},"lsid":{"id":{"$uuid":"ad60a903-9bea-4fe9-8f58-4ecdaf8e0aef"}},"$readPreference":{"mode":"primaryPreferred"}}
      

      which indicates that the primaryPreferred read preference is being applied.
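
To read the effective mode out of a captured command without scanning the JSON by eye, something like the sketch below can be used. It assumes the command has been copied as a JSON string; the function name `read_preference_mode` is hypothetical and the sample document is a trimmed, illustrative stand-in for the aggregate command above.

```python
import json


def read_preference_mode(command_json: str):
    """Return the $readPreference mode of a logged command, or None if absent."""
    command = json.loads(command_json)
    return command.get("$readPreference", {}).get("mode")


# Trimmed, illustrative stand-in for the aggregate command shown above.
sample = (
    '{"aggregate": 1, '
    '"pipeline": [{"$changeStream": {"allChangesForCluster": true}}], '
    '"cursor": {}, "$db": "admin", '
    '"$readPreference": {"mode": "primaryPreferred"}}'
)
print(read_preference_mode(sample))
```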

      How to reproduce the issue using our tutorial deployment?

      To reproduce, run a test locally with the MongoDB driver log level changed to DEBUG and/or observe the queries that are being sent to MongoDB. Using the Debezium source:

      # Check out the affected version
      git clone git@github.com:debezium/debezium.git && cd debezium
      git checkout 7384a1695e20fcd6f826205bb9b15e2fb5446589
      
      # Change MongoDB driver log level to DEBUG to log queries
      sed -i -e '26s/warn/debug/'  debezium-connector-mongodb/src/test/resources/logback-test.xml
      
      # Run a streaming test, which should use the hard-coded `secondaryPreferred` read preference
      ./mvnw clean verify -B -pl debezium-connector-mongodb -am -Passembly -DskipTests -Dit.test='io.debezium.connector.mongodb.MongoDbConnectorIT#shouldConsumeAllEventsFromDatabase' | grep -i 'getMore'
      

      Observe results and notice the primaryPreferred read preference is being applied:

      2023-06-02 16:02:12,575 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 4075436513024580509, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736132, "i": 14}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "t9JaohDTSHmH+t5E17wayA==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 68 to database admin on connection [connectionId{localValue:26, serverValue:36}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:12,745 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 4075436513024580509, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736132, "i": 17}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "t9JaohDTSHmH+t5E17wayA==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 76 to database admin on connection [connectionId{localValue:26, serverValue:36}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:12,750 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 4075436513024580509, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736132, "i": 18}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "t9JaohDTSHmH+t5E17wayA==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 78 to database admin on connection [connectionId{localValue:26, serverValue:36}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:12,754 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 4075436513024580509, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736132, "i": 19}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "t9JaohDTSHmH+t5E17wayA==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 80 to database admin on connection [connectionId{localValue:26, serverValue:36}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:12,760 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 4075436513024580509, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736132, "i": 20}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "t9JaohDTSHmH+t5E17wayA==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 82 to database admin on connection [connectionId{localValue:26, serverValue:36}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:13,942 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736133, "i": 7}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 128 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,018 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 2}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 136 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,022 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 3}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 138 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,026 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 140 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,031 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 5}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 142 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,035 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 6}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 144 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,039 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 7}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 146 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,044 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 8}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 148 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,048 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 9}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 150 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,104 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 158 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,131 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 12}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 167 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
      2023-06-02 16:02:14,173 DEBUG  MongoDB|mongo|streaming  Sending command '{"getMore": 8159915228174257611, "collection": "$cmd.aggregate", "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1685736134, "i": 13}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "IQvtzrJzRXe5WRSRIFBuRg==", "subType": "04"}}}, "$readPreference": {"mode": "primaryPreferred"}}' with request id 175 to database admin on connection [connectionId{localValue:49, serverValue:59}] to server test-mongo1:27017   [org.mongodb.driver.protocol.command]
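
The same check can be automated over the test output. The sketch below tallies the `$readPreference` mode of every `getMore` command found in driver DEBUG lines. It assumes the line layout shown above (`Sending command '{...}' with request id ...`); the names `COMMAND_RE`, `tally_read_preferences` and `sample_line` are hypothetical, and the sample lines are shortened stand-ins built for illustration, not real log output.

```python
import json
import re
from collections import Counter

# Captures the JSON command between the single quotes of a driver DEBUG line.
COMMAND_RE = re.compile(r"Sending command '(\{.*\})' with request id")


def tally_read_preferences(lines):
    """Count $readPreference modes across logged getMore commands."""
    counts = Counter()
    for line in lines:
        match = COMMAND_RE.search(line)
        if not match:
            continue
        try:
            command = json.loads(match.group(1))
        except json.JSONDecodeError:
            # Skip commands that are not valid JSON as logged.
            continue
        if "getMore" not in command:
            continue
        mode = command.get("$readPreference", {}).get("mode", "<none>")
        counts[mode] += 1
    return counts


def sample_line(cursor_id, mode):
    """Build a shortened, illustrative log line in the layout shown above."""
    command = {
        "getMore": cursor_id,
        "collection": "$cmd.aggregate",
        "$db": "admin",
        "$readPreference": {"mode": mode},
    }
    return (
        "2023-06-02 16:02:12,575 DEBUG  MongoDB|mongo|streaming  "
        f"Sending command '{json.dumps(command)}' with request id 68 "
        "to database admin"
    )


sample_lines = [
    sample_line(4075436513024580509, "primaryPreferred"),
    sample_line(8159915228174257611, "primaryPreferred"),
    "2023-06-02 16:02:12,576 DEBUG  unrelated line",
]
print(tally_read_preferences(sample_lines))
```

If the connector honoured the intended setting, the tally would be expected to show `secondaryPreferred` rather than `primaryPreferred`.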
      

            People

              jcechace@redhat.com Jakub Čecháček
              btiernay Bob Tiernay