-
Bug
-
Resolution: Done
-
Major
-
AMQ 7.0.3.GA
active/active cluster with two brokers
A message was sent over AMQP 1.0 to a queue on broker1. A client connected to broker2 then tried to read it.
However, broker2 throws an exception and the message is not delivered to the client:
09:34:53,146 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:612) [artemis-amqp-protocol-2.4.0.jar:]
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:63) [artemis-amqp-protocol-2.4.0.jar:]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1368) [artemis-server-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1311) [artemis-server-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1304) [artemis-server-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.4.0.jar:2.4.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
The queue is not created on the fly; it is configured statically, and both brokers have this config:
<queues>
   <queue name="awe.test.queue">
      <address>awe.test.queue</address>
      <durable>true</durable>
   </queue>
</queues>
The cluster configuration is pretty much copied from the examples:
<!-- Clustering configuration -->
<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <group-address>${udp-address:231.7.7.7}</group-address>
      <group-port>9876</group-port>
      <broadcast-period>100</broadcast-period>
      <connector-ref>netty-connector</connector-ref>
   </broadcast-group>
</broadcast-groups>

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <group-address>${udp-address:231.7.7.7}</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

<cluster-connections>
   <cluster-connection name="my-cluster">
      <connector-ref>netty-connector</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <message-load-balancing>STRICT</message-load-balancing>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>

<!-- a colocated server that will allow shared store full backups to be requested-->
<ha-policy>
   <shared-store>
      <colocated>
         <backup-port-offset>100</backup-port-offset>
         <backup-request-retries>-1</backup-request-retries>
         <backup-request-retry-interval>2000</backup-request-retry-interval>
         <max-backups>1</max-backups>
         <request-backup>true</request-backup>
         <master>
            <failover-on-shutdown>true</failover-on-shutdown>
         </master>
         <slave>
            <failover-on-shutdown>true</failover-on-shutdown>
         </slave>
      </colocated>
   </shared-store>
</ha-policy>
qpid-send and qpid-receive were used as test clients. The message is sent like this:
qpid-send -b localhost:9800 -a awe.test.queue '--connection-option={protocol:amqp1.0}' --content-string 'test message Do 7. Dez 09:31:51 CET 2017' --durable=yes
I grabbed the message from the log file. It looks like this:
0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00", message-format=0, settled=false, more=false] (79) "\x00Sp\xc0\x04\x02AP\x00\x00St\xc1\x14\x04\xa1\x02snR\x01\xa1\x02ts\x81\x14\xfd\xf8\xcb\xdaG\x94U\x00Sw\xa1(test message Do 7. Dez 09:31:51 CET 2017"
Reading is done using:
qpid-receive -b localhost:9802 -a awe.test.queue '--connection-option={protocol:amqp1.0}' -t --timeout 500
I can see that AMQPMessage.getAddress() can return null:
@Override
public String getAddress() {
   if (address == null) {
      Properties properties = getProtonMessage().getProperties();
      if (properties != null) {
         return properties.getTo();
      } else {
         return null;
      }
   } else {
      return address;
   }
}
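To check whether the logged message actually hits the null branch, the following minimal sketch walks the top-level sections of the transfer payload quoted above and lists their descriptor codes. It assumes the log's escaped rendering is byte-exact, and it only handles the three encodings that occur in this payload (list8, map8, str8). The class and method names (SectionScan, samplePayload, sectionDescriptors) are made up for illustration; the descriptor codes (0x70 header, 0x73 properties, 0x74 application-properties, 0x77 amqp-value) are from the AMQP 1.0 message format.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Artemis or Qpid.
class SectionScan {

    // Converts a space-separated hex string into bytes.
    static byte[] hex(String s) {
        String h = s.replace(" ", "");
        byte[] out = new byte[h.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(h.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Rebuilds the 79-byte payload from the logged transfer frame.
    static byte[] samplePayload() {
        byte[] head = hex(
            "00 53 70 c0 04 02 41 50 00"                         // header: list8 [true, ubyte 0]
            + " 00 53 74 c1 14 04 a1 02 73 6e 52 01"             // application-properties: sn=1
            + " a1 02 74 73 81 14 fd f8 cb da 47 94 55"          // ts=<long>
            + " 00 53 77 a1 28");                                // amqp-value: str8, 40 bytes
        byte[] body = "test message Do 7. Dez 09:31:51 CET 2017"
            .getBytes(StandardCharsets.US_ASCII);
        byte[] all = new byte[head.length + body.length];
        System.arraycopy(head, 0, all, 0, head.length);
        System.arraycopy(body, 0, all, head.length, body.length);
        return all;
    }

    // Returns the descriptor code of each top-level section, in order.
    static List<Integer> sectionDescriptors(byte[] p) {
        List<Integer> found = new ArrayList<>();
        int i = 0;
        while (i < p.length) {
            // Each section is a described type: 0x00, smallulong (0x53), code.
            if (i + 4 >= p.length || p[i] != 0x00 || (p[i + 1] & 0xff) != 0x53) {
                throw new IllegalArgumentException("unexpected byte at offset " + i);
            }
            found.add(p[i + 2] & 0xff);
            int ctor = p[i + 3] & 0xff;
            // Only one-byte-size encodings are supported: list8, map8, str8.
            if (ctor != 0xc0 && ctor != 0xc1 && ctor != 0xa1) {
                throw new IllegalArgumentException("unsupported encoding at offset " + (i + 3));
            }
            int size = p[i + 4] & 0xff;
            i += 5 + size; // skip descriptor, constructor, size byte and content
        }
        return found;
    }

    public static void main(String[] args) {
        List<Integer> sections = sectionDescriptors(samplePayload());
        for (int code : sections) {
            System.out.printf("section 0x%02x%n", code);
        }
        System.out.println("properties section (0x73) present: " + sections.contains(0x73));
    }
}
```

Under these assumptions the scan finds only the header, application-properties and amqp-value sections, with no properties section, so there is no "to" field for getAddress() to fall back on.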
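ServerSessionImpl.send() does not handle this case, although it should. SimpleString.toSimpleString() returns null for a null input, so address can be null below; if defaultAddress is null as well, message.setAddress() is called with null, which matches the NullPointerException in the stack trace: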
SimpleString address = message.getAddressSimpleString();

if (defaultAddress == null && address != null) {
   defaultAddress = address;
}

if (address == null) {
   // We don't want to force a re-encode when the message gets sent to the consumer
   message.setAddress(defaultAddress);
}
SimpleString:
public static SimpleString toSimpleString(final String string) {
   if (string == null) {
      return null;
   }
   return new SimpleString(string);
}
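The failure path can be reproduced in isolation with stand-in classes. This is only a sketch: Str, Msg, sendUnchecked and sendChecked are hypothetical names, not Artemis classes, and it assumes that setAddress(SimpleString) dereferences its argument, as the top two frames of the stack trace suggest. The guarded variant shows one possible way to fail with a clear error instead of a NullPointerException; it is not necessarily what the upstream fix does.

```java
// Hypothetical stand-ins for illustration only; not the Artemis classes.
class NullAddressSketch {

    // Plays the role of SimpleString: the factory returns null for null input.
    static final class Str {
        private final String value;
        private Str(String value) { this.value = value; }
        static Str of(String s) { return s == null ? null : new Str(s); }
        @Override public String toString() { return value; }
    }

    // Plays the role of a message whose address may be unset.
    static final class Msg {
        private String address;
        String getAddress() { return address; }
        Str getAddressStr() { return Str.of(getAddress()); }
        // Assumption: the real method dereferences its argument in a similar way.
        void setAddress(Str a) { this.address = a.toString(); }
    }

    // Mirrors the logic quoted from ServerSessionImpl.send().
    static void sendUnchecked(Msg m, Str defaultAddress) {
        Str address = m.getAddressStr();
        if (defaultAddress == null && address != null) {
            defaultAddress = address;
        }
        if (address == null) {
            m.setAddress(defaultAddress); // NPE when defaultAddress is null too
        }
    }

    // Same flow, but rejects the case where no address is known at all.
    static void sendChecked(Msg m, Str defaultAddress) {
        Str address = m.getAddressStr();
        if (address == null) {
            if (defaultAddress == null) {
                throw new IllegalStateException(
                    "message has no address and the session has no default address");
            }
            m.setAddress(defaultAddress);
        }
    }

    public static void main(String[] args) {
        try {
            sendUnchecked(new Msg(), null);
        } catch (NullPointerException e) {
            System.out.println("unchecked send failed with NullPointerException");
        }
        Msg m = new Msg();
        sendChecked(m, Str.of("awe.test.queue"));
        System.out.println("checked send used default address: " + m.getAddress());
    }
}
```

With a session default address the guarded variant simply applies it; without one, the caller gets an error that names the actual problem.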
- relates to
-
ENTMQBR-966 Unsettled AMQP messages are lost when Receiver Link is opened on remote cluster member
- Closed
- incorporates
-
ARTEMIS-1542