Type: Bug
Resolution: Done
Priority: Major
Affects Version: 2.6.2.Final
Bug report
(information copied from https://debezium.zulipchat.com/#narrow/channel/348249-community-postgresql/topic/Broken.20pipe.20on.20streaming.20connection.20after.20blocking.20snapshot )
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Debezium PostgreSQL connector 2.6.2, running on the Confluent Platform Connect Docker image 7.6.1.
What is the connector configuration?
Some key configuration details: exactly-once source support is enabled on the worker, and transaction.boundary is set to interval with an interval of a few seconds. snapshot.mode is set to no_data; the snapshot was instead initiated by writing an ad hoc blocking snapshot signal to the signal table.
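Roughly, the relevant part of the connector configuration looks like this (a sketch only: apart from the settings described above, the values, the signal table name, and the interval are placeholders, and the database connection properties are omitted):

# Sketch of the connector configuration described above, not copied from the deployment.
connector.class=io.debezium.connector.postgresql.PostgresConnector
# Matches the "server" key in the offsets shown further down.
topic.prefix=cdc.postgres.gringotts.v1
snapshot.mode=no_data
# Assumed; the report only states that exactly-once sources are enabled on the worker
# (exactly.once.source.support=enabled in the worker configuration).
exactly.once.support=required
transaction.boundary=interval
# "Every few seconds" -- the exact interval is a placeholder.
transaction.boundary.interval.ms=5000
# Signal table name is a placeholder; database.hostname/user/password/dbname are omitted.
signal.data.collection=public.debezium_signal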
What is the captured database version and mode of deployment?
Amazon Aurora with PostgreSQL 13.
What behavior do you expect?
Streaming should resume after an ad hoc blocking snapshot finishes.
What behavior do you see?
The Debezium PostgreSQL connector does not seem to handle the situation well when there is a problem resuming streaming on the existing replication connection after an ad hoc blocking snapshot has finished. Key exception, with the trace from v2.6.2.Final (adapted from a JSON log entry):
message: Producer failure
method: setProducerThrowable
org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1176)
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:30)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:195)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:186)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:128)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:622)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:217)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:179)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:42)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
    at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
    at org.postgresql.core.PGStream.flush(PGStream.java:721)
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1174)
    ... 17 more
This is reported immediately after "Streaming resumed" and other messages indicating completion of all snapshots.
The connector then restarted and crashed again due to https://issues.redhat.com/browse/DBZ-7903 (I'm still on 2.6.2). I checked the offsets using kcctl:
{ "offsets" : [ { "partition" : { "server" : "cdc.postgres.gringotts.v1" }, "offset" : { "last_snapshot_record" : false, "lsn" : 9699077357080, "txId" : 7873826253, "ts_usec" : 1738804917248122, "snapshot" : true } } ] }
Do you see the same behaviour using the latest released Debezium version?
Untested, but code analysis suggests that the keep-alive "SELECT 1" query, which is supposed to run every 10 seconds, is not executed during a blocking snapshot, and this is still the case in the latest version, so there is little reason to suspect the behaviour has changed.
Do you have the connector logs, ideally from start till finish?
see above
How to reproduce the issue using our tutorial deployment?
not attempted
Additional thoughts
(copied from Zulip)
James:
It seems to me this problem stems from some combination of:
- In processMessages there is a function, probeConnectionIfNeeded, which is called every 10 seconds to run a SELECT 1 query: https://github.com/debezium/debezium/blob/3a93f02b4cbdc111557e5b5f7837b59005abfea8/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L225 However, it is not called during the ad hoc snapshot while streaming is paused: our logs show the "Streaming will now pause" and "Streaming resumed" messages followed immediately by the crash above. We don't normally see connections die like this during streaming, so my best guess is that the snapshot took too long and the idle connection was dropped. (Would it make sense to probe the streaming connection while it is paused for a snapshot?)
- The offsets indicating that the snapshot completed don't seem to be committed, so a connector restart results in attempting the snapshot again. (Would it make sense to / is it possible to commit the offsets indicating snapshot completion before the connector restarts?)
- The connector seems to rely on restarting the whole connector rather than using smaller retry loops in the code. That is fine in general, but in this case it is quite disruptive and results in re-snapshotting everything. The replication slot was created before the snapshot was started, so I see no technical reason why anything should have to be re-snapshotted if the connection is lost. (Would it make sense to have an additional retry mechanism inside the streaming event source to mask database disconnects, roughly as sketched below?)
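For #3, this is roughly the kind of retry loop I mean, purely as a sketch (the interface, names, retry count, and backoff are made up; this is not actual Debezium code):

import java.sql.SQLException;

class StreamingRetrySketch {

    // One attempt at streaming changes from the already existing replication slot.
    interface StreamingAttempt {
        void run() throws SQLException;
    }

    // Re-establish the streaming connection a few times before giving up and letting the
    // whole connector restart; the slot retains the WAL, so no re-snapshot should be needed.
    static void streamWithRetries(StreamingAttempt attempt, int maxRetries) throws SQLException, InterruptedException {
        for (int retries = 0; ; retries++) {
            try {
                attempt.run();
                return; // streaming ended normally
            }
            catch (SQLException e) {
                if (retries >= maxRetries) {
                    throw e; // exhausted retries -- fall back to a full connector restart
                }
                Thread.sleep(5_000L); // back off, then reconnect and resume from the slot's confirmed LSN
            }
        }
    }
}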
Any thoughts? My guess is that adding the keep-alive query to the "streaming paused" state (#1 above) would fix the issue in our case, since we don't regularly see this error at other times; presumably the keep-alive query was originally added for a reason like this. But since the slot already existed before the snapshot (this being an ad hoc blocking snapshot), there also seems to be no reason the connector shouldn't be able to handle a lost streaming connection in a way that doesn't require re-snapshotting. A rough sketch of the keep-alive idea follows.
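Roughly what the keep-alive during the paused state (#1) might look like; again only a sketch with assumed names and interval, not the existing probeConnectionIfNeeded implementation:

import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;

class PausedStreamingKeepAlive {

    // Assumed interval; the existing probe during streaming runs every 10 seconds.
    private static final Duration PROBE_INTERVAL = Duration.ofSeconds(10);

    private Instant lastProbe = Instant.now();

    // Would be called periodically from the loop that waits while streaming is paused for the
    // blocking snapshot, so the otherwise idle connection is not dropped as dead.
    void probeIfNeeded(Connection connection) throws Exception {
        if (Duration.between(lastProbe, Instant.now()).compareTo(PROBE_INTERVAL) >= 0) {
            try (Statement statement = connection.createStatement()) {
                statement.execute("SELECT 1");
            }
            lastProbe = Instant.now();
        }
    }
}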
Jiri:
Hi, thanks for the excellent analysis! Yes, I agree that in this case just having something that regularly "pings" the connection to keep it alive would be sufficient. Would you be interested in sending a PR?