- Story
- Resolution: Unresolved
- Minor
- None
- AMQ 7.2.0.GA
- None
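If I send an AMQP message that has an array as the value of an application property, the broker is unable to convert it for an OpenWire consumer (the conversion to Core fails with "class java.util.ArrayList is not a valid property type"). Consuming the same message over AMQP works fine.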
[main] 16:00:51,178 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.7.0-SNAPSHOT [localhost, nodeID=80e5d6b6-755b-11e8-b990-2201c4a6b177]
[Thread-4 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@6328d34a)] 16:00:54,507 WARN [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter=null, binding=LocalQueueBinding [address=myq, queue=QueueImpl[name=myq, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=80e5d6b6-755b-11e8-b990-2201c4a6b177], temp=false]@6e4629a8, filter=null, name=myq, clusterName=myq80e5d6b6-755b-11e8-b990-2201c4a6b177]], message=Reference[8]:NON-RELIABLE:AMQPMessage [durable=false, messageID=8, address=myq, size=132, applicationProperties=ApplicationProperties{{kornys=[pepa, tomasek, janicka]}}, properties=Properties{messageId=1, userId=null, to='kornysek', subject='null', replyTo='null', correlationId=kornys, contentType=null, contentEncoding=null, absoluteExpiryTime=null, creationTime=null, groupId='null', groupSequence=null, replyToGroupId='null'}, extraProperties = TypedProperties[_AMQ_AD=myq]]: java.lang.RuntimeException: class java.util.ArrayList is not a valid property type
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1206) [:]
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1212) [:]
    at org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession.sendMessage(AMQSession.java:313) [:]
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1106) [:]
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:464) [:]
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:2938) [:]
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:2406) [:]
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.access$2000(QueueImpl.java:107) [:]
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:3211) [:]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_172]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_172]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]
Caused by: org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException: class java.util.ArrayList is not a valid property type
    at org.apache.activemq.artemis.utils.collections.TypedProperties.setObjectProperty(TypedProperties.java:1056) [:]
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:1010) [:]
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:1029) [:]
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:52) [:]
    at org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage.setObjectProperty(ServerJMSMessage.java:348) [:]
    at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.setProperty(AmqpCoreConverter.java:365) [:]
    at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.populateMessage(AmqpCoreConverter.java:255) [:]
    at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.toCore(AmqpCoreConverter.java:189) [:]
    at org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter.toCore(AMQPConverter.java:43) [:]
    at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1204) [:]
    ... 14 more
failed to receive a message using openwire, trying amqp now
qpid-jms says [pepa, tomasek, janicka]
[main] 16:00:56,030 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.7.0-SNAPSHOT [80e5d6b6-755b-11e8-b990-2201c4a6b177] stopped, uptime 6.105 seconds
[main] 16:00:56,321 INFO [org.apache.activemq.artemis.core.server] **** end #test testSendRhealikeMessage() ***
java.lang.AssertionError: OpenWire did not receive the message
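The exception in the trace above comes down to a type check on the property value. The following is a minimal sketch of such a check, not the Artemis source: the class name `PropertyTypeSketch` and both methods are hypothetical, and the set of accepted types is an assumption based on the JMS rule quoted by qpid-jms ("Only objectified primitive objects and String types are allowed"). The `flatten` helper shows one possible fallback, rendering an unsupported value as a string; whether the broker should do that is exactly what this issue leaves open.

```java
import java.util.Arrays;

// Hypothetical illustration only; not code from Artemis or qpid-jms.
class PropertyTypeSketch {

   // Assumed rule: boxed primitives and String are acceptable property values.
   static boolean isJmsPropertyType(Object value) {
      return value == null
         || value instanceof Boolean
         || value instanceof Byte
         || value instanceof Short
         || value instanceof Integer
         || value instanceof Long
         || value instanceof Float
         || value instanceof Double
         || value instanceof String;
   }

   // One possible fallback: keep valid values, stringify everything else.
   static Object flatten(Object value) {
      return isJmsPropertyType(value) ? value : String.valueOf(value);
   }

   public static void main(String[] args) {
      Object kornys = Arrays.asList("pepa", "tomasek", "janicka");
      System.out.println(isJmsPropertyType(kornys)); // a List is rejected
      System.out.println(flatten(kornys));           // its String form is accepted
   }
}
```

A String fallback loses the list structure, so an OpenWire consumer would see "[pepa, tomasek, janicka]" as text rather than three elements.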
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.crossprotocol;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.*;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class AMQPRheaToOpenWireTestCase extends ActiveMQTestBase {

   public static final String OWHOST = "localhost";
   public static final int OWPORT = 61616;
   protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";

   private ActiveMQServer server;
   protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
   protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
   private JmsConnectionFactory qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
   protected String queueName = "myq";
   private SimpleString coreQueue;

   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();
      server = createServer(true, true);
      server.start();
      server.waitForActivation(10, TimeUnit.SECONDS);

      Configuration serverConfig = server.getConfiguration();
      serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
      serverConfig.setSecurityEnabled(false);

      coreQueue = new SimpleString(queueName);
      server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
   }

   @Override
   @After
   public void tearDown() throws Exception {
      server.stop();
   }

   @Test
   public void testSendRhealikeMessage() throws Exception {
      sendRhealikeMessageToQueue();

      try (Connection owc = factory.createConnection();
           Session ows = owc.createSession(false, Session.CLIENT_ACKNOWLEDGE);
           MessageConsumer owr = ows.createConsumer(ows.createQueue(queueName))) {
         owc.start();
         if (owr.receive(1000) == null) {
            System.out.println("failed to receive a message using openwire, trying amqp now");
            owc.close(); // free the message for next consumer

            try (Connection ac = qpidfactory.createConnection();
                 Session as = ac.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                 MessageConsumer ar = as.createConsumer(as.createQueue(queueName))) {
               ac.start();
               System.out.println("qpid-jms says " + ar.receive(1000).getObjectProperty("kornys"));
            }
            fail("OpenWire did not receive the message");
         }
      }
   }

   /**
    * Queue name is myq, that is hardcoded in the protocol payload
    */
   private void sendRhealikeMessageToQueue() throws Exception {
      // the message below is something that qpid-jms cannot send (unless robbie's secret feature is used?), if I try, it would fail with
      // javax.jms.MessageFormatException: Only objectified primitive objects and String types are allowed but was: [pepa, tomasek, janicka] type: class java.util.Arrays$ArrayList
      // try (Connection ac = qpidfactory.createConnection();
      //      Session as = ac.createSession(false, Session.AUTO_ACKNOWLEDGE);
      //      MessageProducer ap = as.createProducer(as.createQueue(queueName))) {
      //    ac.start();
      //    final Message message = as.createMessage();
      //    message.setObjectProperty("kornys", Arrays.asList("pepa", "tomasek", "janicka"));
      //    ap.send(message);
      // }

      // AMQP 1.0 protocol header: "AMQP" 0 1 0 0
      int[] peer0_2 = {
         0x41, 0x4d, 0x51, 0x50, 0x00, 0x01, 0x00, 0x00};
      // open frame (performative descriptor 0x10)
      int[] peer0_3 = {
         0x00, 0x00, 0x00, 0x3a, 0x02, 0x00, 0x00, 0x00,
         0x00, 0x53, 0x10, 0xd0, 0x00, 0x00, 0x00, 0x2a, 0x00, 0x00, 0x00, 0x01,
         0xa1, 0x24, 0x38, 0x62, 0x32, 0x32, 0x35, 0x37, 0x63, 0x31, 0x2d, 0x62, 0x62, 0x36, 0x36, 0x2d,
         0x39, 0x39, 0x34, 0x35, 0x2d, 0x39, 0x62, 0x61, 0x33, 0x2d, 0x32, 0x63, 0x34, 0x39, 0x39, 0x61,
         0x31, 0x35, 0x38, 0x66, 0x65, 0x35};
      // begin frame (performative descriptor 0x11)
      int[] peer0_4 = {
         0x00, 0x00, 0x00, 0x20, 0x02, 0x00, 0x00, 0x00,
         0x00, 0x53, 0x11, 0xd0, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x04,
         0x40, 0x43, 0x70, 0x00, 0x00, 0x08, 0x00, 0x70, 0xff, 0xff, 0xff, 0xff};
      // attach frame (performative descriptor 0x12), target address "myq"
      int[] peer0_5 = {
         0x00, 0x00, 0x00, 0x56, 0x02, 0x00, 0x00, 0x00,
         0x00, 0x53, 0x12, 0xd0, 0x00, 0x00, 0x00, 0x46, 0x00, 0x00, 0x00, 0x0a,
         0xa1, 0x24, 0x30, 0x39, 0x62, 0x35, 0x62, 0x66, 0x33, 0x63, 0x2d, 0x37, 0x65, 0x66, 0x63, 0x2d,
         0x34, 0x34, 0x34, 0x39, 0x2d, 0x61, 0x36, 0x63, 0x35, 0x2d, 0x38, 0x37, 0x32, 0x32, 0x36, 0x32,
         0x63, 0x39, 0x62, 0x36, 0x33, 0x35,
         0x43, 0x42, 0x40, 0x40,
         0x00, 0x53, 0x28, 0x45,
         0x00, 0x53, 0x29, 0xd0, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x01,
         0xa1, 0x03, 0x6d, 0x79, 0x71,
         0x40, 0x40, 0x43};
      // transfer frame (performative descriptor 0x14) carrying the message;
      // the application-properties map holds kornys -> list [pepa, tomasek, janicka]
      int[] peer0_6 = {
         0x00, 0x00, 0x00, 0x90, 0x02, 0x00, 0x00, 0x00,
         0x00, 0x53, 0x14, 0xd0, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x06,
         0x43, 0x43, 0xa0, 0x01, 0x30, 0x43, 0x42, 0x42,
         0x00, 0x53, 0x70, 0x45,
         0x00, 0x53, 0x73, 0xd0, 0x00, 0x00, 0x00, 0x1b, 0x00, 0x00, 0x00, 0x06,
         0x53, 0x01, 0x40,
         0xa1, 0x08, 0x6b, 0x6f, 0x72, 0x6e, 0x79, 0x73, 0x65, 0x6b,
         0x40, 0x40,
         0xa1, 0x06, 0x6b, 0x6f, 0x72, 0x6e, 0x79, 0x73,
         0x00, 0x53, 0x74, 0xd1, 0x00, 0x00, 0x00, 0x2d, 0x00, 0x00, 0x00, 0x02,
         0xa1, 0x06, 0x6b, 0x6f, 0x72, 0x6e, 0x79, 0x73,
         0xd0, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x03,
         0xa1, 0x04, 0x70, 0x65, 0x70, 0x61,
         0xa1, 0x07, 0x74, 0x6f, 0x6d, 0x61, 0x73, 0x65, 0x6b,
         0xa1, 0x07, 0x6a, 0x61, 0x6e, 0x69, 0x63, 0x6b, 0x61,
         0x00, 0x53, 0x77, 0xd1, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x02,
         0xa1, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65,
         0x52, 0x01};
      // close frame (performative descriptor 0x18)
      int[] peer0_7 = {
         0x00, 0x00, 0x00, 0x0c, 0x02, 0x00, 0x00, 0x00,
         0x00, 0x53, 0x18, 0x45};

      int[][] messages = {peer0_2, peer0_3, peer0_4, peer0_5, peer0_6, peer0_7};

      Socket s = new Socket(OWHOST, OWPORT);
      OutputStream stream = s.getOutputStream();
      for (int[] buffer : messages) {
         stream.write(convertToByteArray(buffer));
         stream.flush();
         Thread.sleep(500);
      }
      s.close();
   }

   @NotNull
   private byte[] convertToByteArray(int[] buffer) {
      ByteBuffer b = ByteBuffer.allocate(buffer.length);
      for (int value : buffer) {
         b.put((byte) value);
      }
      return b.array();
   }
}
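To make it easier to see what the hardcoded transfer frame actually carries, here is a small standalone sketch that decodes the value of the "kornys" application property. The bytes are copied from peer0_6 in the test above; the class name `List32Sketch` and the method `decodeStringList` are made up for illustration. It assumes only the two AMQP 1.0 encodings that occur in this payload, list32 (0xd0) and str8-utf8 (0xa1), and is not a general AMQP decoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; handles only list32 of str8-utf8.
class List32Sketch {

   // Value of the "kornys" property, copied from the transfer frame (peer0_6).
   static final int[] KORNYS_VALUE = {
      0xd0, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x03,
      0xa1, 0x04, 0x70, 0x65, 0x70, 0x61,
      0xa1, 0x07, 0x74, 0x6f, 0x6d, 0x61, 0x73, 0x65, 0x6b,
      0xa1, 0x07, 0x6a, 0x61, 0x6e, 0x69, 0x63, 0x6b, 0x61};

   static List<String> decodeStringList(int[] raw) {
      ByteBuffer buf = ByteBuffer.allocate(raw.length); // big-endian by default
      for (int value : raw) {
         buf.put((byte) value);
      }
      buf.flip();

      if ((buf.get() & 0xff) != 0xd0) {
         throw new IllegalArgumentException("expected list32 (0xd0)");
      }
      int size = buf.getInt(); // bytes after the size field: count + elements
      if (size != buf.remaining()) {
         throw new IllegalArgumentException("size field does not match payload");
      }
      int count = buf.getInt();

      List<String> result = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
         if ((buf.get() & 0xff) != 0xa1) {
            throw new IllegalArgumentException("expected str8-utf8 (0xa1)");
         }
         byte[] utf8 = new byte[buf.get() & 0xff]; // one-byte length prefix
         buf.get(utf8);
         result.add(new String(utf8, StandardCharsets.UTF_8));
      }
      return result;
   }

   public static void main(String[] args) {
      System.out.println(decodeStringList(KORNYS_VALUE)); // [pepa, tomasek, janicka]
   }
}
```

The decoded value is an AMQP list, which matches the java.util.ArrayList that the broker reports when it tries to copy the property into the Core message.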
- relates to
- ENTMQBR-1696 AMQP message containing UnsignedInteger value cannot be converted to Core
- New