  1. Debezium
  2. DBZ-7707

Async engine may fail with RejectedExecutionException

Details

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major
    • Fix Version/s: 2.7.0.Alpha2
    • Affects Version/s: 2.6.0.CR1
    • Component/s: embedded-engine
    • None
    • False
    • None
    • False

    Description

      Async engine may fail with RejectedExecutionException when it is stopped while records are still being submitted for processing, e.g.

      [debezium-postgresconnector-server1-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSchema - REPLICA IDENTITY for 'inventory.test' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
      [debezium-postgresconnector-server1-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
      finished
      [io.debezium.performance.embedded.PostgresEndToEndPerf.capture-jmh-worker-1] INFO io.debezium.embedded.async.AsyncEmbeddedEngine - Engine state has changed from 'POLLING_TASKS' to 'STOPPING'
      [pool-13-thread-1] ERROR io.debezium.embedded.async.AsyncEmbeddedEngine - Engine has failed with 
      java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@61eecc3a[Not completed, task = io.debezium.embedded.async.ProcessingCallables$TransformConvertConsumeRecord@32c121c6] rejected from java.util.concurrent.ThreadPoolExecutor@7724251[Shutting down, pool size = 12, active threads = 12, queued tasks = 11031, completed tasks = 11485]
              at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
              at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
              at io.debezium.embedded.async.AsyncEmbeddedEngine.runTasksPolling(AsyncEmbeddedEngine.java:462)
              at io.debezium.embedded.async.AsyncEmbeddedEngine.run(AsyncEmbeddedEngine.java:206)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
              at java.base/java.lang.Thread.run(Thread.java:1583)
      Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@61eecc3a[Not completed, task = io.debezium.embedded.async.ProcessingCallables$TransformConvertConsumeRecord@32c121c6] rejected from java.util.concurrent.ThreadPoolExecutor@7724251[Shutting down, pool size = 12, active threads = 12, queued tasks = 11031, completed tasks = 11485]
              at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
              at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
              at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
              at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
              at io.debezium.embedded.async.ParallelSmtAndConvertAsyncConsumerProcessor.lambda$processRecords$0(ParallelSmtAndConvertAsyncConsumerProcessor.java:45)
              at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
              at java.base/java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:772)
              at io.debezium.embedded.async.ParallelSmtAndConvertAsyncConsumerProcessor.processRecords(ParallelSmtAndConvertAsyncConsumerProcessor.java:44)
              at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1155)
              at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1136)
              at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              ... 3 more
      [pool-11-thread-1] INFO io.debezium.connector.common.BaseSourceTask - Stopping down connector
      
      ...
      
      [pool-13-thread-1] INFO io.debezium.embedded.async.AsyncEmbeddedEngine - Engine state has changed from 'STOPPING' to 'STOPPED'
      [pool-13-thread-1] ERROR io.debezium.embedded.async.AsyncEmbeddedEngine - java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@61eecc3a[Not completed, task = io.debezium.embedded.async.ProcessingCallables$TransformConvertConsumeRecord@32c121c6] rejected from java.util.concurrent.ThreadPoolExecutor@7724251[Shutting down, pool size = 12, active threads = 12, queued tasks = 11031, completed tasks = 11485]
      java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@61eecc3a[Not completed, task = io.debezium.embedded.async.ProcessingCallables$TransformConvertConsumeRecord@32c121c6] rejected from java.util.concurrent.ThreadPoolExecutor@7724251[Shutting down, pool size = 12, active threads = 12, queued tasks = 11031, completed tasks = 11485]
              at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
              at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
              at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
              at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
              at io.debezium.embedded.async.ParallelSmtAndConvertAsyncConsumerProcessor.lambda$processRecords$0(ParallelSmtAndConvertAsyncConsumerProcessor.java:45)
              at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
              at java.base/java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:772)
              at io.debezium.embedded.async.ParallelSmtAndConvertAsyncConsumerProcessor.processRecords(ParallelSmtAndConvertAsyncConsumerProcessor.java:44)
              at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1155)
              at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1136)
              at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
              at java.base/java.lang.Thread.run(Thread.java:1583)
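
The rejection in the trace above comes from the JDK itself: a ThreadPoolExecutor with the default AbortPolicy throws RejectedExecutionException for any task submitted after shutdown() has been called. The following is a minimal sketch that reproduces that behaviour in isolation. It does not use any Debezium classes; the class name RejectedAfterShutdown and its method are hypothetical and exist only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedAfterShutdown {

    // Returns true if a task submitted to an already shut-down pool is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Assumption: this stands in for the engine moving to STOPPING
        // while the polling thread is still handing over records.
        pool.shutdown();
        Callable<String> task = () -> "processed";
        try {
            pool.submit(task);
            return false; // the pool accepted the task
        } catch (RejectedExecutionException e) {
            return true; // default AbortPolicy refused the task
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + submitAfterShutdownIsRejected());
    }
}
```

In the engine the same situation presumably arises as a race: the record-processing pool is shut down by the stopping thread while processRecords is still iterating over a polled batch.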
      

      The RejectedExecutionException may lead to further failures, e.g. resources not being closed properly and a failure on the next start:

      [debezium-postgresconnector-server1-change-event-source-coordinator] WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Failed to start replication stream at LSN{0/23D18FB0}, waiting for PT10S ms and retrying, attempt number 1 over 6
      org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID 1547
              at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
              at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1299)
              at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:968)
              at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:60)
              at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:44)
              at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
              at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:41)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:734)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:580)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:410)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:386)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:149)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:42)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:274)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
              at java.base/java.lang.Thread.run(Thread.java:1583)
      [debezium-postgresconnector-server1-change-event-source-coordinator] WARN io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Failed to start replication stream at LSN{0/23D18FB0}, waiting for PT10S ms and retrying, attempt number 2 over 6
      org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID 1547
              at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
              at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1299)
              at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:968)
              at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:60)
              at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:44)
              at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
              at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:41)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:734)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:580)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:410)
              at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:386)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:149)
              at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:42)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:274)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
              at java.base/java.lang.Thread.run(Thread.java:1583)
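
One possible way to avoid both symptoms is to treat a rejection during shutdown as a signal to stop submitting, and to release resources in a finally block so that they are closed even when submission fails. The sketch below only illustrates that idea under stated assumptions. It is not the Debezium fix, and CloseOnRejection, TrackedResource and submitAll are hypothetical names; TrackedResource merely stands in for something like the replication connection that holds the slot.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnRejection {

    /** Hypothetical stand-in for a resource such as a replication connection. */
    static final class TrackedResource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /**
     * Submits one task per record and returns how many were accepted.
     * A rejection caused by shutdown ends the loop instead of failing the caller;
     * the resource is closed on every path.
     */
    static int submitAll(ExecutorService pool, List<String> records, TrackedResource resource) {
        int accepted = 0;
        try {
            for (String rec : records) {
                Callable<String> task = () -> rec;
                try {
                    pool.submit(task);
                    accepted++;
                } catch (RejectedExecutionException e) {
                    if (pool.isShutdown()) {
                        break; // shutdown in progress: stop submitting quietly
                    }
                    throw e; // rejected for another reason: propagate
                }
            }
            return accepted;
        } finally {
            resource.close(); // runs on normal return, break and exception
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.shutdown();
        TrackedResource resource = new TrackedResource();
        int accepted = submitAll(pool, List.of("a", "b"), resource);
        System.out.println("accepted=" + accepted + ", closed=" + resource.closed.get());
    }
}
```

Whether records skipped this way need to be re-read after restart depends on how offsets are committed, which this sketch does not model.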
      

          People

            vjuranek@redhat.com Vojtech Juranek