Debezium / DBZ-9150

Decouple LSN Flush from Offset Committer Thread and Introduce Timeout Handling


      The Postgres connector currently flushes the last processed LSN to the server during each invocation of the commit method. This method is executed by the SourceTaskOffsetCommitter thread, which is responsible for committing offsets for all source connectors in the worker.

      However, if the flushLsn call blocks, for example because of lock contention in the underlying PostgreSQL JDBC driver, the SourceTaskOffsetCommitter thread blocks with it. Offset commits then halt for all source connectors managed by that worker.
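
The stall can be reproduced in isolation. The following is a minimal sketch, assuming the offset committer behaves like a single-threaded scheduled executor shared by all tasks; that is an assumption made for illustration, not something taken from the Kafka Connect source. The class and method names are hypothetical. One task blocks the way a stuck flushLsn call would, and a second, healthy periodic commit never gets a turn.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Debezium or Kafka Connect.
final class SharedCommitterStall {

    /** Counts how often a healthy connector's commit runs while another commit is stuck. */
    static int healthyCommitsWhileBlocked(long observeMillis) throws InterruptedException {
        // Assumption: one thread serves the commit calls of every connector.
        ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch stuckStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger healthyCommits = new AtomicInteger();
        try {
            // Connector A: its commit blocks indefinitely, like a stuck LSN flush.
            committer.schedule(() -> {
                stuckStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, TimeUnit.MILLISECONDS);
            stuckStarted.await();

            // Connector B: a cheap commit that should run every 10 ms.
            committer.scheduleWithFixedDelay(
                    healthyCommits::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);

            Thread.sleep(observeMillis);
            return healthyCommits.get();
        } finally {
            release.countDown();
            committer.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("healthy commits while blocked: " + healthyCommitsWhileBlocked(200));
    }
}
```

Connector B's counter stays at zero for the whole observation window, because its commit is queued behind connector A's blocked call on the only available thread.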

      We have observed instances where the offset committer thread becomes indefinitely blocked. In one such case, the thread was found waiting on a lock held by a zombie change-event-source-coordinator thread, which likely remained active due to a thread leak.

      Below are the thread traces for both the offset committer thread and the zombie coordinator thread for reference.

      SourceTaskOffsetCommitter Thread:

      "SourceTaskOffsetCommitter-1" #271 [2078] prio=5 os_prio=0 cpu=449.83ms elapsed=594771.50s tid=0x0000ffff1441a740 nid=2078 waiting for monitor entry  [0x0000fffea7677000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      at org.postgresql.core.v3.CopyOperationImpl.isActive(CopyOperationImpl.java:52)
      - waiting to lock <0x000000059cc3ba30> (a org.postgresql.core.v3.QueryExecutorImpl)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.isClosed(V3PGReplicationStream.java:118)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.checkClose(V3PGReplicationStream.java:273)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:112)
      at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:528)
      at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:521)
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:421)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.commitOffset(ChangeEventSourceCoordinator.java:192)
      at io.debezium.connector.common.BaseSourceTask.commit(BaseSourceTask.java:282)
      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.commitSourceTask(AbstractWorkerSourceTask.java:603)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:326)
      at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
      at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:79)
      at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$$Lambda/0x00000038021c8d58.run(Unknown Source)
      at java.util.concurrent.Executors$RunnableAdapter.call(java.base@21.0.4/Executors.java:572)
      at java.util.concurrent.FutureTask.runAndReset(java.base@21.0.4/FutureTask.java:358)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@21.0.4/ScheduledThreadPoolExecutor.java:305)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
      at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
      at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)
       
         Locked ownable synchronizers:
      - <0x00000005a1d456e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
      - <0x00000005ad6e9058> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) 

       

      change-event-source-coordinator:

      "debezium-postgresconnector-intg_colend-change-event-source-coordinator" #300 [2107] prio=5 os_prio=0 cpu=386872.26ms elapsed=594771.01s tid=0x0000ffff04315a90 nid=2107 runnable  [0x0000fffea9e4f000]
         java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.Net.poll(java.base@21.0.4/Native Method)
      at sun.nio.ch.NioSocketImpl.park(java.base@21.0.4/NioSocketImpl.java:191)
      at sun.nio.ch.NioSocketImpl.timedRead(java.base@21.0.4/NioSocketImpl.java:280)
      at sun.nio.ch.NioSocketImpl.implRead(java.base@21.0.4/NioSocketImpl.java:304)
      at sun.nio.ch.NioSocketImpl.read(java.base@21.0.4/NioSocketImpl.java:346)
      at sun.nio.ch.NioSocketImpl$1.read(java.base@21.0.4/NioSocketImpl.java:796)
      at java.net.Socket$SocketInputStream.read(java.base@21.0.4/Socket.java:1099)
      at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
      at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
      at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
      at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:223)
      at org.postgresql.core.PGStream.receive(PGStream.java:646)
      at org.postgresql.core.PGStream.receive(PGStream.java:630)
      at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1320)
      at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1163)
      - locked <0x000000059cc3ba30> (a org.postgresql.core.v3.QueryExecutorImpl)
      at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
      at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:490)
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:216)
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:178)
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
      at io.debezium.pipeline.ChangeEventSourceCoordinator$$Lambda/0x00000038022a3cf8.run(Unknown Source)
      at java.util.concurrent.Executors$RunnableAdapter.call(java.base@21.0.4/Executors.java:572)
      at java.util.concurrent.FutureTask.run(java.base@21.0.4/FutureTask.java:317)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
      at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
      at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)
       
         Locked ownable synchronizers:
      - <0x00000005ae42dc20> (a java.util.concurrent.ThreadPoolExecutor$Worker)
      - <0x00000005ae437de8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) 
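
Both traces refer to the same monitor, <0x000000059cc3ba30>: the coordinator thread holds it inside readFromCopy while it waits on the socket, and the committer thread needs it for isActive. The following is a minimal sketch of that pattern, not the driver's code. FakeQueryExecutor and its latches are hypothetical stand-ins, and the blocked socket read is simulated with a latch, so the reader thread shows up as WAITING here and not RUNNABLE as in the trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only mimics the locking pattern visible in the traces.
final class MonitorContentionSketch {

    /** Stand-in for a driver object whose methods all synchronize on the same monitor. */
    static final class FakeQueryExecutor {
        final CountDownLatch readerInside = new CountDownLatch(1);
        final CountDownLatch dataArrived = new CountDownLatch(1);

        // Holds the monitor for as long as the simulated read is outstanding.
        synchronized void readFromCopy() throws InterruptedException {
            readerInside.countDown();
            dataArrived.await(); // stands in for a socket read that does not return
        }

        // Cheap check, but it needs the same monitor as readFromCopy().
        synchronized boolean isActive() {
            return true;
        }
    }

    /** Returns the committer thread's state while the reader still holds the monitor. */
    static Thread.State committerStateWhileReaderHoldsMonitor() throws InterruptedException {
        FakeQueryExecutor executor = new FakeQueryExecutor();
        Thread reader = new Thread(() -> {
            try {
                executor.readFromCopy();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "zombie-coordinator");
        Thread committer = new Thread(executor::isActive, "offset-committer");
        reader.setDaemon(true);
        committer.setDaemon(true);

        reader.start();
        executor.readerInside.await(); // the reader now owns the monitor
        committer.start();

        // Poll (bounded) until the committer is parked on the monitor.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        Thread.State state = committer.getState();
        while (state != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(5);
            state = committer.getState();
        }
        executor.dataArrived.countDown(); // let both threads finish
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("committer state: " + committerStateWhileReaderHoldsMonitor());
    }
}
```

A thread blocked on monitor entry cannot be interrupted out of that state, so the committer can only proceed once the reader leaves the synchronized method.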

       
       
      Regardless of the underlying cause, the existing design has a critical weakness: a single misbehaving connector can block offset commits for every source connector on the worker. To keep the worker robust, the commit method must remain lightweight and non-blocking.

      Proposed Solution

      1. Move LSN Flush Logic to the Task Thread
        Move the flushLsn invocation from the SourceTaskOffsetCommitter thread to the connector's own task thread. If flushLsn then hangs or slows down, only that connector is affected and offset commits for other connectors continue.
      2. Introduce a Timeout for LSN Flush
        Add a configurable timeout for the LSN flush operation. If the flush does not complete within the configured time, the connector task should fail fast with a clear error message. This prevents indefinite blocking and simplifies debugging.
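
One possible shape for the two proposals, as a minimal sketch and not the connector's actual implementation. DecoupledLsnFlusher, its methods, and the use of a plain long for the LSN are all hypothetical. The commit path only records the latest LSN, and the task thread later performs the flush through a connector-owned executor so that the wait can be bounded by a timeout.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;

// Hypothetical helper; names and types are illustrative only.
final class DecoupledLsnFlusher implements AutoCloseable {
    private final AtomicReference<Long> pendingLsn = new AtomicReference<>();
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "lsn-flusher");
        t.setDaemon(true);
        return t;
    });
    private final LongConsumer flushAction; // stands in for the real flush call
    private final long timeoutMillis;       // stands in for the configurable timeout

    DecoupledLsnFlusher(LongConsumer flushAction, long timeoutMillis) {
        this.flushAction = flushAction;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called on the shared committer thread: records the LSN and returns immediately. */
    void commit(long lsn) {
        pendingLsn.set(lsn);
    }

    /** Called on the connector's own task thread. Returns true if an LSN was flushed. */
    boolean flushPending() {
        Long lsn = pendingLsn.getAndSet(null);
        if (lsn == null) {
            return false; // nothing new to flush
        }
        Future<?> flush = flushExecutor.submit(() -> flushAction.accept(lsn));
        try {
            flush.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            flush.cancel(true);
            throw new IllegalStateException(
                    "LSN flush did not complete within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("LSN flush failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while flushing LSN", e);
        }
    }

    @Override
    public void close() {
        flushExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        try (DecoupledLsnFlusher flusher = new DecoupledLsnFlusher(
                lsn -> System.out.println("flushed LSN " + lsn), 1_000)) {
            flusher.commit(42L);    // committer thread: cheap and non-blocking
            flusher.flushPending(); // task thread: bounded by the timeout
        }
    }
}
```

In this sketch a timed-out flush surfaces as an exception on the task thread, which the task can turn into a clear failure. Cancelling the future interrupts the flusher thread, but a thread blocked on monitor entry, as in the trace above, does not respond to interruption, so the stuck call itself may linger until the connection is closed.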

       

              Assignee: Unassigned
              Reporter: Rajendra Dangwal (rdangwal)