-
Bug
-
Resolution: Done
-
Major
-
None
-
AMQ 7.2.0.GA
-
None
If I send a message from rhea AMQP client (https://github.com/amqp/rhea) to a queue and then attempting to receive the message using activemq-client, I get the following exception
2018-06-21 16:12:24,031 WARN [org.apache.activemq.artemis.core.server] Error during message dispatch: java.lang.NullPointerException at org.apache.activemq.artemis.utils.collections.TypedProperties.getMap(TypedProperties.java:1012) [artemis-commons-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.toAMQMessageMapType(OpenWireMessageConverter.java:720) [artemis-openwire-protocol-2.6.1.amq-720003-redhat-1.jar:] at org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.toAMQMessage(OpenWireMessageConverter.java:535) [artemis-openwire-protocol-2.6.1.amq-720003-redhat-1.jar:] at org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.createMessageDispatch(OpenWireMessageConverter.java:492) [artemis-openwire-protocol-2.6.1.amq-720003-redhat-1.jar:] at org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer.handleDeliver(AMQConsumer.java:255) [artemis-openwire-protocol-2.6.1.amq-720003-redhat-1.jar:] at org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession.sendMessage(AMQSession.java:313) [artemis-openwire-protocol-2.6.1.amq-720003-redhat-1.jar:] at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1106) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:464) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:2938) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:2406) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.server.impl.QueueImpl.access$2000(QueueImpl.java:107) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:3211) [artemis-server-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [artemis-commons-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_152-release] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_152-release] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.6.1.amq-720003-redhat-1.jar:2.6.1.amq-720003-redhat-1]
This differs from ENTMQBR-1694 in that I'd expect the message gets converted successfully. Similar test (JAMQMsgPatterns/test_direct_transient_empty_message) is passing for JMS Core -> OpenWire and for proton-python -> OpenWire. I do not know what is special about Rhea that it's message is not converted.
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.artemis.tests.integration.crossprotocol; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Before; import org.junit.Test; import javax.jms.*; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.TimeUnit; public class AMQPRheaToOpenWireTestCase extends ActiveMQTestBase { public static final String OWHOST = "localhost"; public static final int OWPORT = 61616; protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; private ActiveMQServer server; protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString); private JmsConnectionFactory qpidfactory = new JmsConnectionFactory("amqp://localhost:61616"); protected String queueName = "JAMQMsgPatterns0b7TestssDbvJpl"; private SimpleString coreQueue; @Override @Before public void setUp() throws Exception { super.setUp(); server = createServer(true, true); server.start(); server.waitForActivation(10, TimeUnit.SECONDS); Configuration serverConfig = server.getConfiguration(); serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.setSecurityEnabled(false); coreQueue = new SimpleString(queueName); server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false); } @Override @After public void tearDown() throws Exception { server.stop(); } @Test public void testSendRhealikeMessage() throws Exception { sendRhealikeMessageToQueue(); try (Connection owc = factory.createConnection(); Session ows = owc.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer owr = ows.createConsumer(ows.createQueue(queueName))) { owc.start(); if (owr.receive(1000) == null) { System.out.println("failed to receive a message using openwire, trying amqp now"); owc.close(); // free the message for next consumer try (Connection ac = qpidfactory.createConnection(); Session as = ac.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer ar = as.createConsumer(as.createQueue(queueName))) { ac.start(); System.out.println("qpid-jms says " + ar.receive(1000)); } fail("OpenWire did not receive the message"); } } } /** * Queue name is JAMQMsgPatterns0b7TestssDbvJpl, that is hardcoded in the protocol payload */ private void sendRhealikeMessageToQueue() throws Exception { int peer0_2[] = { 0x41, 0x4d, 0x51, 0x50, 0x00, 0x01, 0x00, 0x00 }; int peer0_3[] = { 0x00, 0x00, 0x00, 0x4b, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x10, 0xd0, 0x00, 0x00, 0x00, 0x3b, 0x00, 0x00, 0x00, 0x0a, 0xa1, 0x24, 0x61, 0x33, 0x64, 0x61, 0x30, 0x64, 0x38, 0x32, 0x2d, 0x30, 0x30, 0x63, 0x61, 0x2d, 0x38, 0x65, 0x34, 0x63, 0x2d, 0x39, 0x66, 0x37, 0x37, 0x2d, 0x38, 0x34, 0x35, 0x31, 0x66, 0x38, 0x63, 0x65, 0x35, 0x35, 0x38, 0x64, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0xd1, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00 }; int peer0_4[] = { 0x00, 0x00, 0x00, 0x20, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x11, 0xd0, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x04, 0x40, 0x43, 0x70, 0x00, 0x00, 0x08, 0x00, 0x70, 0xff, 0xff, 0xff, 0xff }; int peer0_5[] = { 0x00, 0x00, 0x00, 0x72, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x12, 0xd0, 0x00, 0x00, 0x00, 0x62, 0x00, 0x00, 0x00, 0x0a, 0xa1, 0x24, 0x36, 0x34, 0x35, 0x66, 0x64, 0x64, 0x39, 0x38, 0x2d, 0x61, 0x65, 0x66, 0x63, 0x2d, 0x35, 0x30, 0x34, 0x38, 0x2d, 0x61, 0x31, 0x32, 0x61, 0x2d, 0x64, 0x39, 0x38, 0x36, 0x34, 0x65, 0x65, 0x63, 0x65, 0x39, 0x63, 0x31, 0x43, 0x42, 0x40, 0x40, 0x00, 0x53, 0x28, 0x45, 0x00, 0x53, 0x29, 0xd0, 0x00, 0x00, 0x00, 0x25, 0x00, 0x00, 0x00, 0x02, 0xa1, 0x1e, 0x4a, 0x41, 0x4d, 0x51, 0x4d, 0x73, 0x67, 0x50, 0x61, 0x74, 0x74, 0x65, 0x72, 0x6e, 0x73, 0x30, 0x62, 0x37, 0x54, 0x65, 0x73, 0x74, 0x73, 0x73, 0x44, 0x62, 0x76, 0x4a, 0x70, 0x6c, 0x43, 0x40, 0x40, 0x43 }; int peer0_6[] = { 0x00, 0x00, 0x00, 0x9a, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x14, 0xd0, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x06, 0x43, 0x43, 0xa0, 0x01, 0x30, 0x43, 0x42, 0x42, 0x00, 0x53, 0x70, 0xd0, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x05, 0x42, 0x40, 0x40, 0x40, 0x43, 0x00, 0x53, 0x72, 0xd1, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x53, 0x73, 0xd0, 0x00, 0x00, 0x00, 0x41, 0x00, 0x00, 0x00, 0x0d, 0x40, 0x40, 0xa1, 0x1e, 0x4a, 0x41, 0x4d, 0x51, 0x4d, 0x73, 0x67, 0x50, 0x61, 0x74, 0x74, 0x65, 0x72, 0x6e, 0x73, 0x30, 0x62, 0x37, 0x54, 0x65, 0x73, 0x74, 0x73, 0x73, 0x44, 0x62, 0x76, 0x4a, 0x70, 0x6c, 0x40, 0x40, 0xa1, 0x09, 0x63, 0x6f, 0x72, 0x72, 0x2d, 0x69, 0x64, 0x2d, 0x34, 0xa3, 0x06, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0x53, 0x74, 0xd1, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x53, 0x77, 0xd1, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00 }; int peer0_7[] = { 0x00, 0x00, 0x00, 0x16, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x16, 0xd0, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x02, 0x43, 0x42 }; int peer0_8[] = { 0x00, 0x00, 0x00, 0x0c, 0x02, 0x00, 0x00, 0x00, 0x00, 0x53, 0x18, 0x45 }; int messages[][] = {peer0_2, peer0_3, peer0_4, peer0_5, peer0_6, peer0_7, peer0_8}; Socket s = new Socket(OWHOST, OWPORT); OutputStream stream = s.getOutputStream(); for (int buffer[] : messages) { convertToByteArray(buffer); stream.write(convertToByteArray(buffer)); stream.flush(); Thread.sleep(500); } s.close(); } @NotNull private byte[] convertToByteArray(int[] buffer) { ByteBuffer b = ByteBuffer.allocate(buffer.length); for (int value : buffer) { b.put((byte) value); } return b.array(); } }