AMQ Broker / ENTMQBR-960

[AMQP] Messages are lost in cluster


      Environment: active/active cluster with two brokers

      A message was sent to a queue on broker1 using AMQP 1.0. A client connected to broker2 then tried to read it.

      However, broker2 throws an exception and the message is never delivered to the client:

      09:34:53,146 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
              at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:612) [artemis-amqp-protocol-2.4.0.jar:]
              at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:63) [artemis-amqp-protocol-2.4.0.jar:]
              at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1368) [artemis-server-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1311) [artemis-server-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1304) [artemis-server-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.4.0.jar:2.4.0]
              at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
              at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
      

      The queue is not created on the fly; it is configured statically, and both brokers have this configuration:

        <queues>
             <queue name="awe.test.queue">
                <address>awe.test.queue</address>
                <durable>true</durable>
              </queue>
        </queues>
      

      The cluster configuration is pretty much copied from the examples:

            <!-- Clustering configuration -->
            <broadcast-groups>
               <broadcast-group name="my-broadcast-group">
                  <group-address>${udp-address:231.7.7.7}</group-address>
                  <group-port>9876</group-port>
                  <broadcast-period>100</broadcast-period>
                  <connector-ref>netty-connector</connector-ref>
               </broadcast-group>
            </broadcast-groups>
      
            <discovery-groups>
               <discovery-group name="my-discovery-group">
                  <group-address>${udp-address:231.7.7.7}</group-address>
                  <group-port>9876</group-port>
                  <refresh-timeout>10000</refresh-timeout>
               </discovery-group>
            </discovery-groups>
      
            <cluster-connections>
               <cluster-connection name="my-cluster">
                  <connector-ref>netty-connector</connector-ref>
                  <retry-interval>500</retry-interval>
                  <use-duplicate-detection>true</use-duplicate-detection>
                  <message-load-balancing>STRICT</message-load-balancing>
                  <max-hops>1</max-hops>
                  <discovery-group-ref discovery-group-name="my-discovery-group"/>
               </cluster-connection>
            </cluster-connections>
      
            <!-- a colocated server that will allow shared store full backups to be requested-->
            <ha-policy>
               <shared-store>
                  <colocated>
                     <backup-port-offset>100</backup-port-offset>
                     <backup-request-retries>-1</backup-request-retries>
                     <backup-request-retry-interval>2000</backup-request-retry-interval>
                     <max-backups>1</max-backups>
                     <request-backup>true</request-backup>
                     <master>
                        <failover-on-shutdown>true</failover-on-shutdown>
                     </master>
                     <slave>
                        <failover-on-shutdown>true</failover-on-shutdown>
                     </slave>
                  </colocated>
               </shared-store>
            </ha-policy>
      

      qpid-send and qpid-receive were used as test clients. The message is sent like this:

      qpid-send -b localhost:9800 -a awe.test.queue '--connection-option={protocol:amqp1.0}' --content-string 'test message Do 7. Dez 09:31:51 CET 2017' --durable=yes
      

      I grabbed the message from the log file. It looks like this:

      0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00", message-format=0, settled=false, more=false] 
      (79) "\x00Sp\xc0\x04\x02AP\x00\x00St\xc1\x14\x04\xa1\x02snR\x01\xa1\x02ts\x81\x14\xfd\xf8\xcb\xdaG\x94U\x00Sw\xa1(test message Do 7. Dez 09:31:51 CET 2017"
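
      The payload above can be walked section by section to see which AMQP 1.0 message sections the sender actually included. The following is a minimal sketch, not broker code: the class and method names are hypothetical, the bytes are transcribed by hand from the dump, and the walker only understands the one-byte-size encodings (list8, map8, str8, vbin8) that occur in this particular message. It finds header (0x70), application-properties (0x74) and amqp-value (0x77), but no properties section (0x73), so the message carries no "to" field.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AmqpSectionScan {

    // The 79 payload bytes, transcribed from the transfer frame in this report.
    static byte[] payload() {
        int[] head = {
            0x00, 0x53, 0x70, 0xc0, 0x04, 0x02, 0x41, 0x50, 0x00,   // header
            0x00, 0x53, 0x74, 0xc1, 0x14, 0x04,                     // application-properties (map8)
            0xa1, 0x02, 's', 'n', 0x52, 0x01,                       // "sn" -> 1
            0xa1, 0x02, 't', 's',                                   // "ts" -> 8-byte long
            0x81, 0x14, 0xfd, 0xf8, 0xcb, 0xda, 0x47, 0x94, 0x55,
            0x00, 0x53, 0x77, 0xa1, 0x28                            // amqp-value (str8, 40 bytes)
        };
        byte[] body = "test message Do 7. Dez 09:31:51 CET 2017"
                .getBytes(StandardCharsets.US_ASCII);
        byte[] out = new byte[head.length + body.length];
        for (int i = 0; i < head.length; i++) {
            out[i] = (byte) head[i];
        }
        System.arraycopy(body, 0, out, head.length, body.length);
        return out;
    }

    // Returns the descriptor code of every top-level section in the payload.
    static List<Integer> sectionCodes(byte[] p) {
        List<Integer> codes = new ArrayList<>();
        int i = 0;
        while (i < p.length) {
            // A section is a described type: 0x00, then a smallulong (0x53) descriptor.
            if (i + 5 > p.length || p[i] != 0x00 || (p[i + 1] & 0xff) != 0x53) {
                throw new IllegalArgumentException("unexpected data at offset " + i);
            }
            codes.add(p[i + 2] & 0xff);
            int ctor = p[i + 3] & 0xff;
            // Only encodings with a one-byte size prefix are handled in this sketch.
            if (ctor != 0xc0 && ctor != 0xc1 && ctor != 0xa1 && ctor != 0xa0) {
                throw new IllegalArgumentException("unsupported encoding at offset " + (i + 3));
            }
            int size = p[i + 4] & 0xff;
            i += 5 + size; // skip descriptor (3), constructor (1), size byte (1), contents
        }
        return codes;
    }

    public static void main(String[] args) {
        for (int code : sectionCodes(payload())) {
            System.out.println("section 0x" + Integer.toHexString(code));
        }
    }
}
```

      Since qpid-send did not add a properties section here, an address lookup that falls back to the "to" property has nothing to return for this message.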
      

      Reading is done using:

      qpid-receive -b localhost:9802 -a awe.test.queue '--connection-option={protocol:amqp1.0}' -t --timeout 500
      

      I can see that AMQPMessage.getAddress() can return null:

         @Override
         public String getAddress() {
            if (address == null) {
               Properties properties = getProtonMessage().getProperties();
               if (properties != null) {
                  return properties.getTo();
               } else {
                  return null;
               }
            } else {
               return address;
            }
         }
      

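      However, ServerSessionImpl.send() does not check for a null address, although it should. SimpleString.toSimpleString() returns null when its input is null, so if the message address and defaultAddress are both null, message.setAddress() is called with null: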

            SimpleString address = message.getAddressSimpleString();
      
            if (defaultAddress == null && address != null) {
               defaultAddress = address;
            }
      
            if (address == null) {
               // We don't want to force a re-encode when the message gets sent to the consumer
               message.setAddress(defaultAddress);
            }
      

      SimpleString:

         public static SimpleString toSimpleString(final String string) {
            if (string == null) {
               return null;
            }
            return new SimpleString(string);
         }
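
      The null path described above can be reproduced in isolation. The sketch below uses hypothetical stand-in classes (Msg, sendUnguarded, sendGuarded) rather than the real Artemis types, and it assumes that setAddress() dereferences its argument without a null check, which is what the stack trace suggests. The guarded variant shows one possible way to fail with a clear error instead of a NullPointerException; it is not necessarily how the broker should or does fix this.

```java
public class NullAddressSketch {

    /** Hypothetical stand-in for an AMQP message that was sent without a "to" property. */
    static class Msg {
        private String address; // never set by the sender

        String getAddress() {
            return address; // null, like AMQPMessage.getAddress() without properties
        }

        void setAddress(String newAddress) {
            // Assumed failure mode: the argument is dereferenced without a null check.
            this.address = newAddress.toString();
        }
    }

    // Same shape as the ServerSessionImpl.send() excerpt: no check on defaultAddress.
    static void sendUnguarded(Msg message, String defaultAddress) {
        String address = message.getAddress();
        if (address == null) {
            message.setAddress(defaultAddress); // throws NPE when defaultAddress is null too
        }
    }

    // One possible guard (an assumption): reject the message with a descriptive error.
    static void sendGuarded(Msg message, String defaultAddress) {
        String address = message.getAddress();
        if (address == null) {
            if (defaultAddress == null) {
                throw new IllegalStateException(
                        "message has no address and the session has no default address");
            }
            message.setAddress(defaultAddress);
        }
    }

    public static void main(String[] args) {
        try {
            sendUnguarded(new Msg(), null);
        } catch (NullPointerException e) {
            System.out.println("unguarded send failed with NullPointerException");
        }
        try {
            sendGuarded(new Msg(), null);
        } catch (IllegalStateException e) {
            System.out.println("guarded send rejected: " + e.getMessage());
        }
    }
}
```

      Whether rejecting the message is the right behaviour is a design question; the message could also be routed by some other means. Either way, the null case needs explicit handling.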
      

              rh-ee-ataylor Andy Taylor
              rhn-support-rkieley Roderick Kieley
              Michal Toth Michal Toth