Debezium / DBZ-2386

Exception thrown while committing offsets does not trigger retry logic


      This was reported specifically against the PostgreSQL connector but most likely impacts all connectors.

      The issue is that in PostgresStreamingChangeEventSource#commitOffset(Map) we explicitly catch the SQLException and throw a ConnectException. Because it is thrown directly from the commit path, that exception is never queued up to be tested as a retriable exception.
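      To illustrate the difference, here is a minimal, self-contained sketch. It is not the connector's code: ConnectException, ErrorHandlerSketch, LsnFlusher and both commitOffset variants are hypothetical stand-ins, and the rule "retriable if an IOException appears in the cause chain" is an assumption made only for this example. It contrasts throwing from the commit path with handing the wrapped exception to a handler that can later evaluate it.

```java
import java.io.IOException;
import java.net.SocketException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins only; none of these are Debezium or Kafka Connect classes.
class CommitOffsetRetrySketch {

    /** Stand-in for org.apache.kafka.connect.errors.ConnectException. */
    static class ConnectException extends RuntimeException {
        ConnectException(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for a handler that another thread consults to decide on a retry. */
    static class ErrorHandlerSketch {
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        /** Records the first reported failure so it can be evaluated later. */
        void report(Throwable t) {
            failure.compareAndSet(null, t);
        }

        Throwable reported() {
            return failure.get();
        }

        /** Assumed rule: retriable if any exception in the cause chain is an IOException. */
        boolean isRetriable(Throwable t) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (c instanceof IOException) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Stand-in for the replication stream whose LSN flush can fail. */
    interface LsnFlusher {
        void flushLsn(long lsn) throws SQLException;
    }

    /** Behaviour described in this report: wrap and throw from the commit path. */
    static void commitOffsetThrowing(LsnFlusher flusher, long lsn) {
        try {
            flusher.flushLsn(lsn);
        } catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /** One possible alternative: wrap and report, so the failure can be tested for retry. */
    static void commitOffsetReporting(LsnFlusher flusher, long lsn, ErrorHandlerSketch handler) {
        try {
            flusher.flushLsn(lsn);
        } catch (SQLException e) {
            handler.report(new ConnectException(e));
        }
    }

    public static void main(String[] args) {
        // Simulates the failure in the stack trace: an SQLException caused by a broken pipe.
        LsnFlusher broken = lsn -> {
            throw new SQLException("Database connection failed when writing to copy",
                    new SocketException("Broken pipe"));
        };
        ErrorHandlerSketch handler = new ErrorHandlerSketch();
        commitOffsetReporting(broken, 42L, handler);
        Throwable t = handler.reported();
        System.out.println("reported: " + (t != null) + ", retriable: " + handler.isRetriable(t));
    }
}
```

      In the throwing variant the caller of commit sees the exception immediately and nothing ever asks whether it is retriable. In the reporting variant that decision is deferred to whatever component owns the retry policy.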

      The stack trace

      org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:281)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.commitOffset(ChangeEventSourceCoordinator.java:124)
      at io.debezium.connector.common.BaseSourceTask.commit(BaseSourceTask.java:230)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:507)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:448)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
      at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1081)
      at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:28)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:193)
      at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:111)
      at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:435)
      at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:428)
      at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.commitOffset(PostgresStreamingChangeEventSource.java:274)
      ... 12 more
      Caused by: java.net.SocketException: Broken pipe
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
      at org.postgresql.core.PGStream.flush(PGStream.java:592)
      at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1079)
      ... 18 more
      [2020-07-10 21:36:49,722] ERROR WorkerSourceTask{id=XXXXXXXXXXX} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      org.apache.kafka.connect.errors.ConnectException: Could not create PG connection
      at io.debezium.connector.postgresql.TypeRegistry.prime(TypeRegistry.java:336)
      at io.debezium.connector.postgresql.TypeRegistry.<init>(TypeRegistry.java:117)
      at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:72)
      at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:70)
      at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)
      at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:156)
      at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:119)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      at org.apache.kafka.connect.runtime.WorkerTask.run(Worker
      

              Assignee: Unassigned
              Reporter: Chris Cranford (ccranfor@redhat.com)