-
Bug
-
Resolution: Done
-
Major
-
3.2.0.Final
-
None
-
False
-
-
False
-
Important
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
MariaDB connector, 3.2.0.Final
What is the connector configuration?
Starting MariaDbConnectorTask with configuration:
snapshot.locking.mode = none
connector.class = io.debezium.connector.mariadb.MariaDbConnector
message.key.columns = (.*):id_user
transforms = Reroute
bootstrap.servers = localhost:38358
include.schema.changes = false
transforms.Reroute.topic.replacement = $1_YYYY_MM_DD
record.processing.threads =
offset.storage.topic = debezium.offsets
errors.retry.delay.initial.ms = 300
key.converter = org.apache.kafka.connect.json.JsonConverter
database.user = debezium_source
offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
database.history.kafka.bootstrap.servers = localhost:38358
schema.history.internal.kafka.bootstrap.servers = localhost:38358
transforms.Reroute.topic.regex = ********
schema.history.internal.skip.unparseable.ddl = true
inconsistent.schema.handling.mode = fail
value.converter.schema.registry.url = http://localhost:38359
internal.task.management.timeout.ms = 8000000
schema.history.internal.producer.acks = 1
errors.max.retries = -1
name = finopsium_412
errors.tolerance = none
transforms.Reroute.schema.name.adjustment.mode = avro
snapshot.mode = no_data
driver.connectionTimeZone = Europe/Moscow
record.processing.shutdown.timeout.ms = 1000
database.history.kafka.topic = debezium.database_history
transforms.Reroute.key.enforce.uniqueness = false
schema.history.internal.store.only.captured.databases.ddl = true
schema.history.internal.store.only.captured.tables.ddl = true
record.processing.order = ORDERED
tombstones.on.delete = false
topic.prefix = finance_core.finopsium
decimal.handling.mode = string
offset.storage.file.filename =
schema.history.internal.kafka.topic = debezium.schema_history
offset.storage.partitions = 1
value.converter = org.apache.kafka.connect.json.JsonConverter
database.allowPublicKeyRetrieval = true
transforms.Reroute.type = io.debezium.transforms.ByLogicalTableRouter
database.server.id = 1
offset.flush.timeout.ms = 5000
errors.retry.delay.max.ms = 10000
event.processing.failure.handling.mode = warn
database.port = 38324
offset.flush.interval.ms = 60000
database.ssl.mode = disable
record.processing.with.serial.consumer = false
database.connectionTimeZone = Europe/Moscow
database.hostname = localhost
schema.name.adjustment.mode = avro
offset.storage.replication.factor = 1
table.include.list = ********
key.converter.schema.registry.url = http://localhost:38359
database.include.list = ********
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
mariadb:10.5.21, Testcontainers
What behavior do you expect?
Offsets are saved on connector shutdown without 5-second timeout elapsed
What behavior do you see?
There is a timeout - see logs
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Yes
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.pipeline.signal.SignalProcessor – SignalProcessor started. Scheduling it every 5000ms
17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-SignalProcessor
17:22:56.839 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.p.ChangeEventSourceCoordinator – Starting streaming
17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – GTID set purged on server: ''
17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – Attempting to generate a filtered GTID set
17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – GTID set from previous recorded offset: 1-1-17
17:22:56.842 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – GTID set available on server: 1-1-17
17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – Using first available positions for new GTID channels
17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – Relevant GTID set available on server: 1-1-17
17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.mariadb.jdbc.MariaDbConnection – Final merged GTID set to use when connecting to MariaDB: 1-1-17
17:22:56.843 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – Registering binlog reader with GTID set: '1-1-17'
17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO c.g.s.mysql.binlog.BinaryLogClient – Enabling GTID
17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – Skip 0 events on streaming start
17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – Skip 0 rows on streaming start
17:22:56.844 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
17:22:56.845 [blc-localhost:38435] INFO io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
17:22:56.846 [blc-localhost:38435] INFO c.g.s.mysql.binlog.BinaryLogClient – Database version: 5.5.5-10.5.21-MariaDB-1:10.5.21+maria~ubu2004-log (major=5, minor=5, mariadb=true)
17:22:56.849 [blc-localhost:38435] INFO c.g.s.mysql.binlog.BinaryLogClient – Requesting streaming from GTID set: 1-1-17
17:22:56.850 [blc-localhost:38435] INFO c.g.s.mysql.binlog.BinaryLogClient – Connected to localhost:38435 at 1-1-17 (sid:1, cid:8)
17:22:56.850 [blc-localhost:38435] INFO i.d.c.b.BinlogStreamingChangeEventSource – Connected to binlog at localhost:38435, starting at BinlogOffsetContext{sourceInfoSchema=Schema
, sourceInfo=BinlogSourceInfo
{currentGtid='null', currentBinlogFilename='mysql_bin.000002', currentBinlogPosition=342, currentRowNumber=0, serverId=0, sourceTime=2025-07-28T14:22:56.826Z, threadId=-1, currentQuery='null', tableIds=[cfd.ma_withdrawal_casino_2024_02_02], databaseName='cfd'}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='1-1-17', currentGtidSet='1-1-17', restartBinlogFilename='mysql_bin.000002', restartBinlogPosition=342, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}
17:22:56.850 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – Waiting for keepalive thread to start
17:22:56.851 [blc-localhost:38435] INFO io.debezium.util.Threads – Creating thread debezium-mariadbconnector-finance_core.finopsium-binlog-client
17:22:56.851 [debezium-mariadbconnector-finance_core.finopsium-change-event-source-coordinator] INFO i.d.c.b.BinlogStreamingChangeEventSource – Keepalive thread is running
17:22:56.942 [Test worker] INFO com.zaxxer.hikari.HikariDataSource – HikariPool-1 - Starting...
17:22:56.949 [Test worker] INFO com.zaxxer.hikari.pool.HikariPool – HikariPool-1 - Added connection org.mariadb.jdbc.Connection@1cfa4ad9
17:22:56.951 [Test worker] INFO com.zaxxer.hikari.HikariDataSource – HikariPool-1 - Start completed.
17:22:57.239 [Test worker] INFO i.d.e.async.AsyncEmbeddedEngine – Engine state has changed from 'POLLING_TASKS' to 'STOPPING'
17:22:57.240 [pool-2-thread-1] INFO i.d.e.async.AsyncEmbeddedEngine – Task interrupted while polling.
17:23:02.246 [Test worker] WARN i.d.e.async.AsyncEmbeddedEngine – Failure during stopping tasks, stopping them immediately. Failed with
java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Timed out waiting for previous flush to finish
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at io.debezium.embedded.async.AsyncEmbeddedEngine.stopSourceTasks(AsyncEmbeddedEngine.java:647)
at io.debezium.embedded.async.AsyncEmbeddedEngine.stopConnector(AsyncEmbeddedEngine.java:706)
at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:294)
at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:271)
at com.bytefrontier.finops.finopsium.debezium.DebeziumWrapper.close(DebeziumWrapper.kt:46)
at com.bytefrontier.finops.finopsium.DebeziumWrapperTest.should read records made before restart(DebeziumWrapperTest.kt:77)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:775)
at org.junit.platform.commons.support.ReflectionSupport.invokeMethod(ReflectionSupport.java:479)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:161)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:152)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:91)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:112)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:94)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:87)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:216)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:212)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:156)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:201)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:170)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:94)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:59)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:142)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:58)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:85)
at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:47)
at org.junit.platform.launcher.core.InterceptingLauncher.lambda$execute$1(InterceptingLauncher.java:39)
at org.junit.platform.launcher.core.ClasspathAlignmentCheckingLauncherInterceptor.intercept(ClasspathAlignmentCheckingLauncherInterceptor.java:25)
at org.junit.platform.launcher.core.InterceptingLauncher.execute(InterceptingLauncher.java:38)
at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:47)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:124)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:99)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:94)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:63)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:92)
at jdk.proxy1/jdk.proxy1.$Proxy4.stop(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:200)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:132)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:103)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:63)
at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:122)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:72)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for previous flush to finish
at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:150)
at io.debezium.embedded.async.AsyncEmbeddedEngine.commitOffsets(AsyncEmbeddedEngine.java:834)
at io.debezium.embedded.async.AsyncEmbeddedEngine.lambda$stopSourceTasks$3(AsyncEmbeddedEngine.java:633)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
17:23:02.253 [Test worker] INFO o.a.k.c.s.KafkaOffsetBackingStore – Stopping KafkaOffsetBackingStore
17:23:02.253 [Test worker] INFO o.a.kafka.connect.util.KafkaBasedLog – Stopping KafkaBasedLog for topic debezium.offsets
17:23:02.254 [Test worker] INFO o.a.k.clients.producer.KafkaProducer – [Producer clientId=debezium-serveroffsets] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
17:23:02.258 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Metrics scheduler closed
17:23:02.259 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.metrics.JmxReporter
17:23:02.259 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
17:23:02.259 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Metrics reporters closed
17:23:02.259 [Test worker] INFO o.a.kafka.common.utils.AppInfoParser – App info kafka.producer for debezium-serveroffsets unregistered
17:23:02.307 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Metrics scheduler closed
17:23:02.307 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.metrics.JmxReporter
17:23:02.307 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
17:23:02.307 [Test worker] INFO o.a.kafka.common.metrics.Metrics – Metrics reporters closed
17:23:02.308 [Test worker] INFO o.a.kafka.common.utils.AppInfoParser – App info kafka.consumer for debezium-serveroffsets unregistered
17:23:02.308 [Test worker] INFO o.a.kafka.connect.util.KafkaBasedLog – Stopped KafkaBasedLog for topic debezium.offsets
17:23:02.308 [Test worker] INFO o.a.k.c.s.KafkaOffsetBackingStore – Stopped KafkaOffsetBackingStore
How to reproduce the issue using our tutorial deployment?
<Your answer>
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
<Your answer>
Implementation ideas (optional)
Move offsetWriter.doFlush() under try/catch block and catch any Exception, not only
ExecutionException | TimeoutException (see AsyncEmbeddedEngine.commitOffsets())