-
Enhancement
-
Resolution: Done
-
Major
The Postgres connector currently flushes the last processed LSN to the server during each invocation of the commit method. This method is executed by the SourceTaskOffsetCommitter thread, which is responsible for committing offsets for all source connectors in the worker.
However, if the flushLsn call blocks, for example because of lock contention in the underlying JDBC driver, the SourceTaskOffsetCommitter thread blocks with it. Offset commits then halt for every source connector managed by that worker, not only for the Postgres connector.
We have observed instances where the offset committer thread becomes indefinitely blocked. In one such case, the thread was found waiting on a lock held by a zombie change-event-source-coordinator thread, which likely remained active due to a thread leak.
Below are the thread traces for both the offset committer thread and the zombie coordinator thread for reference.
SourceTaskOffsetCommitter Thread:
"SourceTaskOffsetCommitter-1" #271 [2078] prio=5 os_prio=0 cpu=449.83ms elapsed=594771.50s tid=0x0000ffff1441a740 nid=2078 waiting for monitor entry [0x0000fffea7677000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.postgresql.core.v3.CopyOperationImpl.isActive(CopyOperationImpl.java:52)
    - waiting to lock <0x000000059cc3ba30> (a org.postgresql.core.v3.QueryExecutorImpl)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.isClosed(V3PGReplicationStream.java:118)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.checkClose(V3PGReplicationStream.java:273)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:112)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:528)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:521)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:421)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.commitOffset(ChangeEventSourceCoordinator.java:192)
    at io.debezium.connector.common.BaseSourceTask.commit(BaseSourceTask.java:282)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.commitSourceTask(AbstractWorkerSourceTask.java:603)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:326)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:79)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$$Lambda/0x00000038021c8d58.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@21.0.4/Executors.java:572)
    at java.util.concurrent.FutureTask.runAndReset(java.base@21.0.4/FutureTask.java:358)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@21.0.4/ScheduledThreadPoolExecutor.java:305)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
    at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
    at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)

   Locked ownable synchronizers:
    - <0x00000005a1d456e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
    - <0x00000005ad6e9058> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
change-event-source-coordinator:
"debezium-postgresconnector-intg_colend-change-event-source-coordinator" #300 [2107] prio=5 os_prio=0 cpu=386872.26ms elapsed=594771.01s tid=0x0000ffff04315a90 nid=2107 runnable [0x0000fffea9e4f000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.Net.poll(java.base@21.0.4/Native Method)
    at sun.nio.ch.NioSocketImpl.park(java.base@21.0.4/NioSocketImpl.java:191)
    at sun.nio.ch.NioSocketImpl.timedRead(java.base@21.0.4/NioSocketImpl.java:280)
    at sun.nio.ch.NioSocketImpl.implRead(java.base@21.0.4/NioSocketImpl.java:304)
    at sun.nio.ch.NioSocketImpl.read(java.base@21.0.4/NioSocketImpl.java:346)
    at sun.nio.ch.NioSocketImpl$1.read(java.base@21.0.4/NioSocketImpl.java:796)
    at java.net.Socket$SocketInputStream.read(java.base@21.0.4/Socket.java:1099)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:223)
    at org.postgresql.core.PGStream.receive(PGStream.java:646)
    at org.postgresql.core.PGStream.receive(PGStream.java:630)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1320)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1163)
    - locked <0x000000059cc3ba30> (a org.postgresql.core.v3.QueryExecutorImpl)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:490)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:216)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:178)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
    at io.debezium.pipeline.ChangeEventSourceCoordinator$$Lambda/0x00000038022a3cf8.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@21.0.4/Executors.java:572)
    at java.util.concurrent.FutureTask.run(java.base@21.0.4/FutureTask.java:317)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@21.0.4/ThreadPoolExecutor.java:1144)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@21.0.4/ThreadPoolExecutor.java:642)
    at java.lang.Thread.runWith(java.base@21.0.4/Thread.java:1596)
    at java.lang.Thread.run(java.base@21.0.4/Thread.java:1583)

   Locked ownable synchronizers:
    - <0x00000005ae42dc20> (a java.util.concurrent.ThreadPoolExecutor$Worker)
    - <0x00000005ae437de8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
Regardless of the underlying cause, the existing design has a critical weakness: a single misbehaving connector can block offset commits for all connectors on the worker. To stay robust, the commit method must remain lightweight and non-blocking.
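The failure mode can be reproduced in isolation. The sketch below assumes the offset committer behaves like a single-threaded scheduled executor shared by all tasks; the class name, the counters, and the latch standing in for the contended lock are all hypothetical and are not taken from Kafka Connect or Debezium.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SharedCommitterDemo {

    /** Returns the healthy task's commit count {before, during, after} the stuck commit. */
    public static long[] run() {
        // Assumption: one thread serves the commit callbacks of every task.
        ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong healthyCommits = new AtomicLong();
        CountDownLatch stuckStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1); // stands in for the contended lock
        try {
            // A well-behaved task whose commit returns immediately.
            committer.scheduleWithFixedDelay(
                    healthyCommits::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(100);

            // A misbehaving task whose commit blocks until the "lock" is released.
            committer.schedule(() -> {
                stuckStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, TimeUnit.MILLISECONDS);

            stuckStarted.await();
            long before = healthyCommits.get();
            Thread.sleep(200);                  // the shared thread is occupied the whole time
            long during = healthyCommits.get(); // no progress for the healthy task
            release.countDown();
            Thread.sleep(200);
            long after = healthyCommits.get();  // commits resume once the thread is free
            return new long[] {before, during, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Demo interrupted", e);
        } finally {
            committer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long[] counts = run();
        System.out.println("before=" + counts[0] + " during=" + counts[1] + " after=" + counts[2]);
    }
}
```

While the blocking commit occupies the shared thread, the healthy task's counter does not move; it advances again only after the block is released.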
Proposed Solution
- Move LSN Flush Logic to the Task Thread
Shift the flushLsn invocation from the SourceTaskOffsetCommitter thread to the connector's task thread. This ensures that even if flushLsn hangs or slows down, it doesn't impact other connectors.
- Introduce a Timeout for LSN Flush
Add a configurable timeout for the LSN flush operation. If the flush does not complete within the defined time, the connector task should fail fast with a clear error message. This prevents indefinite blocking and simplifies debugging.
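One possible shape for the timeout is sketched below. It is not the implementation that was merged: TimedLsnFlush, LsnFlusher, and flushOrFail are hypothetical names, and the flush is modelled as an arbitrary callback instead of the real replication stream call.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedLsnFlush {

    /** Hypothetical stand-in for the connector's real flush call. */
    @FunctionalInterface
    public interface LsnFlusher {
        void flush(long lsn) throws Exception;
    }

    // Dedicated daemon thread so a stuck flush never occupies a shared worker thread.
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "lsn-flush");
        t.setDaemon(true);
        return t;
    });
    private final LsnFlusher flusher;
    private final long timeoutMs; // would come from a connector config option (assumed)

    public TimedLsnFlush(LsnFlusher flusher, long timeoutMs) {
        this.flusher = flusher;
        this.timeoutMs = timeoutMs;
    }

    /** Flushes the LSN, or throws with a clear message if the flush exceeds the timeout. */
    public void flushOrFail(long lsn) {
        Future<?> pending = flushExecutor.submit(() -> {
            flusher.flush(lsn);
            return null;
        });
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // cancel(true) only interrupts; a thread BLOCKED on a monitor will not react,
            // so the caller fails fast while the flush thread may remain stuck.
            pending.cancel(true);
            throw new IllegalStateException(
                    "LSN flush for " + lsn + " did not complete within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while flushing LSN " + lsn, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("LSN flush failed for " + lsn, e.getCause());
        }
    }

    public void close() {
        flushExecutor.shutdownNow();
    }
}
```

In this sketch the task thread would call flushOrFail and let the exception fail the task. Because an interrupt cannot free a thread that is waiting for a monitor, the timeout bounds how long the caller waits; it does not by itself recover the stuck flush thread.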
- is depended on by
DBZ-9156 Decouple low watermark flush from offset commit thread (Closed)
- links to
RHEA-2025:154266 Red Hat build of Debezium 3.2.4 release