Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-1694

RuntimeException when converting AMQP message containing list in a property to OpenWire

XMLWordPrintable

    • Icon: Story Story
    • Resolution: Unresolved
    • Icon: Minor Minor
    • None
    • AMQ 7.2.0.GA
    • None

      If I attempt to send an AMQP message that has an array as a value of a property, broker is unable to convert that for OpenWire consumer. Consuming the message with AMQP works fine.

      [main] 16:00:51,178 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.7.0-SNAPSHOT [localhost, nodeID=80e5d6b6-755b-11e8-b990-2201c4a6b177] 
      [Thread-4 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@6328d34a)] 16:00:54,507 WARN  [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter=null, binding=LocalQueueBinding [address=myq, queue=QueueImpl[name=myq, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=80e5d6b6-755b-11e8-b990-2201c4a6b177], temp=false]@6e4629a8, filter=null, name=myq, clusterName=myq80e5d6b6-755b-11e8-b990-2201c4a6b177]], message=Reference[8]:NON-RELIABLE:AMQPMessage [durable=false, messageID=8, address=myq, size=132, applicationProperties=ApplicationProperties{{kornys=[pepa, tomasek, janicka]}}, properties=Properties{messageId=1, userId=null, to='kornysek', subject='null', replyTo='null', correlationId=kornys, contentType=null, contentEncoding=null, absoluteExpiryTime=null, creationTime=null, groupId='null', groupSequence=null, replyToGroupId='null'}, extraProperties = TypedProperties[_AMQ_AD=myq]]: java.lang.RuntimeException: class java.util.ArrayList is not a valid property type
      	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1206) [:]
      	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1212) [:]
      	at org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession.sendMessage(AMQSession.java:313) [:]
      	at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1106) [:]
      	at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:464) [:]
      	at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:2938) [:]
      	at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:2406) [:]
      	at org.apache.activemq.artemis.core.server.impl.QueueImpl.access$2000(QueueImpl.java:107) [:]
      	at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:3211) [:]
      	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
      	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
      	at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_172]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_172]
      	at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]
      Caused by: org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException: class java.util.ArrayList is not a valid property type
      	at org.apache.activemq.artemis.utils.collections.TypedProperties.setObjectProperty(TypedProperties.java:1056) [:]
      	at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:1010) [:]
      	at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:1029) [:]
      	at org.apache.activemq.artemis.core.message.impl.CoreMessage.putObjectProperty(CoreMessage.java:52) [:]
      	at org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage.setObjectProperty(ServerJMSMessage.java:348) [:]
      	at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.setProperty(AmqpCoreConverter.java:365) [:]
      	at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.populateMessage(AmqpCoreConverter.java:255) [:]
      	at org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter.toCore(AmqpCoreConverter.java:189) [:]
      	at org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter.toCore(AMQPConverter.java:43) [:]
      	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.toCore(AMQPMessage.java:1204) [:]
      	... 14 more
      
      failed to receive a message using openwire, trying amqp now
      qpid-jms says [pepa, tomasek, janicka]
      [main] 16:00:56,030 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.7.0-SNAPSHOT [80e5d6b6-755b-11e8-b990-2201c4a6b177] stopped, uptime 6.105 seconds
      [main] 16:00:56,321 INFO  [org.apache.activemq.artemis.core.server] **** end #test testSendRhealikeMessage() ***
      
      java.lang.AssertionError: OpenWire did not receive the message
      
      /**
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements. See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License. You may obtain a copy of the License at
       * <p>
       * http://www.apache.org/licenses/LICENSE-2.0
       * <p>
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package org.apache.activemq.artemis.tests.integration.crossprotocol;
      
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.ActiveMQXAConnectionFactory;
      import org.apache.activemq.artemis.api.core.RoutingType;
      import org.apache.activemq.artemis.api.core.SimpleString;
      import org.apache.activemq.artemis.core.config.Configuration;
      import org.apache.activemq.artemis.core.server.ActiveMQServer;
      import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
      import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
      import org.apache.qpid.jms.JmsConnectionFactory;
      import org.jetbrains.annotations.NotNull;
      import org.junit.After;
      import org.junit.Before;
      import org.junit.Test;
      
      import javax.jms.*;
      import java.io.IOException;
      import java.io.OutputStream;
      import java.net.Socket;
      import java.nio.ByteBuffer;
      import java.util.Arrays;
      import java.util.concurrent.TimeUnit;
      
      public class AMQPRheaToOpenWireTestCase extends ActiveMQTestBase {
      
          public static final String OWHOST = "localhost";
          public static final int OWPORT = 61616;
          protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
      
          private ActiveMQServer server;
          protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
          protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
          private JmsConnectionFactory qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
          protected String queueName = "myq";
          private SimpleString coreQueue;
      
          @Override
          @Before
          public void setUp() throws Exception {
              super.setUp();
              server = createServer(true, true);
              server.start();
              server.waitForActivation(10, TimeUnit.SECONDS);
      
              Configuration serverConfig = server.getConfiguration();
              serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
              serverConfig.setSecurityEnabled(false);
              coreQueue = new SimpleString(queueName);
              server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
          }
      
          @Override
          @After
          public void tearDown() throws Exception {
              server.stop();
          }
      
          @Test
          public void testSendRhealikeMessage() throws Exception {
              sendRhealikeMessageToQueue();
              try (Connection owc = factory.createConnection();
                   Session ows = owc.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                   MessageConsumer owr = ows.createConsumer(ows.createQueue(queueName))) {
                  owc.start();
                  if (owr.receive(1000) == null) {
                      System.out.println("failed to receive a message using openwire, trying amqp now");
      
                      owc.close(); // free the message for next consumer
      
                      try (Connection ac = qpidfactory.createConnection();
                           Session as = ac.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                           MessageConsumer ar = as.createConsumer(as.createQueue(queueName))) {
                          ac.start();
                          System.out.println("qpid-jms says " + ar.receive(1000).getObjectProperty("kornys"));
                      }
      
                      fail("OpenWire did not receive the message");
                  }
              }
          }
      
          /**
           * Queue name is myq, that is hardcoded in the protocol payload
           */
          private void sendRhealikeMessageToQueue() throws Exception {
              // the message below is something that qpid-jms cannot send (unless robbie's secret feature is used?), if I try, it would fail with
              // javax.jms.MessageFormatException: Only objectified primitive objects and String types are allowed but was: [pepa, tomasek, janicka] type: class java.util.Arrays$ArrayList
      //        try (Connection ac = qpidfactory.createConnection();
      //             Session as = ac.createSession(false, Session.AUTO_ACKNOWLEDGE);
      //             MessageProducer ap = as.createProducer(as.createQueue(queueName))) {
      //            ac.start();
      //            final Message message = as.createMessage();
      //            message.setObjectProperty("kornys", Arrays.asList("pepa", "tomasek", "janicka"));
      //            ap.send(message);
      //        }
      
              int peer0_2[] = {
                      0x41, 0x4d, 0x51, 0x50, 0x00, 0x01, 0x00, 0x00};
              int peer0_3[] = {
                      0x00, 0x00, 0x00, 0x3a, 0x02, 0x00, 0x00, 0x00,
                      0x00, 0x53, 0x10, 0xd0, 0x00, 0x00, 0x00, 0x2a,
                      0x00, 0x00, 0x00, 0x01, 0xa1, 0x24, 0x38, 0x62,
                      0x32, 0x32, 0x35, 0x37, 0x63, 0x31, 0x2d, 0x62,
                      0x62, 0x36, 0x36, 0x2d, 0x39, 0x39, 0x34, 0x35,
                      0x2d, 0x39, 0x62, 0x61, 0x33, 0x2d, 0x32, 0x63,
                      0x34, 0x39, 0x39, 0x61, 0x31, 0x35, 0x38, 0x66,
                      0x65, 0x35};
              int peer0_4[] = {
                      0x00, 0x00, 0x00, 0x20, 0x02, 0x00, 0x00, 0x00,
                      0x00, 0x53, 0x11, 0xd0, 0x00, 0x00, 0x00, 0x10,
                      0x00, 0x00, 0x00, 0x04, 0x40, 0x43, 0x70, 0x00,
                      0x00, 0x08, 0x00, 0x70, 0xff, 0xff, 0xff, 0xff};
              int peer0_5[] = {
                      0x00, 0x00, 0x00, 0x56, 0x02, 0x00, 0x00, 0x00,
                      0x00, 0x53, 0x12, 0xd0, 0x00, 0x00, 0x00, 0x46,
                      0x00, 0x00, 0x00, 0x0a, 0xa1, 0x24, 0x30, 0x39,
                      0x62, 0x35, 0x62, 0x66, 0x33, 0x63, 0x2d, 0x37,
                      0x65, 0x66, 0x63, 0x2d, 0x34, 0x34, 0x34, 0x39,
                      0x2d, 0x61, 0x36, 0x63, 0x35, 0x2d, 0x38, 0x37,
                      0x32, 0x32, 0x36, 0x32, 0x63, 0x39, 0x62, 0x36,
                      0x33, 0x35, 0x43, 0x42, 0x40, 0x40, 0x00, 0x53,
                      0x28, 0x45, 0x00, 0x53, 0x29, 0xd0, 0x00, 0x00,
                      0x00, 0x09, 0x00, 0x00, 0x00, 0x01, 0xa1, 0x03,
                      0x6d, 0x79, 0x71, 0x40, 0x40, 0x43};
              int peer0_6[] = {
                      0x00, 0x00, 0x00, 0x90, 0x02, 0x00, 0x00, 0x00,
                      0x00, 0x53, 0x14, 0xd0, 0x00, 0x00, 0x00, 0x0c,
                      0x00, 0x00, 0x00, 0x06, 0x43, 0x43, 0xa0, 0x01,
                      0x30, 0x43, 0x42, 0x42, 0x00, 0x53, 0x70, 0x45,
                      0x00, 0x53, 0x73, 0xd0, 0x00, 0x00, 0x00, 0x1b,
                      0x00, 0x00, 0x00, 0x06, 0x53, 0x01, 0x40, 0xa1,
                      0x08, 0x6b, 0x6f, 0x72, 0x6e, 0x79, 0x73, 0x65,
                      0x6b, 0x40, 0x40, 0xa1, 0x06, 0x6b, 0x6f, 0x72,
                      0x6e, 0x79, 0x73, 0x00, 0x53, 0x74, 0xd1, 0x00,
                      0x00, 0x00, 0x2d, 0x00, 0x00, 0x00, 0x02, 0xa1,
                      0x06, 0x6b, 0x6f, 0x72, 0x6e, 0x79, 0x73, 0xd0,
                      0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x03,
                      0xa1, 0x04, 0x70, 0x65, 0x70, 0x61, 0xa1, 0x07,
                      0x74, 0x6f, 0x6d, 0x61, 0x73, 0x65, 0x6b, 0xa1,
                      0x07, 0x6a, 0x61, 0x6e, 0x69, 0x63, 0x6b, 0x61,
                      0x00, 0x53, 0x77, 0xd1, 0x00, 0x00, 0x00, 0x10,
                      0x00, 0x00, 0x00, 0x02, 0xa1, 0x08, 0x73, 0x65,
                      0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x01};
              int peer0_7[] = {
                      0x00, 0x00, 0x00, 0x0c, 0x02, 0x00, 0x00, 0x00,
                      0x00, 0x53, 0x18, 0x45};
              int messages[][] = {peer0_2, peer0_3, peer0_4, peer0_5, peer0_6, peer0_7};
      
              Socket s = new Socket(OWHOST, OWPORT);
              OutputStream stream = s.getOutputStream();
              for (int buffer[] : messages) {
                  convertToByteArray(buffer);
                  stream.write(convertToByteArray(buffer));
                  stream.flush();
                  Thread.sleep(500);
              }
              s.close();
          }
      
          @NotNull
          private byte[] convertToByteArray(int[] buffer) {
              ByteBuffer b = ByteBuffer.allocate(buffer.length);
              for (int value : buffer) {
                  b.put((byte) value);
              }
              return b.array();
          }
      }
      

              rh-ee-ataylor Andy Taylor
              jdanek@redhat.com Jiri Daněk
              Votes:
              1 Vote for this issue
              Watchers:
              1 Start watching this issue

                Created:
                Updated: