Debezium / DBZ-8307

Debezium shifts binlog offset despite RabbitMQ Timeout and unconfirmed messages


    • Type: Bug
    • Resolution: Done
    • Priority: Blocker
    • 3.0.2.Final
    • 2.5.4.Final
    • debezium-server
    • Critical

      What Debezium connector do you use and what version?

      Debezium Server version: 2.5.4
      Connector: RabbitMQ

      What is the connector configuration?

      DEBEZIUM_SOURCE_CONNECTOR_CLASS=io.debezium.connector.mysql.MySqlConnector
      DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS=0

      # Snapshot
      DEBEZIUM_SOURCE_SNAPSHOT_MODE=schema_only

      # Sink
      DEBEZIUM_SINK_TYPE=rabbitmq

      What is the captured database version and mode of deployment?

      Captured database: MySQL (version 8.0.x)

      What behavior do you expect?

      When RabbitMQ is down or unreachable, Debezium should not shift the binlog offset until RabbitMQ confirms that it has received the message. The offset should be committed only after the ack arrives.

      What behavior do you see?

      Debezium shifts the binlog offset forward even when RabbitMQ does not confirm the message and the wait for the confirm fails with a TimeoutException. Because the offset is committed before RabbitMQ confirms the message, the unconfirmed messages are lost.
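      The following is a minimal Java model of how this ordering loses messages; it is not Debezium code. It assumes a single batch of 10 binlog events, a broker that stores only the first 6 before going down (so the confirm wait times out), and that any offset already marked as processed is flushed when the connector stops, which matches what is observed here. OffsetLossDemo, simulate and distinct are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the failure in this report; not Debezium code.
final class OffsetLossDemo {

    /**
     * Runs one batch of n events against a broker that stores only the first
     * storedBeforeOutage messages, then "restarts" from the committed offset
     * with the broker reachable again. Returns everything the broker stored.
     */
    static List<Integer> simulate(int n, int storedBeforeOutage, boolean markBeforeConfirm) {
        List<Integer> broker = new ArrayList<>(); // messages the broker actually stored
        int committedOffset = 0;                  // next binlog event to read after a restart

        // First run: the broker goes down part-way through the batch.
        try {
            for (int event = 0; event < n; event++) {
                if (event < storedBeforeOutage) {
                    broker.add(event);
                }
                if (markBeforeConfirm) {
                    committedOffset = event + 1; // offset moves before any confirm arrives
                }
            }
            if (broker.size() < n) {
                // Stand-in for channel.waitForConfirmsOrDie(ackTimeout) timing out.
                throw new TimeoutException("missing confirms: " + (n - broker.size()));
            }
            committedOffset = n; // every message confirmed: now it is safe to advance
        }
        catch (TimeoutException e) {
            // The connector stops; an offset already marked is assumed to be flushed.
        }

        // Second run: the broker is reachable again; resume from the committed offset.
        for (int event = committedOffset; event < n; event++) {
            broker.add(event);
        }
        return broker;
    }

    static long distinct(List<Integer> messages) {
        return messages.stream().distinct().count();
    }

    public static void main(String[] args) {
        List<Integer> buggy = simulate(10, 6, true);
        List<Integer> fixed = simulate(10, 6, false);
        System.out.println("mark before confirm: " + distinct(buggy) + " of 10 events delivered");
        System.out.println("mark after confirm:  " + distinct(fixed) + " of 10 events delivered, "
                + (fixed.size() - distinct(fixed)) + " duplicates");
    }
}
```

      In this model the current ordering ends with 6 of 10 events at the broker, while advancing the offset only after the confirm delivers all 10 at the cost of 6 re-published duplicates, i.e. at-least-once delivery instead of gaps.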

      Do you see the same behaviour using the latest released Debezium version?

      Yes, I am observing this behavior in version 2.5.4. I have not yet tested with any alpha/beta/CR versions.

      Do you have the connector logs, ideally from start till finish?

      Yes. The exception is thrown from this code:

       

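      try {
          // Blocks until every message published since the last call is confirmed
          // by the broker. On a nack or when ackTimeout (ms) elapses, the client
          // closes the channel and throws IOException or TimeoutException.
          channel.waitForConfirmsOrDie(ackTimeout);
      }
      catch (IOException | TimeoutException e) {
          throw new DebeziumException(e);
      }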

      Code snippet taken from: rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java

      Logs:

      2024-10-04 23:20:23,711 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.TimeoutException', error = 'io.debezium.DebeziumException: java.util.concurrent.TimeoutException': io.debezium.DebeziumException: java.util.concurrent.TimeoutException
          at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:167)
          at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
          at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
          at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
          at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
          at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.util.concurrent.TimeoutException
          at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
          at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
          at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
          at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
          at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:617)
          at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)
          at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:252)
          at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirmsOrDie(AutorecoveringChannel.java:707)
          at io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer.handleBatch(RabbitMqStreamChangeConsumer.java:164)
          ... 8 more
      2024-10-04 23:20:23,720 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
      2024-10-04 23:20:23,720 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
      2024-10-04 23:20:23,722 ERROR [io.qua.arc.imp.UncaughtExceptions] (main) Error occurred while destroying instance of CLASS bean [types=[io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer, io.debezium.engine.DebeziumEngine$ChangeConsumer<io.debezium.engine.ChangeEvent<java.lang.Object, java.lang.Object>>, java.lang.Object, io.debezium.server.BaseChangeConsumer], qualifiers=[@Default, @Any, @Named("rabbitmq")], target=io.debezium.server.rabbitmq.RabbitMqStreamChangeConsumer]: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0) [Error Occurred After Shutdown]
      2024-10-04 23:20:23,743 INFO  [io.quarkus] (main) debezium-server-dist stopped in 0.031s

       

       

      How to reproduce the issue using our tutorial deployment?

      This is a rare case that I encountered in production while reviewing the logs to investigate an alert from our monitoring system, which reported a mismatch in the number of messages between the databases. It is triggered when the network fails or RabbitMQ goes down while messages are being published.

      Feature request or enhancement

      For feature requests or enhancements, please provide this information:

      Which use case/requirement will be addressed by the proposed feature?

      The feature would ensure that binlog offsets are not shifted until RabbitMQ confirms successful receipt of the message. This would prevent message loss in cases where RabbitMQ is down or unreachable.

      Implementation ideas (optional)

      One option is to move the call to committer.markProcessed(record) after channel.waitForConfirmsOrDie(ackTimeout), so that the offset is shifted only once RabbitMQ has confirmed the message.
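      A minimal sketch of that reordering, assuming hypothetical Publisher and Committer interfaces that stand in for the RabbitMQ Channel and Debezium's RecordCommitter (they are simplified, e.g. without checked InterruptedException). It is not the actual Debezium code: the batch is published first, confirms are awaited once, and only then are records marked processed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of "confirm first, then commit"; not Debezium's implementation.
final class ConfirmThenCommit {

    // Stand-in for the RabbitMQ channel used by the sink.
    interface Publisher {
        void publish(String message) throws IOException; // like channel.basicPublish(...)
        void waitForConfirmsOrDie(long timeoutMs) throws IOException, TimeoutException;
    }

    // Stand-in for Debezium's record committer.
    interface Committer<R> {
        void markProcessed(R record);
        void markBatchFinished();
    }

    static void handleBatch(List<String> records, Committer<String> committer,
                            Publisher publisher, long ackTimeoutMs) {
        // 1. Publish the whole batch without touching offsets.
        for (String record : records) {
            try {
                publisher.publish(record);
            }
            catch (IOException e) {
                throw new IllegalStateException(e); // the sink throws DebeziumException here
            }
        }
        // 2. Block until the broker has confirmed every message in the batch.
        try {
            publisher.waitForConfirmsOrDie(ackTimeoutMs);
        }
        catch (IOException | TimeoutException e) {
            // Nothing has been marked processed, so the offset stays where it was.
            throw new IllegalStateException(e);
        }
        // 3. Only now advance the offsets.
        for (String record : records) {
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

    public static void main(String[] args) {
        List<String> marked = new ArrayList<>();
        Committer<String> committer = new Committer<>() {
            @Override public void markProcessed(String record) { marked.add(record); }
            @Override public void markBatchFinished() { marked.add("<batch finished>"); }
        };
        Publisher timingOut = new Publisher() {
            @Override public void publish(String message) { /* sent, never confirmed */ }
            @Override public void waitForConfirmsOrDie(long timeoutMs) throws TimeoutException {
                throw new TimeoutException("no confirms within " + timeoutMs + " ms");
            }
        };
        try {
            handleBatch(List.of("e1", "e2"), committer, timingOut, 30_000);
        }
        catch (IllegalStateException e) {
            System.out.println("batch failed, marked = " + marked);
        }
    }
}
```

      Because offsets advance only after the whole batch is confirmed, a failed batch is read again after a restart, so downstream consumers may see duplicates rather than gaps.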

              Assignee: Unassigned
              Reporter: Evgeniy Lopatenko (e.lopatenko) (Inactive)
              Votes: 0
              Watchers: 3