  1. AMQ Broker
  2. ENTMQBR-5727

Federated queue messages are not acknowledged in the upstream broker.


Details

      • Run 2 brokers and set this federation configuration on broker2
      <connectors>
          <connector name="broker1">tcp://localhost:61616</connector>
          <connector name="broker2">tcp://localhost:61617</connector>
      </connectors>
      
      <federations>
      <federation name="fed">
          <upstream name="fed-upstream">
              <circuit-breaker-timeout>1000</circuit-breaker-timeout>
              <static-connectors>
                  <connector-ref>broker1</connector-ref>
              </static-connectors>
              <policy ref="policySetA"/>
          </upstream>
      
          <policy-set name="policySetA">
              <policy ref="queue-federation" />
          </policy-set>
      
          <queue-policy name="queue-federation" >
              <include queue-match="exampleQueue" address-match="#" />
          </queue-policy>
      </federation>
      </federations>
      • Now consume from broker2 and produce to broker1
      • Let all the messages get consumed and check the stats on broker1:
      | NAME         | CONSUMER_COUNT | MESSAGE_COUNT | MESSAGES_ADDED | DELIVERING_COUNT | ACK |
      | exampleQueue | 1              | 1             | 15             | 1                | 14  |
      
      • Producer
      import javax.jms.Connection;
      import javax.jms.Destination;
      import javax.jms.Message;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import org.apache.qpid.jms.JmsConnectionFactory;

      public class Producer {
          public static void main(String[] args) throws Exception {
              // Connect to broker1 (the upstream broker) over AMQP
              JmsConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
              Connection connection = connectionFactory.createConnection();
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              Destination queue = session.createQueue("exampleQueue");
              MessageProducer messageProducer = session.createProducer(queue);
              for (int i = 0; i < 1802; i++) {
                  Message message = session.createTextMessage("Hello" + i);
                  messageProducer.send(message);
              }
              connection.close();
          }
      }
      • Consumer (note that the prefetch setting is irrelevant here; changing it doesn't affect the outcome)
      import javax.jms.Connection;
      import javax.jms.Destination;
      import javax.jms.JMSException;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import org.apache.qpid.jms.JmsConnectionFactory;

      public class Consumer {
          public static void main(String[] args) throws Exception {
              // Connect to broker2 (the downstream broker) over AMQP
              JmsConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61617?jms.prefetchPolicy.queuePrefetch=1");
              Connection connection = connectionFactory.createConnection();
              connection.start();
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              Destination queue = session.createQueue("exampleQueue");
              // Print each message as it arrives; AUTO_ACKNOWLEDGE acks after the listener returns
              session.createConsumer(queue).setMessageListener(message -> {
                  TextMessage textMessage = (TextMessage) message;
                  try {
                      System.out.println(">> A new message " + textMessage.getText());
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              });
          }
      }
      • Strangely, if I send 1801 messages, all of them are acked, but if I send 1802, one message remains in the delivering state.
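The stats row from broker1 in the steps above can be checked mechanically. The following is a minimal sketch, not part of the original report: the class `QueueStatsCheck` and its helpers are hypothetical names, and it assumes the column order shown in the table (NAME, CONSUMER_COUNT, MESSAGE_COUNT, MESSAGES_ADDED, DELIVERING_COUNT, ACK). It only computes how many added messages have not been acknowledged and whether any are still in delivery.

```java
public class QueueStatsCheck {

    /** One row of the queue statistics table (column order is an assumption). */
    static final class QueueStats {
        final String name;
        final long consumerCount;
        final long messageCount;
        final long messagesAdded;
        final long deliveringCount;
        final long acked;

        QueueStats(String[] c) {
            this.name = c[0];
            this.consumerCount = Long.parseLong(c[1]);
            this.messageCount = Long.parseLong(c[2]);
            this.messagesAdded = Long.parseLong(c[3]);
            this.deliveringCount = Long.parseLong(c[4]);
            this.acked = Long.parseLong(c[5]);
        }

        /** Messages added to the queue that have not been acknowledged. */
        long notAcked() {
            return messagesAdded - acked;
        }
    }

    /** Parses a pipe-delimited row such as "|exampleQueue |1 |1 |15 |1 |14". */
    static QueueStats parseRow(String row) {
        java.util.List<String> cells = new java.util.ArrayList<>();
        for (String cell : row.split("\\|")) {
            String trimmed = cell.trim();
            if (!trimmed.isEmpty()) {
                cells.add(trimmed);
            }
        }
        if (cells.size() != 6) {
            throw new IllegalArgumentException("Expected 6 columns but found " + cells.size());
        }
        return new QueueStats(cells.toArray(new String[0]));
    }

    public static void main(String[] args) {
        // Row copied from the broker1 stats in this report
        QueueStats stats = parseRow("|exampleQueue |1     |1             |15             |1                |14     ");
        // prints: exampleQueue: added-but-not-acked=1, delivering=1
        System.out.println(stats.name + ": added-but-not-acked=" + stats.notAcked()
                + ", delivering=" + stats.deliveringCount);
    }
}
```

A non-zero delivering count after the consumer has printed every message is the symptom this issue describes.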

    Description

      Protocol: AMQP

      The upstream broker appears to deliver all the messages to the consumers of the downstream broker, but the upstream broker's metrics show that the last few messages were never acknowledged. They are also visible when calling the listDeliveringMessages operation on the queue MBean:

      {
        "ServerConsumer [id=58e610d7:5d23a784-3e56-11ec-98da-525400ba0306:0, filter=null, binding=LocalQueueBinding [address=exampleQueue, queue=QueueImpl[name=exampleQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::name=broker1], temp=false]@da19eaa]]": [
          {
            "durable": true,
            "address": "exampleQueue",
            "messageID": 20182,
            "expiration": 0,
            "priority": 4,
            "userID": "ID:ID:3e86948e-a843-4961-b100-2b1e5ba33e34:1:1:1-1",
            "timestamp": 1636130143888
          }
        ]
      }
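Output like the above can be fetched over JMX. The following is a rough sketch under several assumptions that are not in the original report: the JMX service URL and port are hypothetical, the queue is assumed to use anycast routing, and the MBean name follows the naming pattern I understand Artemis 2.x-based brokers to use. It calls `listDeliveringMessagesAsJSON`, the JSON variant of the operation on the queue control MBean.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListDelivering {

    /** Builds the queue MBean name (naming pattern and anycast routing are assumptions). */
    static ObjectName queueObjectName(String broker, String address, String queue) throws Exception {
        return new ObjectName("org.apache.activemq.artemis:broker=" + ObjectName.quote(broker)
                + ",component=addresses,address=" + ObjectName.quote(address)
                + ",subcomponent=queues,routing-type=\"anycast\",queue=" + ObjectName.quote(queue));
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical JMX endpoint for broker1; adjust to the broker's management configuration
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName name = queueObjectName("broker1", "exampleQueue", "exampleQueue");
            // The operation takes no arguments and returns a JSON string
            Object json = mbsc.invoke(name, "listDeliveringMessagesAsJSON", new Object[0], new String[0]);
            System.out.println(json);
        }
    }
}
```

Running this against broker1 after the consumer has gone idle should show whether any messages are still held in the delivering state.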

       

      After this, if the consumer is restarted, the metrics are reset to 0, but if the broker is restarted, it delivers these pending messages again.

      This creates a risk of duplicate message delivery.

            People

              rhn-support-jbertram Justin Bertram
              rhn-support-adongre Avinash Dongre
              Oleg Sushchenko Oleg Sushchenko