  1. Debezium
  2. DBZ-9292

Unchecked exception from OffsetStorageWriter.doFlush() in AsyncEmbeddedEngine leaves semaphore in OffsetStorageWriter unreleased and probably causes engine to fail

    • Important

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      MariaDB connector, 3.2.0.Final

      What is the connector configuration?

      Starting MariaDbConnectorTask with configuration:
      snapshot.locking.mode = none
      connector.class = io.debezium.connector.mariadb.MariaDbConnector
      message.key.columns = (.*):id_user
      transforms = Reroute
      bootstrap.servers = localhost:38358
      include.schema.changes = false
      transforms.Reroute.topic.replacement = $1_YYYY_MM_DD
      record.processing.threads =
      offset.storage.topic = debezium.offsets
      errors.retry.delay.initial.ms = 300
      key.converter = org.apache.kafka.connect.json.JsonConverter
      database.user = debezium_source
      offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
      database.history.kafka.bootstrap.servers = localhost:38358
      schema.history.internal.kafka.bootstrap.servers = localhost:38358
      transforms.Reroute.topic.regex = ********
      schema.history.internal.skip.unparseable.ddl = true
      inconsistent.schema.handling.mode = fail
      value.converter.schema.registry.url = http://localhost:38359
      internal.task.management.timeout.ms = 8000000
      schema.history.internal.producer.acks = 1
      errors.max.retries = -1
      name = finopsium_412
      errors.tolerance = none
      transforms.Reroute.schema.name.adjustment.mode = avro
      snapshot.mode = no_data
      driver.connectionTimeZone = Europe/Moscow
      record.processing.shutdown.timeout.ms = 1000
      database.history.kafka.topic = debezium.database_history
      transforms.Reroute.key.enforce.uniqueness = false
      schema.history.internal.store.only.captured.databases.ddl = true
      schema.history.internal.store.only.captured.tables.ddl = true
      record.processing.order = ORDERED
      tombstones.on.delete = false
      topic.prefix = finance_core.finopsium
      decimal.handling.mode = string
      offset.storage.file.filename =
      schema.history.internal.kafka.topic = debezium.schema_history
      offset.storage.partitions = 1
      value.converter = org.apache.kafka.connect.json.JsonConverter
      database.allowPublicKeyRetrieval = true
      transforms.Reroute.type = io.debezium.transforms.ByLogicalTableRouter
      database.server.id = 1
      offset.flush.timeout.ms = 5000
      errors.retry.delay.max.ms = 10000
      event.processing.failure.handling.mode = warn
      database.port = 38324
      offset.flush.interval.ms = 60000
      database.ssl.mode = disable
      record.processing.with.serial.consumer = false
      database.connectionTimeZone = Europe/Moscow
      database.hostname = localhost
      schema.name.adjustment.mode = avro
      offset.storage.replication.factor = 1
      table.include.list = ********
      key.converter.schema.registry.url = http://localhost:38359
      database.include.list = ********

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      mariadb:10.5.21, Testcontainers

      What behavior do you expect?

      Offsets are saved on connector shutdown promptly, without waiting for the 5-second flush timeout (offset.flush.timeout.ms = 5000) to elapse.

      What behavior do you see?

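      Shutdown stalls for about 5 seconds (17:22:57.240 to 17:23:02.246 in the logs below) and then fails with "TimeoutException: Timed out waiting for previous flush to finish".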

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.pipeline.signal.SignalProcessor – SignalProcessor started. Scheduling it every 5000ms
      17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-SignalProcessor
      17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.p.ChangeEventSourceCoordinator – Starting streaming
      17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – GTID set purged on server: ''
      17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – Attempting to generate a filtered GTID set
      17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – GTID set from previous recorded offset: 1-1-17
      17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – GTID set available on server: 1-1-17
      17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – Using first available positions for new GTID channels
      17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – Relevant GTID set available on server: 1-1-17
      17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.mariadb.jdbc.MariaDbConnection – Final merged GTID set to use when connecting to MariaDB: 1-1-17
      17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Registering binlog reader with GTID set: '1-1-17'
      17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  c.g.s.mysql.binlog.BinaryLogClient – Enabling GTID
      17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Skip 0 events on streaming start
      17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Skip 0 rows on streaming start
      17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
      17:22:56.845 [blc-localhost:38435] INFO  io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
      17:22:56.846 [blc-localhost:38435] INFO  c.g.s.mysql.binlog.BinaryLogClient – Database version: 5.5.5-10.5.21-MariaDB-1:10.5.21+maria~ubu2004-log (major=5, minor=5, mariadb=true)
      17:22:56.849 [blc-localhost:38435] INFO  c.g.s.mysql.binlog.BinaryLogClient – Requesting streaming from GTID set: 1-1-17
      17:22:56.850 [blc-localhost:38435] INFO  c.g.s.mysql.binlog.BinaryLogClient – Connected to localhost:38435 at 1-1-17 (sid:1, cid:8)
      17:22:56.850 [blc-localhost:38435] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Connected to binlog at localhost:38435, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.connector.mariadb.Source:STRUCT}, sourceInfo=BinlogSourceInfo{currentGtid='null', currentBinlogFilename='mysql_bin.000002', currentBinlogPosition=342, currentRowNumber=0, serverId=0, sourceTime=2025-07-28T14:22:56.826Z, threadId=-1, currentQuery='null', tableIds=[cfd.ma_withdrawal_casino_2024_02_02], databaseName='cfd'}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='1-1-17', currentGtidSet='1-1-17', restartBinlogFilename='mysql_bin.000002', restartBinlogPosition=342, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}
      17:22:56.850 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Waiting for keepalive thread to start
      17:22:56.851 [blc-localhost:38435] INFO  io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
      17:22:56.851 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO  i.d.c.b.BinlogStreamingChangeEventSource – Keepalive thread is running
      17:22:56.942 [Test worker] INFO  com.zaxxer.hikari.HikariDataSource – HikariPool-1 - Starting...
      17:22:56.949 [Test worker] INFO  com.zaxxer.hikari.pool.HikariPool – HikariPool-1 - Added connection org.mariadb.jdbc.Connection@1cfa4ad9
      17:22:56.951 [Test worker] INFO  com.zaxxer.hikari.HikariDataSource – HikariPool-1 - Start completed.
      17:22:57.239 [Test worker] INFO  i.d.e.async.AsyncEmbeddedEngine – Engine state has changed from 'POLLING_TASKS' to 'STOPPING'
      17:22:57.240 [pool-2-thread-1] INFO  i.d.e.async.AsyncEmbeddedEngine – Task interrupted while polling.
      17:23:02.246 [Test worker] WARN  i.d.e.async.AsyncEmbeddedEngine – Failure during stopping tasks, stopping them immediately. Failed with 
      java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for previous flush to finish
          at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
          at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.stopSourceTasks(AsyncEmbeddedEngine.java:647)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.stopConnector(AsyncEmbeddedEngine.java:706)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:294)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:271)
          at com.bytefrontier.finops.finopsium.debezium.DebeziumWrapper.close(DebeziumWrapper.kt:46)
          at com.bytefrontier.finops.finopsium.DebeziumWrapperTest.should read records made before restart(DebeziumWrapperTest.kt:77)
          at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
          at java.base/java.lang.reflect.Method.invoke(Method.java:580)
          at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:775)
          at org.junit.platform.commons.support.ReflectionSupport.invokeMethod(ReflectionSupport.java:479)
          at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
          at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:161)
          at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:152)
          at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:91)
          at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:112)
          at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:94)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
          at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:93)
          at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:87)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:216)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:212)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:156)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
          at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
          at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:201)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:170)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:94)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:59)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:142)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:58)
          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103)
          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:85)
          at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:47)
          at org.junit.platform.launcher.core.InterceptingLauncher.lambda$execute$1(InterceptingLauncher.java:39)
          at org.junit.platform.launcher.core.ClasspathAlignmentCheckingLauncherInterceptor.intercept(ClasspathAlignmentCheckingLauncherInterceptor.java:25)
          at org.junit.platform.launcher.core.InterceptingLauncher.execute(InterceptingLauncher.java:38)
          at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:47)
          at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:124)
          at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:99)
          at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:94)
          at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:63)
          at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
          at java.base/java.lang.reflect.Method.invoke(Method.java:580)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
          at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
          at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:92)
          at jdk.proxy1/jdk.proxy1.$Proxy4.stop(Unknown Source)
          at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:200)
          at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:132)
          at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:103)
          at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:63)
          at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
          at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:122)
          at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:72)
          at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
          at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
      Caused by: java.util.concurrent.TimeoutException: Timed out waiting for previous flush to finish
          at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:150)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.commitOffsets(AsyncEmbeddedEngine.java:834)
          at io.debezium.embedded.async.AsyncEmbeddedEngine.lambda$stopSourceTasks$3(AsyncEmbeddedEngine.java:633)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
          at java.base/java.lang.Thread.run(Thread.java:1583)
      17:23:02.253 [Test worker] INFO  o.a.k.c.s.KafkaOffsetBackingStore – Stopping KafkaOffsetBackingStore
      17:23:02.253 [Test worker] INFO  o.a.kafka.connect.util.KafkaBasedLog – Stopping KafkaBasedLog for topic debezium.offsets
      17:23:02.254 [Test worker] INFO  o.a.k.clients.producer.KafkaProducer – [Producer clientId=debezium-serveroffsets] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      17:23:02.258 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Metrics scheduler closed
      17:23:02.259 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.metrics.JmxReporter
      17:23:02.259 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
      17:23:02.259 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Metrics reporters closed
      17:23:02.259 [Test worker] INFO  o.a.kafka.common.utils.AppInfoParser – App info kafka.producer for debezium-serveroffsets unregistered
      17:23:02.307 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Metrics scheduler closed
      17:23:02.307 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.metrics.JmxReporter
      17:23:02.307 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
      17:23:02.307 [Test worker] INFO  o.a.kafka.common.metrics.Metrics – Metrics reporters closed
      17:23:02.308 [Test worker] INFO  o.a.kafka.common.utils.AppInfoParser – App info kafka.consumer for debezium-serveroffsets unregistered
      17:23:02.308 [Test worker] INFO  o.a.kafka.connect.util.KafkaBasedLog – Stopped KafkaBasedLog for topic debezium.offsets
      17:23:02.308 [Test worker] INFO  o.a.k.c.s.KafkaOffsetBackingStore – Stopped KafkaOffsetBackingStore

      Implementation ideas (optional)

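      In AsyncEmbeddedEngine.commitOffsets(), move the offsetWriter.doFlush() call inside the try/catch block and catch any Exception, not only ExecutionException | TimeoutException, so that an unchecked exception from doFlush() does not leave the semaphore in OffsetStorageWriter unreleased.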
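
The idea above can be illustrated with a small, self-contained sketch. It does not use Debezium or Kafka classes: SketchWriter is a hypothetical stand-in that only imitates a single-permit flush guard, and commitNarrow/commitBroad are illustrative names, not the real commitOffsets() code. The method names beginFlush, doFlush and cancelFlush are borrowed for readability; their behaviour here is an assumption made for the demonstration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a writer with a single-permit flush guard (not the Kafka class).
final class FlushGuardSketch {

    static final class SketchWriter {
        private final Semaphore flushInProgress = new Semaphore(1);
        private final boolean failInDoFlush;

        SketchWriter(boolean failInDoFlush) {
            this.failInDoFlush = failInDoFlush;
        }

        // Takes the only permit, or times out if a previous flush never finished.
        void beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (!flushInProgress.tryAcquire(timeout, unit)) {
                throw new TimeoutException("Timed out waiting for previous flush to finish");
            }
        }

        // Releases the permit on success; a simulated unchecked failure skips the release.
        void doFlush() {
            if (failInDoFlush) {
                throw new IllegalStateException("simulated unchecked failure in doFlush()");
            }
            flushInProgress.release();
        }

        // Gives the permit back after a failed flush.
        void cancelFlush() {
            flushInProgress.release();
        }
    }

    // Narrow handling: an unchecked exception escapes and the permit stays taken.
    static void commitNarrow(SketchWriter w, long timeoutMs) throws InterruptedException, TimeoutException {
        w.beginFlush(timeoutMs, TimeUnit.MILLISECONDS);
        w.doFlush();
    }

    // Broad handling: any exception from doFlush() cancels the flush before propagating.
    static void commitBroad(SketchWriter w, long timeoutMs) throws InterruptedException, TimeoutException {
        w.beginFlush(timeoutMs, TimeUnit.MILLISECONDS);
        try {
            w.doFlush();
        } catch (Exception e) {
            w.cancelFlush();
            throw e;
        }
    }

    // Runs one failing commit, then reports whether the next beginFlush() times out.
    static boolean nextFlushTimesOut(boolean broad) {
        SketchWriter w = new SketchWriter(true);
        try {
            try {
                if (broad) {
                    commitBroad(w, 50);
                } else {
                    commitNarrow(w, 50);
                }
            } catch (IllegalStateException | TimeoutException expected) {
                // The first commit fails in doFlush() in both variants.
            }
            try {
                w.beginFlush(50, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("narrow catch, next flush times out: " + nextFlushTimesOut(false));
        System.out.println("broad catch, next flush times out: " + nextFlushTimesOut(true));
    }
}
```

With the narrow variant the follow-up beginFlush() call can only time out, which matches the symptom in the logs above. With the broad variant the permit is returned before the exception propagates, so a later flush (for example the one issued during shutdown) can proceed immediately. Whether the real fix should rethrow, log, or swallow the exception is a separate decision that this sketch does not address.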

              Unassigned Unassigned
              silan0176 Aleksei Silantev (Inactive)