Bug
Resolution: Done
Major
A-MQ 7.0.0.ER14
We have one queue. Suppose messages are produced at 2 msg/s, there are three consumers, and the slow-consumer threshold is 1 msg/s. All three consumers get killed as slow, even though none of them can possibly reach the threshold: messages are distributed equally between the consumers (round-robin), so each one receives only about 2/3 msg/s.
                  +---+  /-> 2/3 msg/s
      2 msg/s --> | B | ---> 2/3 msg/s
                  +---+  \-> 2/3 msg/s
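To make the arithmetic concrete, here is a minimal Java sketch of the reaper rule as I read it from the `QueueImpl` excerpt quoted in this issue. The class and method names are hypothetical, it assumes a perfectly even round-robin split, and it models only the rate comparison, not any Artemis API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the current slow-consumer check; not Artemis code.
final class CurrentReaperModel {

    // Per-consumer rate when the queue's inbound rate is split evenly (round-robin).
    static float roundRobinRate(float queueRate, int consumers) {
        return queueRate / consumers;
    }

    // Mirrors the current rule: skip only if the whole queue is below the threshold,
    // otherwise flag every consumer whose own rate is below the threshold.
    static List<Integer> flaggedConsumers(float queueRate, float threshold, float[] consumerRates) {
        List<Integer> flagged = new ArrayList<>();
        if (queueRate < threshold) {
            return flagged; // the "Insufficient messages received on queue" branch
        }
        for (int i = 0; i < consumerRates.length; i++) {
            if (consumerRates[i] < threshold) {
                flagged.add(i);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        float queueRate = 2.0f; // producer rate from the report
        float threshold = 1.0f; // slow-consumer threshold from the report
        int consumers = 3;
        float[] rates = new float[consumers];
        for (int i = 0; i < consumers; i++) {
            rates[i] = roundRobinRate(queueRate, consumers); // about 0.667 msg/s each
        }
        // Prints "flagged: [0, 1, 2]": the queue passes the check, every consumer fails it.
        System.out.println("flagged: " + flaggedConsumers(queueRate, threshold, rates));
    }
}
```

The queue-level guard compares the total queue rate (2 msg/s) against a per-consumer threshold (1 msg/s), so it never trips here, while each consumer's share is structurally capped at 2/3 msg/s.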
Broker log:
18:51:22,601 INFO [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8b0731cb-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53716 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.8 msgs/second.
...
18:51:22,579 INFO [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8bb1f0bc-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53718 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.61 msgs/second.
...
18:51:22,597 WARN [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 8bb1f0bc-d826-11e6-b834-5203fed98f54
18:51:22,598 TRACE [org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl] Connection removed 1031628788 from server ActiveMQServerImpl::serverUUID=91404edf-d821-11e6-9278-8a0df283ae70: java.lang.Exception: trace
    at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:525) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:683) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection.close(NettyConnection.java:208) [artemis-core-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection.internalClose(ActiveMQProtonRemotingConnection.java:140) [artemis-amqp-protocol-2.0.0-SNAPSHOT.jar:]
    at org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection.fail(ActiveMQProtonRemotingConnection.java:77) [artemis-amqp-protocol-2.0.0-SNAPSHOT.jar:]
    at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.fail(AbstractRemotingConnection.java:210) [artemis-core-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$SlowConsumerReaperRunnable.run(QueueImpl.java:3161) [artemis-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_112]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_112]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_112]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_112]
    at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_112]
18:51:22,598 INFO [org.apache.activemq.artemis.core.server] AMQ221048: Consumer 8a76b19a-d826-11e6-b834-5203fed98f54:0 attached to queue 'jms.queue.myqueue' from /127.0.0.1:53714 identified as 'slow.' Expected consumption rate: 1 msgs/second; actual consumption rate: 0.6 msgs/second.
18:51:22,599 DEBUG [org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl] RemotingServiceImpl::removing connection ID 978508991
I investigated the broker code and I think this case is not explicitly handled.
I think this has a real impact on consumers when the producer rate is usually high (so several consumers must work in parallel) but occasionally drops close to the consumer threshold. In that case the broker disconnects all consumers, they have to reconnect, and message processing is delayed for as long as the reconnection takes.
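The scenario above can be sketched as a short simulation over reaper periods. This is a hypothetical model, not Artemis code: it assumes an even round-robin split, and the rate sequence (6 msg/s with a one-period dip to 2 msg/s) is illustrative, not measured. It shows that with three consumers and a 1 msg/s threshold, any queue rate in the range [1, 3) msg/s disconnects every consumer, while a rate below 1 msg/s disconnects none.

```java
// Hypothetical simulation of a producer-rate dip; not Artemis code.
final class RateDipSimulation {

    // Consumers the current rule would disconnect in one reaper period, assuming an
    // even round-robin split of queueRate across `consumers`. Under an even split,
    // all consumers share the same rate, so the result is all or nothing.
    static int killsInPeriod(float queueRate, float threshold, int consumers) {
        if (queueRate < threshold) {
            return 0; // whole queue below threshold: inspection is skipped
        }
        float perConsumer = queueRate / consumers;
        return perConsumer < threshold ? consumers : 0;
    }

    public static void main(String[] args) {
        float threshold = 1.0f;
        int consumers = 3;
        // Illustrative: producer usually runs at 6 msg/s but dips to 2 msg/s once.
        float[] queueRatePerPeriod = {6.0f, 6.0f, 2.0f, 6.0f};
        for (int p = 0; p < queueRatePerPeriod.length; p++) {
            System.out.printf("period %d: queue %.1f msg/s -> %d consumer(s) killed%n",
                    p, queueRatePerPeriod[p],
                    killsInPeriod(queueRatePerPeriod[p], threshold, consumers));
        }
    }
}
```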
org/apache/activemq/artemis/core/server/impl/QueueImpl.java: org.apache.activemq.artemis.core.server.impl.QueueImpl.SlowConsumerReaperRunnable#run
float queueRate = getRate();
if (logger.isDebugEnabled()) {
   logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
for (Consumer consumer : getConsumers()) {
   if (consumer instanceof ServerConsumerImpl) {
      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
      float consumerRate = serverConsumer.getRate();
      if (queueRate < threshold) {
         if (logger.isDebugEnabled()) {
            logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
         }
      } else if (consumerRate < threshold) {
         RemotingConnection connection = null;
         ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
         RemotingService remotingService = server.getRemotingService();
         for (RemotingConnection potentialConnection : remotingService.getConnections()) {
            if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
               connection = potentialConnection;
            }
         }
         serverConsumer.fireSlowConsumer();
         if (connection != null) {
            ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
            if (policy.equals(SlowConsumerPolicy.KILL)) {
               connection.killMessage(server.getNodeID());
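One possible adjustment, sketched here as an assumption rather than the change actually made for this issue: skip inspection unless the queue receives enough messages to feed every consumer at the threshold, i.e. `queueRate >= threshold * consumerCount`. The class and method names below are hypothetical and model only the comparison, outside of Artemis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical adjusted check; names are illustrative and not Artemis API.
final class AdjustedReaperModel {

    // Inspect consumers only if the queue rate could let every consumer reach the
    // threshold; otherwise no consumer can be "fast" and none should be flagged.
    static List<Integer> flaggedConsumers(float queueRate, float threshold, float[] consumerRates) {
        List<Integer> flagged = new ArrayList<>();
        int consumerCount = consumerRates.length;
        if (queueRate < threshold * consumerCount) {
            return flagged; // not enough traffic for all consumers to meet the threshold
        }
        for (int i = 0; i < consumerCount; i++) {
            if (consumerRates[i] < threshold) {
                flagged.add(i);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        float threshold = 1.0f;
        // Scenario from the report: 2 msg/s split over three consumers.
        float[] evenSplit = {2.0f / 3, 2.0f / 3, 2.0f / 3};
        System.out.println(flaggedConsumers(2.0f, threshold, evenSplit)); // []
        // A genuinely slow consumer on a busy queue is still flagged.
        float[] oneSlow = {2.75f, 2.75f, 0.5f};
        System.out.println(flaggedConsumers(6.0f, threshold, oneSlow)); // [2]
    }
}
```

The trade-off is that when the queue rate sits between `threshold` and `threshold * consumerCount`, a consumer that really is stuck is not reported either. An alternative under the same assumptions would be to compare each consumer against `min(threshold, queueRate / consumerCount)`.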
is cloned by: JBEAP-8627 Three consumers killed as slow if there is not enough messages for all of them to be fast (Closed)