- Bug
- Resolution: Done
- Blocker
- 2.5.4.Final
- None
- False
- None
- False
- Critical
What Debezium connector do you use and what version?
Debezium Server version: 2.5.4
Connector: MySQL (io.debezium.connector.mysql.MySqlConnector), with the RabbitMQ sink
What is the connector configuration?
DEBEZIUM_SOURCE_CONNECTOR_CLASS=io.debezium.connector.mysql.MySqlConnector
DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS=0
- Snapshot
DEBEZIUM_SOURCE_SNAPSHOT_MODE=schema_only
- Sink
DEBEZIUM_SINK_TYPE=rabbitmq
What is the captured database version and mode of deployment?
Captured database: MySQL (version 8.0.x)
What behavior do you expect?
When RabbitMQ is down or unreachable, Debezium should not advance the binlog offset until RabbitMQ confirms that the message has been successfully delivered. The system should wait for the acknowledgment before committing the offset.
What behavior do you see?
Debezium advances the binlog offset even when RabbitMQ does not confirm the message and the publish fails with a TimeoutException. Because the offset is committed before RabbitMQ confirms the message, those messages are lost.
Do you see the same behaviour using the latest released Debezium version?
Yes, I am observing this behavior in version 2.5.4. I have not yet tested with any alpha/beta/CR versions.
Do you have the connector logs, ideally from start till finish?
Yes. Here is the code where the exception is thrown, followed by the relevant log snippet:
try {
    channel.waitForConfirmsOrDie(ackTimeout);
}
catch (IOException | TimeoutException e) {
    throw new DebeziumException(e);
}
Code snippet taken from: rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java
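To make the ordering problem concrete, here is a simplified sketch of how I read the handleBatch flow around that snippet in 2.5.4. It is paraphrased rather than copied from the source, so the exchange/routing-key handling and local names are assumptions; the point is only the publish, then markProcessed, then waitForConfirmsOrDie ordering.

// Simplified sketch (not the verbatim 2.5.4 source) of
// RabbitMqStreamChangeConsumer.handleBatch, showing why offsets can advance
// before RabbitMQ has confirmed the publishes.
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records,
                        DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
        throws InterruptedException {
    for (ChangeEvent<Object, Object> record : records) {
        try {
            // Publish the event; publisher confirms are enabled on the channel.
            channel.basicPublish(exchange, routingKey, null, getBytes(record.value()));
        }
        catch (IOException e) {
            throw new DebeziumException(e);
        }
        // The record is marked processed right after basicPublish(), i.e. before
        // the broker has confirmed it, so its offset becomes eligible for flushing.
        committer.markProcessed(record);
    }

    try {
        // Only now does the consumer wait for confirms. If this times out
        // (RabbitMQ down or unreachable), the records above are already marked
        // processed and their offsets may still be committed, losing the events.
        channel.waitForConfirmsOrDie(ackTimeout);
    }
    catch (IOException | TimeoutException e) {
        throw new DebeziumException(e);
    }

    committer.markBatchFinished();
}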
Logs:
2024-10-04 23:20:23,711 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.TimeoutException', error = 'io.debezium.DebeziumException: java.util.concurrent.TimeoutException': io.debezium.DebeziumException: java.util.concurrent.TimeoutException
    at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:167)
    at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
    at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
    at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:617)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)
    at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:252)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirmsOrDie(AutorecoveringChannel.java:707)
    at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:164)
    ... 8 more
2024-10-04 23:20:23,720 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-10-04 23:20:23,720 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-10-04 23:20:23,722 ERROR [io.qua.arc.imp.UncaughtExceptions] (main) Error occurred while destroying instance of CLASS bean [types=[io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer, io.debezium.engine.DebeziumEngine$ChangeConsumer<io.debezium.engine.ChangeEvent<java.lang.Object, java.lang.Object>>, java.lang.Object, io.debezium.server.BaseChangeConsumer], qualifiers=[@Default, @Any, @Named("rabbitmq")], target=io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer]: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0) [Error Occurred After Shutdown]
2024-10-04 23:20:23,743 INFO [io.quarkus] (main) debezium-server-dist stopped in 0.031s
How to reproduce the issue using our tutorial deployment?
This is a fairly rare case that I encountered in production while reviewing the logs and investigating an alert from our monitoring system, which indicated a mismatch in the number of messages between the databases. The issue occurs when there is a network failure or RabbitMQ goes down while messages are being published; a setup along the lines of the sketch below should reproduce it.
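For anyone trying to recreate this outside production, a minimal Debezium Server environment along these lines should approximate the setup. Everything beyond the variables already listed above (the database connection values, topic prefix, server id, and the RabbitMQ sink connection properties) is an assumption based on my reading of the Debezium Server docs, not taken from the affected deployment.

# Source (MySQL) - connection values are placeholders
DEBEZIUM_SOURCE_CONNECTOR_CLASS=io.debezium.connector.mysql.MySqlConnector
DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS=0
DEBEZIUM_SOURCE_SNAPSHOT_MODE=schema_only
DEBEZIUM_SOURCE_DATABASE_HOSTNAME=mysql
DEBEZIUM_SOURCE_DATABASE_PORT=3306
DEBEZIUM_SOURCE_DATABASE_USER=debezium
DEBEZIUM_SOURCE_DATABASE_PASSWORD=dbz
DEBEZIUM_SOURCE_DATABASE_SERVER_ID=184054
DEBEZIUM_SOURCE_TOPIC_PREFIX=tutorial

# Sink (RabbitMQ) - connection property names assumed from the docs
DEBEZIUM_SINK_TYPE=rabbitmq
DEBEZIUM_SINK_RABBITMQ_CONNECTION_HOST=rabbitmq
DEBEZIUM_SINK_RABBITMQ_CONNECTION_PORT=5672

With something like this running, stopping the RabbitMQ broker while changes are streaming should trigger the TimeoutException shown in the logs; after the broker comes back, comparing the binlog position recorded in the offset store with the last event actually delivered to the queue shows the gap.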
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
The feature would ensure that binlog offsets are not advanced until RabbitMQ confirms successful receipt of the message, preventing message loss when RabbitMQ is down or unreachable.
Implementation ideas (optional)
Maybe we could move the call to committer.markProcessed(record) so that it runs after channel.waitForConfirmsOrDie(ackTimeout), ensuring that the offset is only advanced once RabbitMQ has confirmed the message.
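Continuing the sketch from the code above (same assumptions about the surrounding class and local names), the reordered flow would look roughly like this:

// Proposed ordering: publish the whole batch, wait for publisher confirms,
// and only then mark records processed so offsets cannot move past
// unconfirmed events.
for (ChangeEvent<Object, Object> record : records) {
    try {
        channel.basicPublish(exchange, routingKey, null, getBytes(record.value()));
    }
    catch (IOException e) {
        throw new DebeziumException(e);
    }
}

try {
    // On timeout the exception propagates before any record is marked
    // processed, so no offset is committed for this batch.
    channel.waitForConfirmsOrDie(ackTimeout);
}
catch (IOException | TimeoutException e) {
    throw new DebeziumException(e);
}

// Confirms received: it is now safe to mark the records processed and let
// the engine flush offsets.
for (ChangeEvent<Object, Object> record : records) {
    committer.markProcessed(record);
}
committer.markBatchFinished();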