Debezium / DBZ-9617

Handle exceptions thrown from the performCommit method to gracefully clean up the coordinator during the stop call



      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Debezium Postgres 3.0.8

      What is the connector configuration?

      {
          "database.dbname": "mydb",
          "database.hostname": "<HOSTNAME>",
          "database.password": "<PASSWORD>",
          "database.port": "5432",
          "database.user": "postgresadmin",
          "name": "myconnector",
          "publication.name": "dbz_publication",
          "slot.name": "debezium",
          "snapshot.mode": "initial",
          "table.include.list": "public.*",
          "tasks.max": "1",
          "topic.prefix": "mydb-prod"
      }

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      RDS Postgres v14

      What behavior do you expect?

      Expect the threads associated with the connector task to be cleaned up gracefully when the task is stopped, irrespective of any exception thrown during the offset flush.

      What behavior do you see?

      In a distributed worker setup, during a Connect worker rebalance, the coordinator thread did not shut down when the task was stopped, and a new task was spawned on a different worker.
      As a result, the original coordinator thread kept the replication slot acquired, and the new task's coordinator thread could not acquire it until the DB-side process holding the slot was terminated (see the sketch below).
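
      A hedged diagnostic sketch (not part of the original report) of how the leaked DB-side process can be identified and terminated, using the pg_replication_slots.active_pid column and pg_terminate_backend(); the class name is hypothetical and the connection details are placeholders taken from the config above:

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.ResultSet;

      // Finds the backend PID currently streaming from the slot and terminates it
      // so a newly scheduled task can acquire the slot. Requires the Postgres JDBC
      // driver on the classpath; <HOSTNAME> and <PASSWORD> are placeholders.
      public class ReleaseStuckSlot {
          public static void main(String[] args) throws Exception {
              String url = "jdbc:postgresql://<HOSTNAME>:5432/mydb";
              try (Connection conn = DriverManager.getConnection(url, "postgresadmin", "<PASSWORD>")) {
                  // active_pid is NULL when no server process holds the slot.
                  PreparedStatement find = conn.prepareStatement(
                          "SELECT active_pid FROM pg_replication_slots WHERE slot_name = ?");
                  find.setString(1, "debezium");
                  try (ResultSet rs = find.executeQuery()) {
                      if (rs.next() && rs.getObject("active_pid") != null) {
                          int pid = rs.getInt("active_pid");
                          // pg_terminate_backend() ends the leaked walsender process.
                          PreparedStatement kill = conn.prepareStatement("SELECT pg_terminate_backend(?)");
                          kill.setInt(1, pid);
                          kill.execute();
                          System.out.println("Terminated backend " + pid + " holding slot 'debezium'");
                      }
                      else {
                          System.out.println("Slot 'debezium' is not held by any process");
                      }
                  }
              }
          }
      }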

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes, the code path is the same in the latest version.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Here are some important logs highlighting the issue:

      Oct 23, 2025 @ 13:59:30.521 [connect-2] Stopping task myconnector-0
      
      Oct 23, 2025 @ 13:59:31.073 [connect-2] WorkerSourceTask{id=myconnector-0} Committing offsets for 5069 acknowledged messages
      
      Oct 23, 2025 @ 14:00:00.526 [connect-2] Graceful stop of task myconnector-0 failed.
      
      Oct 23, 2025 @ 14:00:03.399 [connect-4] Creating task myconnector-0 initial state STARTED
      
      Oct 23, 2025 @ 14:00:03.399 [connect-4] Instantiated task myconnector-0 with version 3.0.8.Final of type io.debezium.connector.postgresql.PostgresConnectorTask
      
      Oct 23, 2025 @ 14:00:03.418 [connect-2] Failed to fetch offsets from namespace myconnector:
      
      org.apache.kafka.connect.errors.ConnectException: Offset reader is closed. This is likely because the task has already been scheduled to stop but has taken longer than the graceful shutdown period to do so.
      at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:92)
      at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
      at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:523)
      at io.debezium.connector.postgresql.PostgresConnectorTask.performCommit(PostgresConnectorTask.java:410)
      at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:424)
      at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1288)
      at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1271)
      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.close(AbstractWorkerSourceTask.java:337)
      at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:249)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:284)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:328)
      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:88)
      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:341)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
      at java.base/java.lang.Thread.run(Thread.java:1583) 

      How to reproduce the issue using our tutorial deployment?

      We don't have reproduction steps. In our case, reading the offsets took too long and resulted in the ConnectException shown above.

      Implementation ideas (optional)

      Catch the exceptions thrown by the performCommit call in the stop method and still invoke the subsequent stop(boolean) method so that the coordinator thread is cleaned up, as sketched below.
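
      A minimal sketch of this idea, assuming the stop() -> performCommit() -> stop(boolean) flow visible in the stack trace above (the names mirror the trace; the body is illustrative, not the actual Debezium source):

      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      // Illustrative stand-in for io.debezium.connector.common.BaseSourceTask.
      public abstract class BaseSourceTaskSketch {

          private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTaskSketch.class);

          public void stop() {
              try {
                  // May throw ConnectException ("Offset reader is closed") when the task
                  // has exceeded the graceful shutdown period during a rebalance.
                  performCommit();
              }
              catch (Exception e) {
                  LOGGER.warn("Offset flush during stop failed; continuing with task shutdown", e);
              }
              finally {
                  // Always reach the real cleanup so the coordinator thread shuts down
                  // and, for Postgres, the replication slot connection is released.
                  stop(false);
              }
          }

          protected abstract void performCommit();

          protected abstract void stop(boolean restarting);
      }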

      Assignee: Unassigned
      Reporter: Rajendra Dangwal (rdangwal)
      Votes: 0
      Watchers: 2

                Created:
                Updated:
                Resolved: