Type: Bug
Resolution: Done
Priority: Major
Affects Version/s: 0.6
Labels: None
RecordsStreamProducer throws java.util.concurrent.RejectedExecutionException after a rebalance has started:
[2017-11-27T14:17:03,463][INFO][category=org.apache.kafka.connect.runtime.distributed.DistributedHerder] Rebalance started
...
[2017-11-27T14:17:07,189][ERROR][category=io.debezium.connector.postgresql.RecordsSnapshotProducer] unexpected exception
org.apache.kafka.connect.errors.ConnectException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:100)
at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$startStreaming$3(RecordsSnapshotProducer.java:98)
at java.util.Optional.ifPresent(Optional.java:159)
at io.debezium.connector.postgresql.RecordsSnapshotProducer.startStreaming(RecordsSnapshotProducer.java:96)
at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$2(RecordsSnapshotProducer.java:78)
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:98)
... 11 more
This happens because the PostgresConnector can be stopped by Kafka Connect while the startStreaming method is still being invoked:
io.debezium.connector.postgresql.RecordsSnapshotProducer.java
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
    // MDC should be inherited from parent to child threads
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        CompletableFuture.runAsync(() -> this.takeSnapshot(recordConsumer), executorService)
                         .thenRun(() -> this.startStreaming(recordConsumer))
                         .exceptionally(this::handleException);
    }
    finally {
        previousContext.restore();
    }
}
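The underlying failure is easy to reproduce in isolation: once shutdownNow() has terminated an ExecutorService, any further submit() is rejected by the default AbortPolicy. A minimal standalone sketch (hypothetical class name, not Debezium code):

ShutdownRaceDemo.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownRaceDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // plays the role of stop(): the rebalance terminates the executor first
        executorService.shutdownNow();
        try {
            // plays the role of start() submitting streamChanges() afterwards
            executorService.submit(() -> System.out.println("streaming..."));
        }
        catch (RejectedExecutionException e) {
            // same failure as in the stack trace above
            System.out.println("rejected: " + e);
        }
    }
}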
Root cause:
RecordsStreamProducer has a synchronized stop method:
io.debezium.connector.postgresql.RecordsStreamProducer.java
@Override
protected synchronized void stop() {
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        if (replicationStream.get() == null) {
            logger.debug("already stopped....");
            return;
        }
        closeConnections();
    }
    finally {
        replicationStream.set(null);
        executorService.shutdownNow();
        previousContext.restore();
    }
}
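Synchronizing stop() alone gives no mutual exclusion, because a synchronized method only excludes other code that acquires the same monitor. A minimal sketch of that asymmetry (hypothetical class, not Debezium code):

OneSidedLocking.java

public class OneSidedLocking {

    private boolean stopped = false;

    // only this side acquires the monitor
    public synchronized void stop() {
        stopped = true;
    }

    // this side never takes the lock, so a concurrent stop() can run
    // at any point, e.g. between the check below and doWork()
    public void start() {
        if (!stopped) {
            doWork();
        }
    }

    private void doWork() {
        System.out.println("working...");
    }
}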
The start method, however, is not synchronized and does not check executorService.isShutdown() to return from the method early:
io.debezium.connector.postgresql.RecordsStreamProducer.java
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        if (sourceInfo.hasLastKnownPosition()) {
            // start streaming from the last recorded position in the offset
            Long lsn = sourceInfo.lsn();
            if (logger.isDebugEnabled()) {
                logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(lsn));
            }
            replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn));
        }
        else {
            logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
            replicationStream.compareAndSet(null, replicationConnection.startStreaming());
        }

        // refresh the schema so we have the latest view of the DB tables
        taskContext.refreshSchema(true);

        // the new thread will inherit its parent's MDC
        executorService.submit(() -> streamChanges(recordConsumer));
    }
    catch (Throwable t) {
        throw new ConnectException(t.getCause() != null ? t.getCause() : t);
    }
    finally {
        previousContext.restore();
    }
}
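A hedged sketch of the fix this analysis implies (synchronize start() and return early once the executor has been shut down), written as a self-contained hypothetical class rather than the actual Debezium patch:

GuardedProducer.java (hypothetical)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuardedProducer {

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public synchronized void start(Runnable streamChanges) {
        // return instead of submitting if a concurrent stop() already ran;
        // holding the monitor also keeps stop() from running mid-start
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(streamChanges);
    }

    public synchronized void stop() {
        executorService.shutdownNow();
    }

    public static void main(String[] args) {
        GuardedProducer producer = new GuardedProducer();
        producer.stop();
        producer.start(() -> System.out.println("streaming..."));
        System.out.println("start after stop is ignored, no RejectedExecutionException");
    }
}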