Task
Resolution: Done
Major
When running through tests in the debugger and stopping in the event polling loop, I'll often see exceptions like this, indicating an unclean connector shutdown:
2020-06-04 21:46:12,132 ERROR PostgresConnectorIT||engine Interrupted while stopping coordinator [io.debezium.connector.common.BaseSourceTask]
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
    at java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:675)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.stop(ChangeEventSourceCoordinator.java:138)
    at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:192)
    at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:171)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:824)
    at io.debezium.embedded.AbstractConnectorTest.lambda$3(AbstractConnectorTest.java:327)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-06-04 21:46:17,411 ERROR PostgresConnectorIT||engine Error while trying to stop the task and commit the offsets [io.debezium.connector.postgresql.PostgresConnectorIT]
org.apache.kafka.connect.errors.ConnectException: Interrupted while stopping coordinator, failing the task
    at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:198)
    at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:171)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:824)
    at io.debezium.embedded.AbstractConnectorTest.lambda$3(AbstractConnectorTest.java:327)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Note how the call to awaitTermination() in the coordinator is interrupted. This will only happen when that calling thread itself is interrupted.
I think the root cause is that the embedded engine waits only a very short time before interrupting the engine thread in EmbeddedEngine#stop() (see WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT). Specifically, that waiting period should be longer than the 2 * 90 sec (180 sec in total) that the coordinator uses for shutting down.
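To illustrate the suspected mechanism, here is a minimal standalone sketch. It is not the Debezium code: the class and method names are hypothetical, and the timings are scaled down to milliseconds. An "engine" thread shuts down an executor and blocks in awaitTermination(), while the caller interrupts it after a configurable wait. If that wait is shorter than the time the worker needs to finish, awaitTermination() fails with an InterruptedException, which matches the first stack trace above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names and timings are illustrative, not taken from Debezium.
public class ShutdownInterruptSketch {

    /**
     * Returns true if the thread blocked in awaitTermination() was interrupted.
     * waitBeforeInterruptMs must be > 0 (Thread.join(0) would wait forever).
     */
    static boolean stopWasInterrupted(long waitBeforeInterruptMs, long workMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);

        // Stands in for an event source that needs workMs to finish.
        executor.submit(() -> {
            try {
                Thread.sleep(workMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stands in for the engine thread calling the coordinator's stop().
        Thread engine = new Thread(() -> {
            executor.shutdown();
            try {
                executor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted.set(true); // the unclean shutdown seen in the log
            }
        });
        engine.start();

        try {
            // Stands in for the wait-before-interrupt period in the engine's stop().
            engine.join(waitBeforeInterruptMs);
            if (engine.isAlive()) {
                engine.interrupt();
            }
            engine.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller was interrupted", e);
        } finally {
            executor.shutdownNow();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        // Wait shorter than the work: the shutdown is interrupted.
        System.out.println(stopWasInterrupted(50, 500));   // true
        // Wait longer than the work: the shutdown completes cleanly.
        System.out.println(stopWasInterrupted(2000, 500)); // false
    }
}
```

In the sketch the clean case only occurs when the wait before the interrupt exceeds the time the shutdown needs, which is the same relation proposed above for the engine's wait period and the coordinator's 2 * 90 sec.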