Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-966

Unsettled AMQP messages are lost when Receiver Link is opened on remote cluster member

XMLWordPrintable

    • Release Notes
    • An issue causing message loss has been fixed in this release. Previously, if messages were sent to a broker using the AMQP address, and the address was not set on the messages, then some of the messages could be lost if they were redistributed.
    • Documented as Resolved Issue

      In a 2-broker cluster, if AMQP messages are sent to broker0, and then a receiver link (consumer) is connected to broker1, the messages are removed from broker0, and attempted to be forwarded across the cluster bridge, but are lost.

      This is similar (but not identical to) ENTMQBR-960 which was fixed upstream a few weeks back. In ENTMQBR-960, the receiver links are present on both brokers are the time the message are sent. They are now load balanced and received properly.

      In this issue, the receiver link is created after the messages already exist in broker0. I'm not sure if the desired behavior is for these messages simply to stay on broker0 and not cross over to broker1, but right now, they are sent over the cluster bridge and lost forever.

      In this case, it looks like the 2nd arg to ClusterConnectionBridge.beforeForward() is null. I have attached some native AMQP client code that demonstrates this issue (AMQP .Net Lite client, c#):

      using System;
      using System.Collections.Generic;
      using System.Threading;
      using Amqp.Framing;
      using Amqp.Extensions;
      using Amqp.Sasl;
      using Amqp.Types;
      
      namespace Amqp.Extensions.Examples
      {
          class Program
          {
              static void Main(string[] args)
              {
                  // create receiver link on broker0
                  Connection connection0 = new Connection(new Address("amqp://localhost:5672"));       
                  Session session0 = new Session(connection0);
                  ReceiverLink receiver0 = new ReceiverLink(session0, "test", "orders");
                  
                  // send messages to broker0
                  SenderLink sender = new SenderLink(session0, "sender", "orders");
                  Message message = new Message("a message!");
                  
                  for (var i = 0; i < 5; i++)
                  {
                      sender.Send(message);
                      Thread.Sleep(100);
                  }
      
                  // receive 1 of 5, works as expected...
                  Message m = receiver0.Receive(TimeSpan.FromSeconds(1));
                  Console.WriteLine(m.Body);
                  receiver0.Accept(m);
                  session0.Close();
                  connection0.Close();
      
                  // create receiver link on broker1
                  Connection connection1 = new Connection(new Address("amqp://localhost:5673"));       
                  Session session1 = new Session(connection1);
                  ReceiverLink receiver1 = new ReceiverLink(session1, "test", "orders");
      
                  // these 4 messages are removed from broker0 (ack'd) but never delivered. NPE seen in logs on broker1
                  for (var i = 0; i < 4; i++)
                  {   
                      m = receiver1.Receive(TimeSpan.FromSeconds(1));
                      if (m != null)
                      {
                          Console.WriteLine(m.Body);
                          receiver1.Accept(m);
                      }
                  }
      
                  sender.Close();
                  session1.Close();
                  connection1.Close();
              }
          }
      }
      

              csuconic@redhat.com Clebert Suconic
              rhn-support-rkieley Roderick Kieley
              Michal Toth Michal Toth
              Votes:
              0 Vote for this issue
              Watchers:
              8 Start watching this issue

                Created:
                Updated:
                Resolved: