Bug
Resolution: Done
Major
This was reported specifically against the PostgreSQL connector, but it most likely affects all connectors.
The problem is in PostgresStreamingChangeEventSource#commitOffset(Map): we explicitly catch the SQLException and throw a ConnectException. Because it is thrown directly, that exception is never queued up to be tested as a retriable exception.
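To make the difference concrete, here is a minimal, self-contained sketch of the two ways the commit path could treat the failure. It does not use Debezium or Kafka Connect classes: `WrappedCommitException`, `SketchErrorHandler`, `LsnFlusher`, and both `commit...` methods are hypothetical stand-ins, and the rule "any I/O error in the cause chain is retriable" is an assumption made only for illustration, not the connector's actual classification.

```java
import java.io.IOException;
import java.net.SocketException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicReference;

public class CommitRetrySketch {

    /** Hypothetical stand-in for the wrapping exception thrown by the commit path. */
    public static class WrappedCommitException extends RuntimeException {
        public WrappedCommitException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical error handler that records a failure so it can be classified later. */
    public static class SketchErrorHandler {
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        public void report(Throwable t) {
            failure.compareAndSet(null, t); // keep only the first reported failure
        }

        public Throwable reported() {
            return failure.get();
        }

        /** Assumed rule: an I/O error anywhere in the cause chain counts as retriable. */
        public static boolean isRetriable(Throwable t) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (c instanceof IOException) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Hypothetical abstraction over "flush the LSN to the database". */
    public interface LsnFlusher {
        void flush() throws SQLException;
    }

    /** Pattern described in the report: wrap and throw, so the handler never sees it. */
    public static void commitThrowing(LsnFlusher flusher) {
        try {
            flusher.flush();
        } catch (SQLException e) {
            throw new WrappedCommitException(e);
        }
    }

    /** Alternative: hand the failure to the handler, which can test it for retry. */
    public static void commitReporting(LsnFlusher flusher, SketchErrorHandler handler) {
        try {
            flusher.flush();
        } catch (SQLException e) {
            handler.report(new WrappedCommitException(e));
        }
    }

    /** Builds a failure shaped like the one in the report (SQL error caused by a broken pipe). */
    public static SQLException brokenPipe() {
        return new SQLException("Database connection failed when writing to copy",
                new SocketException("Broken pipe"));
    }

    public static void main(String[] args) {
        SketchErrorHandler handler = new SketchErrorHandler();
        try {
            commitThrowing(() -> { throw brokenPipe(); });
        } catch (WrappedCommitException e) {
            System.out.println("thrown directly, handler saw: " + handler.reported());
        }
        commitReporting(() -> { throw brokenPipe(); }, handler);
        System.out.println("reported, retriable: "
                + SketchErrorHandler.isRetriable(handler.reported()));
    }
}
```

In the first variant the caller receives the exception and the handler's state stays empty, which mirrors the behaviour described above. In the second, the same failure reaches the code that decides whether a retry is appropriate.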
The stack trace:
org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:281)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.commitOffset(ChangeEventSourceCoordinator.java:124)
    at io.debezium.connector.common.BaseSourceTask.commit(BaseSourceTask.java:230)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:507)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:448)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1081)
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:28)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:193)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:111)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:435)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:428)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:274)
    ... 12 more
Caused by: java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.postgresql.core.PGStream.flush(PGStream.java:592)
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1079)
    ... 18 more
[2020-07-10 21:36:49,722] ERROR WorkerSourceTask{id=XXXXXXXXXXX} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Could not create PG connection
    at io.debezium.connector.postgresql.TypeRegistry.prime(TypeRegistry.java:336)
    at io.debezium.connector.postgresql.TypeRegistry.<init>(TypeRegistry.java:117)
    at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:72)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:70)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)
    at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:156)
    at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(Worker