- Bug
- Resolution: Done
- Major
- 0.6
- None
RecordsStreamProducer throws java.util.concurrent.RejectedExecutionException after a rebalance has started:
[2017-11-27T14:17:03,463][INFO][category=org.apache.kafka.connect.runtime.distributed.DistributedHerder] Rebalance started ...
[2017-11-27T14:17:07,189][ERROR][category=io.debezium.connector.postgresql.RecordsSnapshotProducer] unexpected exception
org.apache.kafka.connect.errors.ConnectException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
	at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:100)
	at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$startStreaming$3(RecordsSnapshotProducer.java:98)
	at java.util.Optional.ifPresent(Optional.java:159)
	at io.debezium.connector.postgresql.RecordsSnapshotProducer.startStreaming(RecordsSnapshotProducer.java:96)
	at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$2(RecordsSnapshotProducer.java:78)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
	at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:98)
	... 11 more
because the PostgresConnector can be stopped by Kafka Connect while the startStreaming method is being invoked:
io.debezium.connector.postgresql.RecordsSnapshotProducer.java
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
    // MDC should be inherited from parent to child threads
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        CompletableFuture.runAsync(() -> this.takeSnapshot(recordConsumer), executorService)
                         .thenRun(() -> this.startStreaming(recordConsumer))
                         .exceptionally(this::handleException);
    } finally {
        previousContext.restore();
    }
}
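The race can be reproduced outside Debezium with plain java.util.concurrent. In the sketch below the class, pool, and latch names are illustrative; only the runAsync/thenRun scheduling pattern mirrors the code above. The stream pool is shut down while the snapshot stage is still in flight, so the stage that stands in for startStreaming submits to a terminated pool:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainAfterStopDemo {

    static String raceCause() throws InterruptedException {
        ExecutorService snapshotPool = Executors.newSingleThreadExecutor();
        ExecutorService streamPool = Executors.newSingleThreadExecutor();
        CountDownLatch snapshotRunning = new CountDownLatch(1);
        CountDownLatch stopDone = new CountDownLatch(1);

        CompletableFuture<Void> chain = CompletableFuture
                .runAsync(() -> {                       // stands in for takeSnapshot(...)
                    snapshotRunning.countDown();
                    try { stopDone.await(); } catch (InterruptedException ignored) { }
                }, snapshotPool)
                .thenRun(() -> streamPool.submit(() -> {})); // stands in for startStreaming(...)

        snapshotRunning.await();   // the snapshot stage is now in flight
        streamPool.shutdownNow();  // Kafka Connect stops the task mid-snapshot
        stopDone.countDown();      // snapshot finishes, thenRun fires

        try {
            chain.join();
            return "no exception";
        } catch (CompletionException e) {
            // the submit to the terminated pool fails the whole chain
            return e.getCause().getClass().getSimpleName();
        } finally {
            snapshotPool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(raceCause()); // prints RejectedExecutionException
    }
}
```

Because thenRun fires only after the snapshot stage completes, a stop() that lands anywhere inside the snapshot leaves the chain poised to hit the dead executor.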
Root cause:
RecordsStreamProducer has a synchronized stop method:
io.debezium.connector.postgresql.RecordsStreamProducer.java
@Override
protected synchronized void stop() {
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        if (replicationStream.get() == null) {
            logger.debug("already stopped....");
            return;
        }
        closeConnections();
    } finally {
        replicationStream.set(null);
        executorService.shutdownNow();
        previousContext.restore();
    }
}
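The shutdownNow() in the finally block is what leaves the pool in the Terminated state, where the executor's default AbortPolicy rejects every later submission. A minimal sketch (class name is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedSubmitDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.shutdownNow(); // stop() has already terminated the pool
        try {
            executorService.submit(() -> {}); // like the submit in start()
        } catch (RejectedExecutionException e) {
            // the default AbortPolicy rejects tasks submitted after shutdown
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
    }
}
```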
but the start method is not synchronized and has no executorService.isShutdown() check to return early from the method:
io.debezium.connector.postgresql.RecordsStreamProducer.java
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
    LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
    try {
        if (sourceInfo.hasLastKnownPosition()) {
            // start streaming from the last recorded position in the offset
            Long lsn = sourceInfo.lsn();
            if (logger.isDebugEnabled()) {
                logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(lsn));
            }
            replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn));
        } else {
            logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
            replicationStream.compareAndSet(null, replicationConnection.startStreaming());
        }
        // refresh the schema so we have a latest view of the DB tables
        taskContext.refreshSchema(true);
        // the new thread will inherit its parent MDC
        executorService.submit(() -> streamChanges(recordConsumer));
    } catch (Throwable t) {
        throw new ConnectException(t.getCause() != null ? t.getCause() : t);
    } finally {
        previousContext.restore();
    }
}
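One way to close the race, sketched here as an assumption rather than the actual upstream patch, is to make start() synchronized like stop() and bail out when the executor is already shut down. GuardedProducer is an illustrative stand-in for RecordsStreamProducer:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Both methods take the same monitor, so start() and stop() cannot
// interleave; the isShutdown() check handles the case where stop()
// already ran before start() got the lock.
class GuardedProducer {
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    synchronized boolean start(Runnable streamTask) {
        if (executorService.isShutdown()) {
            return false; // stop() already ran; skip streaming instead of throwing
        }
        executorService.submit(streamTask);
        return true;
    }

    synchronized void stop() {
        executorService.shutdownNow();
    }
}

public class GuardedProducerDemo {
    public static void main(String[] args) {
        GuardedProducer producer = new GuardedProducer();
        producer.stop();                            // connector stopped during rebalance
        boolean started = producer.start(() -> {}); // no RejectedExecutionException
        System.out.println("started = " + started); // prints started = false
    }
}
```

With this guard, a stop() that wins the race simply turns the pending start() into a no-op instead of crashing the task with RejectedExecutionException.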