  1. Debezium
  2. DBZ-501

PostgresConnector fails with RejectedExecutionException


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 0.7.0
    • Affects Version: 0.6
    • Component: postgresql-connector
    • Labels: None

      RecordsStreamProducer throws java.util.concurrent.RejectedExecutionException after a rebalance has started:

      [2017-11-27T14:17:03,463][INFO][category=org.apache.kafka.connect.runtime.distributed.DistributedHerder] Rebalance started
      
      ...
      
      [2017-11-27T14:17:07,189][ERROR][category=io.debezium.connector.postgresql.RecordsSnapshotProducer] unexpected exception
      org.apache.kafka.connect.errors.ConnectException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
          at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:100)
          at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$startStreaming$3(RecordsSnapshotProducer.java:98)
          at java.util.Optional.ifPresent(Optional.java:159)
          at io.debezium.connector.postgresql.RecordsSnapshotProducer.startStreaming(RecordsSnapshotProducer.java:96)
          at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$2(RecordsSnapshotProducer.java:78)
          at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
          at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
          at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45d0bbe7 rejected from java.util.concurrent.ThreadPoolExecutor@270a9ed9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
          at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
          at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
          at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
          at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
          at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
          at io.debezium.connector.postgresql.RecordsStreamProducer.start(RecordsStreamProducer.java:98)
          ... 11 more
      

      This happens because Kafka Connect can stop the PostgresConnector while the startStreaming method is being invoked:

      io.debezium.connector.postgresql.RecordsSnapshotProducer.java
      @Override
      protected void start(Consumer<SourceRecord> recordConsumer) {
          // MDC should be inherited from parent to child threads
          LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
          try {
              CompletableFuture.runAsync(() -> this.takeSnapshot(recordConsumer), executorService)
                               .thenRun(() -> this.startStreaming(recordConsumer))
                               .exceptionally(this::handleException);
          } finally {
              previousContext.restore();
          }
      }
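
      The sequence described above can be reproduced outside Debezium. The following is a minimal sketch, not Debezium code: the class StopDuringStartupDemo, the two single-thread executors, and the empty streaming task are all assumptions made for illustration. The first stage stands in for takeSnapshot and simulates the connector being stopped while it runs; the continuation stands in for startStreaming and submits to the executor that has been shut down in the meantime.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names exist in Debezium.
final class StopDuringStartupDemo {

    // Runs the chain and returns the failure seen by the exceptionally() handler.
    static Throwable run() {
        // Assumption: snapshot and streaming each use their own single-thread executor.
        ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
        ExecutorService streamingExecutor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        CompletableFuture<Void> chain = CompletableFuture
                // Stand-in for takeSnapshot(): the "stop" arrives while it runs
                // and shuts down the executor the next stage wants to use.
                .runAsync(streamingExecutor::shutdownNow, snapshotExecutor)
                // Stand-in for startStreaming(): submit to the terminated executor.
                .thenRun(() -> streamingExecutor.submit(() -> { }))
                .exceptionally(t -> {
                    // The handler may receive a CompletionException wrapper; unwrap it.
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    failure.set(cause);
                    return null;
                });

        chain.join();
        snapshotExecutor.shutdown();
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

      In this sketch the continuation still runs after the shutdown, because nothing in the chain checks whether a stop has happened in between, and the submit call is rejected.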
      

      Root cause:
      RecordsStreamProducer has a synchronized stop method:

      io.debezium.connector.postgresql.RecordsStreamProducer.java
      @Override
      protected synchronized void stop() {
          LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
      
          try {
              if (replicationStream.get() == null) {
                  logger.debug("already stopped....");
                  return;
              }
      
              closeConnections();
          } finally {
              replicationStream.set(null);
              executorService.shutdownNow();
              previousContext.restore();
          }
      }
      

      but the start method is not synchronized and has no executorService.isShutdown() check that would let it return early:

      io.debezium.connector.postgresql.RecordsStreamProducer.java
      @Override
      protected void start(Consumer<SourceRecord> recordConsumer)  {
          LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
          try {
              if (sourceInfo.hasLastKnownPosition()) {
                  // start streaming from the last recorded position in the offset
                  Long lsn = sourceInfo.lsn();
                  if (logger.isDebugEnabled()) {
                      logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(lsn));
                  }
                  replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn));
              } else {
                  logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                  replicationStream.compareAndSet(null, replicationConnection.startStreaming());
              }
      
              // refresh the schema so we have a latest view of the DB tables
              taskContext.refreshSchema(true);
              // the new thread will inherit its parent's MDC
              executorService.submit(() -> streamChanges(recordConsumer));
          } catch (Throwable t) {
              throw new ConnectException(t.getCause() != null ? t.getCause() : t);
          } finally {
              previousContext.restore();
          }
      }
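
      One way to close the gap described above is to make start synchronized as well and have it return early once the executor has been shut down. The following is a minimal sketch of that idea only, not necessarily the change that was applied in Debezium: the class GuardedProducer and its boolean return value are hypothetical, and the replication-stream handling of the real start method is left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a producer; not the actual RecordsStreamProducer.
final class GuardedProducer {

    // Assumption: a single-thread executor owned by the producer.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    // Returns true if the streaming task was submitted,
    // false if the producer had already been stopped.
    synchronized boolean start(Runnable streamChanges) {
        if (executorService.isShutdown()) {
            // stop() already ran; submitting now would be rejected.
            return false;
        }
        executorService.submit(streamChanges);
        return true;
    }

    // Holds the same monitor as start(), so the shutdown cannot happen
    // between the isShutdown() check and the submit() call.
    synchronized void stop() {
        executorService.shutdownNow();
    }

    public static void main(String[] args) {
        GuardedProducer producer = new GuardedProducer();
        producer.stop();
        System.out.println("started after stop: " + producer.start(() -> { }));
    }
}
```

      Because both methods synchronize on the same object, a stop that arrives first turns a later start into a no-op instead of an exception.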
      

              Assignee: jpechane Jiri Pechanec
              Reporter: andrey.pustovetov@gmail.com Andrey Pustovetov (Inactive)