Debezium / DBZ-9329

MySQL Connector Memory Leak and Thread Accumulation During Snapshot Interruption


    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major
    • 3.4.0.Alpha2
    • 3.1.0.Final
    • Component: mysql-connector

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      MySQL Connector, v3.0.8.Final and later versions (the issue is still present in the latest release because it has not been addressed).

      What is the connector configuration?

      Any MySQL connector configuration that performs initial snapshots, particularly when:

      • Large tables are being snapshotted
      • Network connectivity to Kafka broker is unstable
      • Consumer applications may disconnect during snapshot operations
      • High-volume snapshot operations that may trigger queue buffering (a representative configuration sketch follows)
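
      For reference, a minimal sketch of such an initial-snapshot configuration, written here as Java Properties using standard Debezium MySQL connector property names; all hostnames, credentials, server IDs, topic prefixes, and table names are placeholders rather than values from a real deployment:

        import java.util.Properties;

        /**
         * Minimal sketch of a MySQL connector configuration that performs an initial
         * snapshot. Values are placeholders, not taken from the reporter's environment.
         */
        public class ExampleSnapshotConfig {

            public static Properties build() {
                Properties props = new Properties();
                // Connector and task settings
                props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
                props.setProperty("tasks.max", "1");
                // Source database connection (placeholders)
                props.setProperty("database.hostname", "mysql.example.internal");
                props.setProperty("database.port", "3306");
                props.setProperty("database.user", "debezium");
                props.setProperty("database.password", "changeme");
                props.setProperty("database.server.id", "184054");
                // Logical name used as the Kafka topic prefix
                props.setProperty("topic.prefix", "inventory-server");
                // Capture a few large tables and take an initial snapshot of them
                props.setProperty("table.include.list", "inventory.large_table_1,inventory.large_table_2");
                props.setProperty("snapshot.mode", "initial");
                // Schema history storage (required for the MySQL connector)
                props.setProperty("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                props.setProperty("schema.history.internal.kafka.topic", "schema-changes.inventory");
                return props;
            }
        }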

      What is the captured database version and mode of deployment?

      MySQL 8.0+ (any version)
      Deployment mode: Any (on-premises, cloud providers like AWS RDS, GCP Cloud SQL, Azure Database for MySQL)

      What behavior do you expect?

      When a MySQL connector task is stopped or fails during a snapshot operation:

      1. The task should shut down gracefully within a reasonable time (< 30 seconds)
      2. All coordinator threads should be properly cleaned up
      3. Memory should be released without accumulation or leaks
      4. Buffered snapshot records should be handled gracefully without causing indefinite blocking
      5. Subsequent connector restarts should work normally without memory pressure

      What behavior do you see?

      1. Memory Accumulation: Coordinator threads get stuck indefinitely when trying to flush buffered snapshot records
      2. Thread Leaks: Snapshot cleanup threads remain blocked and never terminate, holding memory references
      3. Indefinite Blocking: The modifyAndFlushLastRecord method blocks forever when the consumer queue is full and no consumer is available (see the sketch after the symptom list below)
      4. Resource Exhaustion: Over time, multiple failed snapshot attempts lead to accumulated memory usage and thread pool exhaustion
      5. Delayed Shutdown: Task shutdown can take extremely long or never complete when coordinator threads are stuck

      Specific symptoms:

      • Connector tasks that never finish shutting down
      • Memory usage that continues to grow after task failures
      • Thread dumps showing blocked threads in ChangeEventQueue.doEnqueue() and MySqlChangeEventSourceFactory.modifyAndFlushLastRecord()
      • OutOfMemoryError in environments with repeated snapshot failures
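
      The blocking pattern itself is not specific to Debezium and can be reproduced with any bounded queue: a producer flushing into a full queue that nobody drains simply never returns. A minimal, self-contained illustration (plain JDK code, not Debezium) is shown below; only an explicit signal such as an interrupt releases the stuck thread:

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.TimeUnit;

        /**
         * Illustration of why a "flush remaining records" step can hang: the bounded
         * queue is full, no consumer drains it, and the producer blocks inside put()
         * with no shutdown signal to observe.
         */
        public class StuckFlushDemo {

            public static void main(String[] args) throws InterruptedException {
                BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
                queue.put("record-1");
                queue.put("record-2"); // queue is now full; nothing will ever consume it

                Thread coordinator = new Thread(() -> {
                    try {
                        // Analogous to flushing the last buffered snapshot record:
                        // this call never returns because the queue never drains.
                        queue.put("last-buffered-record");
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        System.out.println("Flush interrupted, coordinator can exit");
                    }
                }, "snapshot-coordinator");

                coordinator.start();
                coordinator.join(TimeUnit.SECONDS.toMillis(2));
                // After 2 seconds the coordinator is still alive and stuck in put().
                System.out.println("Coordinator still blocked: " + coordinator.isAlive());

                // An explicit shutdown signal (here, an interrupt) is what finally releases it.
                coordinator.interrupt();
                coordinator.join();
            }
        }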

      Do you see the same behaviour using the latest released Debezium version?

      Yes, this issue exists in Debezium v3.0.8.Final and affects all subsequent versions that don't include the proposed fix. The issue is particularly prevalent in production environments with:

      • Intermittent network connectivity
      • Large snapshot operations
      • High-availability setups where consumer applications may restart frequently

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Typical log patterns showing the issue:

       

      [Coordinator thread] DEBUG - Attempting to flush buffered record during snapshot cleanup
      [Coordinator thread] WARN - Flush attempt timed out, coordinator thread may be stuck
      [Task shutdown] INFO - Stopping MySQL connector task
      [Task shutdown] WARN - Task shutdown taking longer than expected, coordinator threads may be blocked 

       

       

      How to reproduce the issue using our tutorial deployment?

      1. Setup: Use the standard Debezium MySQL tutorial setup with a database containing large tables
      2. Configure: Set up MySQL connector with initial snapshot mode
      3. Trigger: Start the connector to begin initial snapshot of large tables
      4. Simulate failure: During snapshot operation, either:
        1. Stop the Kafka Connect worker abruptly
        2. Disconnect the consumer application
        3. Fill up the Kafka topic to cause backpressure
        4. Introduce network latency/disconnection to Kafka broker
      5. Observe: Monitor memory usage and thread counts during and after the failure (see the monitoring sketch after this list)
      6. Verify: Attempt to restart the connector and observe memory accumulation over multiple failure cycles
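
      For steps 5 and 6, heap usage and blocked threads can be watched with jconsole or periodic jstack dumps; alternatively, a small probe based on the standard JMX management beans can be run inside the Connect JVM. The class below is a hypothetical helper (not part of Debezium or Kafka Connect) that reports heap and thread counts and flags threads stuck in the code paths named above:

        import java.lang.management.ManagementFactory;
        import java.lang.management.MemoryMXBean;
        import java.lang.management.ThreadInfo;
        import java.lang.management.ThreadMXBean;

        /**
         * Hypothetical monitoring helper: prints heap usage, total thread count, and any
         * thread whose stack mentions the queue/flush methods named in the symptoms.
         */
        public class SnapshotLeakProbe {

            public static void dump() {
                MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
                ThreadMXBean threads = ManagementFactory.getThreadMXBean();

                System.out.printf("Heap used: %d MiB, threads: %d%n",
                        memory.getHeapMemoryUsage().getUsed() / (1024 * 1024),
                        threads.getThreadCount());

                // Flag threads stuck in the enqueue/flush code paths seen in thread dumps.
                for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                    for (StackTraceElement frame : info.getStackTrace()) {
                        String location = frame.getClassName() + "." + frame.getMethodName();
                        if (location.contains("ChangeEventQueue.doEnqueue")
                                || location.contains("modifyAndFlushLastRecord")) {
                            System.out.printf("Blocked thread '%s' in %s (state %s)%n",
                                    info.getThreadName(), location, info.getThreadState());
                            break;
                        }
                    }
                }
            }
        }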

      Root Cause Analysis:

      1. Simple Blocking Flush: The original modifyAndFlushLastRecord method had a simple implementation:

            private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modify) throws InterruptedException {
                // Flushing the buffered record re-enqueues it into the bounded change event queue;
                // if that queue is full and nothing is consuming, this call blocks with no way out.
                queue.flushBuffer(dataChange -> new DataChangeEvent(modify.apply(dataChange.getRecord())));
                queue.disableBuffering();
            }

      2. No Queue Shutdown: The ChangeEventQueue had no mechanism to signal shutdown to waiting threads (see the simplified sketch below).
      3. No Coordinated Shutdown: The MySqlConnectorTask.doStop() method did not coordinate with the queue to unblock waiting threads.
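
      To make points 2 and 3 concrete, the enqueue path can be pictured as a guarded wait loop whose only wake-up sources are a consumer freeing space or a thread interrupt. The following is a simplified, illustrative sketch of that shape (not the actual ChangeEventQueue source); there is nothing here that a stopping task could set to make a blocked producer give up:

        import java.util.ArrayDeque;
        import java.util.Queue;

        /**
         * Simplified sketch (not the actual ChangeEventQueue source) of the enqueue path:
         * the wait loop only exits when space frees up or the thread is interrupted.
         * There is no shutdown flag, so a stopping task cannot tell a blocked
         * coordinator thread to give up.
         */
        public class BlockingEnqueueSketch<T> {

            private final Queue<T> records = new ArrayDeque<>();
            private final int maxQueueSize;

            public BlockingEnqueueSketch(int maxQueueSize) {
                this.maxQueueSize = maxQueueSize;
            }

            public synchronized void doEnqueue(T record) throws InterruptedException {
                while (records.size() >= maxQueueSize) {
                    // Waits forever if no consumer ever calls poll() and notifies.
                    wait();
                }
                records.add(record);
            }

            public synchronized T poll() {
                T record = records.poll();
                notifyAll(); // wake up producers waiting for free space
                return record;
            }
        }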

      Feature request or enhancement (Bug Fix)

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      This enhancement addresses critical production reliability requirements:

      1. High Availability Operations: Ensures MySQL connectors can handle broker outages and consumer failures gracefully without memory leaks
      2. Resource Management: Prevents memory accumulation that can lead to OutOfMemoryError in production environments
      3. Operational Stability: Enables reliable connector lifecycle management with predictable shutdown behavior
      4. Scalability: Supports large-scale deployments where snapshot operations are frequent and failures are expected
      5. Production Resilience: Improves connector robustness in unstable network environments or high-load scenarios

      Implementation ideas (optional)

      1. Queue Shutdown Mechanism: Add a shutdown() method to ChangeEventQueue that signals waiting threads to unblock gracefully
      2. Enhanced Snapshot Cleanup: Improve modifyAndFlushLastRecord() in MySqlChangeEventSourceFactory to handle interruptions and use the queue shutdown signal
      3. Coordinated Task Shutdown: Modify MySqlConnectorTask.doStop() to trigger queue shutdown early in the shutdown process
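
      A rough sketch of how the three ideas above could fit together, using simplified stand-in classes rather than the real ChangeEventQueue, MySqlChangeEventSourceFactory, and MySqlConnectorTask APIs; the method names and the 30-second join timeout are illustrative assumptions, not the actual fix:

        import java.util.ArrayDeque;
        import java.util.Queue;

        /**
         * Illustrative stand-in (not Debezium code): a shutdown flag that the wait loop
         * checks, a flush that treats shutdown/interruption as "stop trying", and a task
         * stop method that signals the queue before waiting for coordinator threads.
         */
        public class ShutdownAwareQueueSketch<T> {

            private final Queue<T> records = new ArrayDeque<>();
            private final int maxQueueSize;
            private volatile boolean shutdown;

            public ShutdownAwareQueueSketch(int maxQueueSize) {
                this.maxQueueSize = maxQueueSize;
            }

            /** Idea 1: signal all waiting producer threads that no consumer will come. */
            public synchronized void shutdown() {
                shutdown = true;
                notifyAll();
            }

            public synchronized void enqueue(T record) throws InterruptedException {
                while (!shutdown && records.size() >= maxQueueSize) {
                    wait(); // woken by poll() or by shutdown()
                }
                if (shutdown) {
                    return; // drop the record instead of blocking forever
                }
                records.add(record);
            }

            public synchronized T poll() {
                T record = records.poll();
                notifyAll();
                return record;
            }

            /** Idea 2: the snapshot cleanup flush gives up cleanly on shutdown or interrupt. */
            public void flushLastBufferedRecord(T lastRecord) {
                try {
                    enqueue(lastRecord);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // let the coordinator thread exit
                }
            }

            /** Idea 3: task shutdown signals the queue before waiting for coordinator threads. */
            public static <T> void doStop(ShutdownAwareQueueSketch<T> queue, Thread coordinatorThread)
                    throws InterruptedException {
                queue.shutdown();                 // unblock any thread stuck in enqueue()
                coordinatorThread.join(30_000);   // the coordinator can now finish promptly
            }
        }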

       

      Testing of the proposed implementation

      The proposed implementation above was tested locally under the following conditions:

      Database in picture: a table of 200 rows with roughly 10 MB of data per row (the table size was chosen deliberately to reproduce the issue).

      Cluster: Standard

      Task Failing Cause: on the standard cluster max.request.size is 8388698 bytes, while each ~10 MB row serializes to roughly 14 MB, so the task failed with:

      WorkerSourceTask{id=lcc-devckv61v2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
      org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
          at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:343)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:135)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:445)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:391)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:272)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:328)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:88)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:341)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
          at java.base/java.lang.Thread.run(Thread.java:1583)
      Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 13981496 bytes when serialized which is larger than 8388698, which is the value of the max.request.size configuration.

       

       

      Timeline of events

      00:20 – 00:28 → Connector configured and running on the existing v3.0.8.Final image

      00:28 → Connector deleted

      00:34 → Connect pod restarted with the new image (the above changes applied on top of 3.0.8)

      00:35 → New connector configured with the same configuration

       

      Observation

      On the older image, memory was not freed after each task failure; this shows up as a step pattern in the first half of the attached memory chart.

      On the newer image (with our fix), memory was freed after each task failure, so the chart stays close to a flat line.

      The number of restarts was roughly the same for both connectors, as observed from the logs.

              Assignee: Unassigned
              Reporter: Yashi Srivastava (Inactive)
              Contributors: Animesh Kumar, Yashi Srivastava (Inactive)
              Votes: 0
              Watchers: 2