Debezium / DBZ-2150

Await coordinator shutdown in embedded engine


      When running through tests in the debugger and stopping in the event polling loop, I'll often see exceptions like this, indicating an unclean connector shutdown:

      2020-06-04 21:46:12,132 ERROR  PostgresConnectorIT||engine  Interrupted while stopping coordinator   [io.debezium.connector.common.BaseSourceTask]
      java.lang.InterruptedException
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
      	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
      	at java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:675)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.stop(ChangeEventSourceCoordinator.java:138)
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:192)
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:171)
      	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:824)
      	at io.debezium.embedded.AbstractConnectorTest.lambda$3(AbstractConnectorTest.java:327)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      2020-06-04 21:46:17,411 ERROR  PostgresConnectorIT||engine  Error while trying to stop the task and commit the offsets   [io.debezium.connector.postgresql.PostgresConnectorIT]
      org.apache.kafka.connect.errors.ConnectException: Interrupted while stopping coordinator, failing the task
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:198)
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:171)
      	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:824)
      	at io.debezium.embedded.AbstractConnectorTest.lambda$3(AbstractConnectorTest.java:327)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

      Note how the call to awaitTermination() in the coordinator is interrupted. This will only happen when that calling thread itself is interrupted.
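The behaviour described above can be reproduced outside of Debezium. The following is a minimal sketch using only java.util.concurrent; the class and method names are hypothetical and not part of the Debezium code base. A thread named "engine" blocks in awaitTermination() while a worker task is still running, and is then interrupted from the outside, much like the engine thread in the stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not Debezium code.
public class AwaitTerminationInterruptDemo {

    /** Returns true if awaitTermination() was cut short by an interrupt of the calling thread. */
    public static boolean awaitIsInterrupted() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // Stand-in for a change event source that takes a while to finish.
        worker.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread caller = new Thread(() -> {
            worker.shutdown(); // does not interrupt the running task
            try {
                worker.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Only reached because this calling thread was interrupted.
                interrupted.set(true);
            }
        }, "engine");
        caller.start();

        try {
            Thread.sleep(200);   // give the caller time to block
            caller.interrupt();  // what an impatient stop() would do
            caller.join();

            // Clean up: let the worker task finish so the executor terminates.
            release.countDown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("awaitTermination interrupted: " + awaitIsInterrupted());
    }
}
```

The worker task itself is never interrupted here; the InterruptedException comes solely from interrupting the thread that waits in awaitTermination().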

      I think the root cause is that the embedded engine waits only very briefly in EmbeddedEngine#stop() before interrupting the engine thread (see WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT). Specifically, that waiting period should be longer than the 2 * 90 sec (180 sec in total) that the coordinator uses for shutting down.
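To illustrate the suspected timing problem, here is a small simulation rather than the actual EmbeddedEngine code. All names are hypothetical, and the durations are scaled down so that 180 ms stands in for the 2 * 90 sec of coordinator shutdown. A stopping thread waits for a completion latch and interrupts the engine thread if the wait runs out, which is the pattern assumed for EmbeddedEngine#stop().

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical simulation, not Debezium code.
public class StopWaitSketch {

    /**
     * Simulates an engine thread whose shutdown takes shutdownTime, and a stop() caller
     * that waits waitBeforeInterrupt for completion before interrupting the engine thread.
     * Returns true if the shutdown finished without being interrupted.
     */
    public static boolean shutsDownCleanly(Duration shutdownTime, Duration waitBeforeInterrupt) {
        CountDownLatch completed = new CountDownLatch(1);
        AtomicBoolean clean = new AtomicBoolean(false);

        Thread engine = new Thread(() -> {
            try {
                // Stand-in for the engine thread awaiting coordinator shutdown.
                Thread.sleep(shutdownTime.toMillis());
                clean.set(true);
            } catch (InterruptedException e) {
                // Unclean shutdown: the wait was cut short by the interrupt.
            } finally {
                completed.countDown();
            }
        }, "engine");
        engine.start();

        try {
            // Wait for completion; interrupt the engine thread if the wait times out.
            if (!completed.await(waitBeforeInterrupt.toMillis(), TimeUnit.MILLISECONDS)) {
                engine.interrupt();
            }
            engine.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return clean.get();
    }

    public static void main(String[] args) {
        // Scaled down: 180 ms stands in for 2 * 90 sec.
        Duration coordinatorShutdown = Duration.ofMillis(2 * 90);
        System.out.println("short wait, clean: "
                + shutsDownCleanly(coordinatorShutdown, Duration.ofMillis(50)));
        System.out.println("long wait, clean: "
                + shutsDownCleanly(coordinatorShutdown, Duration.ofMillis(1000)));
    }
}
```

With a wait shorter than the shutdown time the engine thread is interrupted mid-shutdown; with a wait that exceeds it, the shutdown completes and no interrupt is sent.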

              gunnar.morling Gunnar Morling