  1. AMQ Broker
  2. ENTMQBR-513

Three consumers killed as slow if there are not enough messages for all of them to be fast


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • A-MQ 7.0.0.ER16
    • A-MQ 7.0.0.ER14
    • None
    • None

      In broker.xml:

         </security-settings>

         <address-settings>
            <!--default for catch all-->
            <address-setting match="#">

      ...

               <slow-consumer-threshold>1</slow-consumer-threshold>
               <slow-consumer-policy>KILL</slow-consumer-policy>
               <slow-consumer-check-period>5</slow-consumer-check-period>

            </address-setting>
         </address-settings>
      

      Start three consumers, each with the same command (1). Then start the producer with command (2). The consumers receive messages in a round-robin fashion. After a few seconds, all consumers are killed as slow.

      1) ./aac5_receiver.py -b 127.0.0.1:5672/jms.queue.myqueue --reactor-prefetch 2 -c 200 --log-msgs dict --conn-reconnect none

      2) ./aac5_sender.py -b 127.0.0.1:5672/jms.queue.myqueue --log-msgs dict -c 200 --duration 100


      We have one queue. Imagine messages are produced at 2 msgs/s. There are three consumers and slow-consumer-threshold is 1 msg/s. All three consumers get killed as slow, even though none of them can possibly be fast, because messages are distributed equally among the consumers (round-robin).

                         --------   />  2/3 msg/s
            2msg/s ->    |   B  |   ->  2/3 msg/s
                         --------   \>  2/3 msg/s
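
The arithmetic in the diagram can be sketched in a few lines of Java. This is a simplified model of the check, not broker code: the class SlowConsumerScenario and both method names are made up for illustration, and the only logic taken from the broker is the two comparisons against the threshold quoted from SlowConsumerReaperRunnable#run in this report.

```java
public class SlowConsumerScenario {

    /** Rate each consumer sees when messages are shared equally (round-robin). */
    public static float perConsumerRate(float queueRate, int consumerCount) {
        return queueRate / consumerCount;
    }

    /**
     * Simplified model of the quoted check: inspection is skipped only when
     * the whole queue receives fewer messages than the threshold; otherwise
     * any consumer below the threshold is treated as slow.
     */
    public static boolean flaggedAsSlow(float queueRate, float consumerRate, float threshold) {
        if (queueRate < threshold) {
            return false; // "Insufficient messages received on queue", consumer not inspected
        }
        return consumerRate < threshold;
    }

    public static void main(String[] args) {
        float queueRate = 2.0f;  // producer rate from the scenario, msgs/s
        float threshold = 1.0f;  // slow-consumer-threshold, msgs/s
        int consumers = 3;

        float rate = perConsumerRate(queueRate, consumers);
        for (int i = 1; i <= consumers; i++) {
            System.out.println("consumer " + i + ": rate=" + rate
                    + " msgs/s, slow=" + flaggedAsSlow(queueRate, rate, threshold));
        }
    }
}
```

With these inputs the queue rate (2 msgs/s) passes the first comparison, while each consumer's share (2/3 msgs/s) fails the second, so every consumer is flagged.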
      

      broker log:

      18:51:22,601 INFO  [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8b0731cb-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53716 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.8 msgs/second.
      
      ...
      
      18:51:22,579 INFO  [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8bb1f0bc-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53718 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.61 msgs/second.
      
      ...
      
      18:51:22,597 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 8bb1f0bc-d826-11e6-b834-5203fed98f54
      18:51:22,598 TRACE [org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl] Connection removed 1031628788 from server ActiveMQServerImpl::serverUUID=91404edf-d821-11e6-9278-8a0df283ae70: java.lang.Exception: trace
              at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:525) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
              at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:683) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
              at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection.close(NettyConnection.java:208) [artemis-core-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
              at org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection.internalClose(ActiveMQProtonRemotingConnection.java:140) [artemis-amqp-protocol-2.0.0-SNAPSHOT.jar:]
              at org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection.fail(ActiveMQProtonRemotingConnection.java:77) [artemis-amqp-protocol-2.0.0-SNAPSHOT.jar:]
              at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.fail(AbstractRemotingConnection.java:210) [artemis-core-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
              at org.apache.activemq.artemis.core.server.impl.QueueImpl$SlowConsumerReaperRunnable.run(QueueImpl.java:3161) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_112]
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_112]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_112]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_112]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_112]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_112]
              at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_112]
      
      18:51:22,598 INFO  [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8a76b19a-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53714 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.6 msgs/second.
      18:51:22,599 DEBUG [org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl] RemotingServiceImpl::removing connection ID 978508991
      

      I investigated the broker code and I think this case is not explicitly handled.

      I think this has a real impact on consumers when the producer rate is usually high (so multiple consumers must work in parallel) but occasionally drops close to slow-consumer-threshold. In that case the broker disconnects all consumers, which then have to reconnect, and message processing is delayed while they do.

      org/apache/activemq/artemis/core/server/impl/QueueImpl.java: org.apache.activemq.artemis.core.server.impl.QueueImpl.SlowConsumerReaperRunnable#run

               float queueRate = getRate();
               if (logger.isDebugEnabled()) {
                  logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
               }
               for (Consumer consumer : getConsumers()) {
                  if (consumer instanceof ServerConsumerImpl) {
                     ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
                     float consumerRate = serverConsumer.getRate();
                     if (queueRate < threshold) {
                        if (logger.isDebugEnabled()) {
                           logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
                        }
                     } else if (consumerRate < threshold) {
                        RemotingConnection connection = null;
                        ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
                        RemotingService remotingService = server.getRemotingService();
      
                        for (RemotingConnection potentialConnection : remotingService.getConnections()) {
                           if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
                              connection = potentialConnection;
                           }
                        }
      
                        serverConsumer.fireSlowConsumer();
      
                        if (connection != null) {
                           ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                           if (policy.equals(SlowConsumerPolicy.KILL)) {
                              connection.killMessage(server.getNodeID());
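
One possible way to handle the unhandled case is to skip inspection whenever an equal share of the queue rate could not reach the threshold. The sketch below is hypothetical: it is not the change that resolved this issue, and the class FairShareGuard and its method names do not exist in the broker. It assumes messages are distributed equally among consumers, as in the round-robin scenario described above.

```java
public class FairShareGuard {

    /**
     * Hypothetical guard: returns true only if an equal share of the queue's
     * incoming rate is large enough for one consumer to meet the threshold.
     */
    public static boolean canSatisfyThreshold(float queueRate, int consumerCount, float threshold) {
        if (consumerCount <= 0) {
            return false; // no consumers, nothing to inspect
        }
        return queueRate / consumerCount >= threshold;
    }

    /** A consumer is flagged only when it could have met the threshold but did not. */
    public static boolean shouldFlagAsSlow(float queueRate, int consumerCount,
                                           float consumerRate, float threshold) {
        if (!canSatisfyThreshold(queueRate, consumerCount, threshold)) {
            return false; // not enough messages for every consumer to be fast
        }
        return consumerRate < threshold;
    }

    public static void main(String[] args) {
        // Scenario from this report: 2 msgs/s shared by three consumers, threshold 1 msg/s.
        System.out.println("report scenario flagged: "
                + shouldFlagAsSlow(2.0f, 3, 2.0f / 3, 1.0f));
        // Busy queue: 6 msgs/s, but one consumer only manages 0.5 msgs/s.
        System.out.println("busy queue flagged: "
                + shouldFlagAsSlow(6.0f, 3, 0.5f, 1.0f));
    }
}
```

The equal-share assumption is coarse. While the queue rate sits between the threshold and the threshold times the number of consumers, this guard would also spare a consumer that really is slow, so it trades missed detections for fewer spurious disconnects.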
      

              gaohoward Howard Gao
              jdanek@redhat.com Jiri Daněk
              Dmitrii Puzikov Dmitrii Puzikov (Inactive)