AMQ Broker / ENTMQBR-766

AMQP messages lose durability flag when a conversion happens on broker

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major
    • AMQ 7.1.0.CR2
    • AMQ 7.0.2.GA
    • None

      Send a message over AMQP and receive it over Core or OpenWire:

      java  -jar /home/jdanek/Work/repos/dtests/dtests/node_data/clients/aac1.jar sender  --log-msgs interop --broker 127.0.0.1:5672 --address "test_direct_transient_empty_message" --count 1                                    
      {'durable': True, 'priority': 4, 'ttl': 0, 'first-acquirer': False, 'delivery-count': 0, 'id': 'b57d7df0-aff2-4c35-ac6c-e9a92522846c:1:1:1-1', 'user-id': None, 'address': 'test_direct_transient_empty_message', 'subject': None, 'reply-to': None, 'correlation-id': None, 'content-type': None, 'content-encoding': None, 'absolute-expiry-time': 0, 'creation-time': 1502708614661, 'group-id': None, 'group-sequence': 0, 'reply-to-group-id': None, 'properties': {'JMSXDeliveryCount': 1}, 'content': None}
      
      java  -jar /home/jdanek/Work/repos/dtests/dtests/node_data/clients/acce.jar receiver  --log-msgs interop --broker tcp://127.0.0.1:61616 --address "test_direct_transient_empty_message" --count 1
      {'durable': False, 'priority': 4, 'ttl': 0, 'first-acquirer': False, 'delivery-count': 0, 'id': None, 'user-id':None, 'address': 'test_direct_transient_empty_message', 'subject': None, 'reply-to': None, 'correlation-id': None, 'content-type': None, 'content-encoding': None, 'absolute-expiry-time': 0, 'creation-time': 1502708614661, 'group-id': None, 'group-sequence': 0, 'reply-to-group-id': None, 'properties': {'JMS_AMQP_MA_x-opt-jms-msg-type': 0, 'JMSXDeliveryCount': '1', 'NATIVE_MESSAGE_ID': 'ID:b57d7df0-aff2-4c35-ac6c-e9a92522846c:1:1:1-1', 'JMS_AMQP_MA_x-opt-jms-dest': 0}, 'content': None, 'type': None}
      

      Observe that the received message has durable: False, while the sent message had durable: True.

      AMQPMessage#toCore() calls AmqpCoreConverter#populateMessage to set durability and other properties. I believe the problem is that AmqpCoreConverter#populateMessage reads these values from the Proton message returned by AMQPMessage#getProtonMessage():

         public MessageImpl getProtonMessage() {
            if (protonMessage == null) {
               protonMessage = (MessageImpl) Message.Factory.create();
      
               if (data != null) {
                  data.readerIndex(0);
                  protonMessage.decode(data.nioBuffer());
                  this._header = protonMessage.getHeader();
                  protonMessage.setHeader(null); // <---- header is blanked here
               }
            }
      
            return protonMessage;
         }
      

      Because the header in the Proton message is deliberately blanked, populateMessage cannot see the durability setting, so durability defaults to false.
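
      To make the suspected failure mode concrete, here is a minimal, self-contained model of it. It is a sketch, not the broker code: Header, WireMessage, CachedMessage, and DurabilitySketch are hypothetical stand-ins for the proton-j and Artemis types, and reading durability from the saved header is only one possible direction, assumed here for illustration rather than confirmed as the fix.

```java
// Hypothetical stand-ins for illustration only; these are NOT the real
// proton-j or Artemis classes.

/** Models an AMQP header whose durable field may be unset (null). */
final class Header {
    private final Boolean durable;
    Header(Boolean durable) { this.durable = durable; }
    Boolean getDurable() { return durable; }
}

/** Models the decoded message object that the converter later inspects. */
final class WireMessage {
    private Header header;
    WireMessage(Header header) { this.header = header; }
    Header getHeader() { return header; }
    void setHeader(Header header) { this.header = header; }
}

/** Models the caching pattern: save the header aside, then blank it. */
final class CachedMessage {
    private final WireMessage decoded;
    private final Header savedHeader;

    CachedMessage(boolean durable) {
        decoded = new WireMessage(new Header(durable));
        savedHeader = decoded.getHeader(); // header is kept here...
        decoded.setHeader(null);           // ...and removed from the decoded message
    }

    WireMessage getDecoded() { return decoded; }
    Header getSavedHeader() { return savedHeader; }
}

final class DurabilitySketch {
    /** A missing header or an unset durable field is treated as non-durable. */
    static boolean isDurable(Header header) {
        return header != null && Boolean.TRUE.equals(header.getDurable());
    }

    /** Mirrors the suspected bug: looks at the blanked header. */
    static boolean durableFromDecoded(CachedMessage m) {
        return isDurable(m.getDecoded().getHeader());
    }

    /** Assumed alternative: looks at the header that was saved aside. */
    static boolean durableFromSaved(CachedMessage m) {
        return isDurable(m.getSavedHeader());
    }

    public static void main(String[] args) {
        CachedMessage sent = new CachedMessage(true);
        System.out.println("from decoded message: " + durableFromDecoded(sent)); // false
        System.out.println("from saved header:    " + durableFromSaved(sent));   // true
    }
}
```

      In this model a message created as durable reports false when the converter consults the blanked header and true when it consults the saved one, which matches the durable: True / durable: False mismatch in the logs above.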

              rh-ee-ataylor Andy Taylor
              jdanek@redhat.com Jiri Daněk