  1. AMQ Broker
  2. ENTMQBR-2696

Link stealing, mandatory part of AMQP 1.0 spec, is not supported


Details

    • Type: Story
    • Resolution: Won't Do
    • Priority: Minor
    • Fix Version: None
    • Affects Version: AMQ 7.4.0.CR2
    • Component: amqp-protocol
    • Labels: None

    Description

      AMQP 1.0 says

      [...] link can be globally identified by the (ordered) tuple (A,B,<name>). Consequently, a link can only be active in one connection at a time. If an attempt is made to attach the link subsequently when it is not suspended, then the link can be 'stolen', i.e., the second attach succeeds and the first attach MUST then be closed with a link error of stolen. This behavior ensures that in the event of a connection failure occurring and being noticed by one party, that re-establishment has the desired effect.
      http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-idp298752

      This is not what happens. The first attach is not closed with a link error of stolen. Instead, if a consumer limit is set on the queue, the broker rejects the second attach with AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue.
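
For illustration, the behavior the spec describes can be modeled in a few lines of Java. This is only a sketch under stated assumptions, not broker code: `LinkKey`, `AttachedLink`, and `LinkRegistry` are hypothetical names invented here, and the `owner` string simply stands in for whichever connection or session performed the attach. The only value taken from the spec is the error condition `amqp:link:stolen`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical key: the (A, B, name) tuple that identifies a link. */
final class LinkKey {
    final String containerA;
    final String containerB;
    final String name;

    LinkKey(String containerA, String containerB, String name) {
        this.containerA = containerA;
        this.containerB = containerB;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LinkKey)) {
            return false;
        }
        LinkKey other = (LinkKey) o;
        return containerA.equals(other.containerA)
                && containerB.equals(other.containerB)
                && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(containerA, containerB, name);
    }
}

/** Hypothetical record of one attach; closeCondition stays null while attached. */
final class AttachedLink {
    final LinkKey key;
    final String owner;
    String closeCondition;

    AttachedLink(LinkKey key, String owner) {
        this.key = key;
        this.owner = owner;
    }
}

/** Hypothetical registry that steals instead of rejecting the second attach. */
final class LinkRegistry {
    static final String STOLEN = "amqp:link:stolen";

    private final Map<LinkKey, AttachedLink> active = new HashMap<>();

    /** The new attach always succeeds; a previous holder is closed as stolen. */
    AttachedLink attach(LinkKey key, String owner) {
        AttachedLink fresh = new AttachedLink(key, owner);
        AttachedLink previous = active.put(key, fresh);
        if (previous != null) {
            previous.closeCondition = STOLEN;
        }
        return fresh;
    }

    /** Removes the link only if it is still the active holder of its key. */
    void detach(AttachedLink link) {
        active.remove(link.key, link);
    }

    boolean isActive(AttachedLink link) {
        return active.get(link.key) == link;
    }
}
```

In this model, attaching `aDurableSubscriber` a second time marks the first attach as closed with `amqp:link:stolen` and leaves the second one active, which is the opposite of the rejection reported here.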

      This can be reproduced with the following broker test, taken from ENTMQCL-1540, a bug that can be considered a consequence of this one. The test passes with Core JMS and fails with Qpid JMS.

      tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/UnsubscribeDurableSubTest.java
      package org.apache.activemq.artemis.tests.integration.crossprotocol;
      
      import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
      import org.apache.activemq.artemis.tests.util.Wait;
      import org.junit.Before;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      
      import javax.jms.JMSException;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.jms.TopicConnection;
      import javax.jms.TopicConnectionFactory;
      import javax.jms.TopicPublisher;
      import javax.jms.TopicSession;
      import javax.jms.TopicSubscriber;
      import java.util.Arrays;
      
      import static org.apache.activemq.artemis.tests.util.CFUtil.createTopicConnectionFactory;
      
      @RunWith(Parameterized.class)
      public class UnsubscribeDurableSubTest extends OpenWireTestBase {
      
          private String protocol;
          private TopicConnectionFactory connectionFactory;
      
          public UnsubscribeDurableSubTest(String protocol) {
              this.protocol = protocol;
          }
      
          @Parameterized.Parameters(name = "protocol={0}")
          public static Iterable<Object[]> data() {
              return Arrays.asList(new Object[][]{
                      {"OPENWIRE"},
                      {"CORE"},
                      {"AMQP"},
              });
          }
      
          @Before
          public void setupCF() {
              connectionFactory = createTopicConnectionFactory(protocol, urlString);
          }
      
          @Before
          public void setupServer() throws Exception {
              Wait.assertTrue(server::isStarted);
              Wait.assertTrue(server::isActive);
          }
      
          @Test
          public void testUnsubscribingWithUnackedMessage() throws JMSException {
              final String aDurableSubscriber = "aDurableSubscriber";
      
              // connectionFactory is already a TopicConnectionFactory, so no cast is needed
              TopicConnection topicConnection = connectionFactory.createTopicConnection();
              topicConnection.setClientID("sUnitTestClientID345674498");
              TopicSession topicSession = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
              Topic topic = topicSession.createTopic("topic.sample");
              TopicPublisher topicPublisher = topicSession.createPublisher(topic);
              TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(topic, aDurableSubscriber);
      
              TextMessage textMessage = topicSession.createTextMessage("aTextMessage");
              topicPublisher.publish(textMessage);
              topicConnection.start();
      
              TextMessage receivedTextMessage = (TextMessage) topicSubscriber.receive(3000);
              // deliberately not acknowledged; the unacked message is essential to the reproducer
      
              topicConnection.stop();
              topicSubscriber.close();
      
              topicSession.unsubscribe(aDurableSubscriber);
              topicSession.close();
      
              topicConnection.close();
          }
      }
      

      Output with the PN_TRACE_FRM=1 environment variable set:

      [...]
      [865446877:1] -> Attach{name='aDurableSubscriber', handle=1, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [1421646142:1] <- Attach{name='aDurableSubscriber', handle=1, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [1421646142:1] -> Attach{name='aDurableSubscriber', handle=1, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [865446877:1] <- Attach{name='aDurableSubscriber', handle=1, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [865446877:1] -> Flow{nextIncomingId=1, incomingWindow=2047, nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
      [1421646142:1] <- Flow{nextIncomingId=1, incomingWindow=2047, nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
      [865446877:1] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage"
      [1421646142:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage"
      [1421646142:1] -> Transfer{handle=1, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage"
      [1421646142:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false}
      [865446877:1] <- Transfer{handle=1, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage"
      [865446877:1] <- Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false}
      [865446877:1] -> Flow{nextIncomingId=2, incomingWindow=2047, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1, linkCredit=999, available=null, drain=true, echo=false, properties=null}
      [1421646142:1] <- Flow{nextIncomingId=2, incomingWindow=2047, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1, linkCredit=999, available=null, drain=true, echo=false, properties=null}
      [1421646142:1] -> Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1000, linkCredit=0, available=null, drain=true, echo=false, properties=null}
      [865446877:1] <- Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1000, linkCredit=0, available=null, drain=true, echo=false, properties=null}
      [865446877:0] -> Attach{name='aDurableSubscriber', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [1421646142:0] <- Attach{name='aDurableSubscriber', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [1421646142:0] -> Attach{name='aDurableSubscriber', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [1421646142:0] -> Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue:(address=topic.sample,queue=sUnitTestClientID345674498.aDurableSubscriber)', info=null}}
      [865446877:0] <- Attach{name='aDurableSubscriber', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
      [865446877:0] <- Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue:(address=topic.sample,queue=sUnitTestClientID345674498.aDurableSubscriber)', info=null}}
      [865446877:0] -> Detach{handle=0, closed=true, error=null}
      [1421646142:0] <- Detach{handle=0, closed=true, error=null}
      [1421646142:0] -> Close{error=Error{condition=amqp:connection:forced, description='null', info=null}}
      [865446877:0] <- Close{error=Error{condition=amqp:connection:forced, description='null', info=null}}
      [865446877:0] -> Close{error=null}
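
To check a trace like the one above mechanically, a small helper can pull the error condition out of each Detach frame and report whether `amqp:link:stolen` ever appeared. This is a minimal sketch: `DetachErrorScanner` is a hypothetical name, and the regular expression assumes the frame layout shown in this trace (`Detach{..., error=Error{condition=...`), which may differ in other client versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for scanning frame trace lines; not part of any client library. */
final class DetachErrorScanner {
    // Captures the condition symbol of a Detach frame that carries an error.
    // Detach frames with error=null and Close frames do not match.
    private static final Pattern DETACH_ERROR =
            Pattern.compile("Detach\\{[^}]*?error=Error\\{condition=([^,}]+)");

    /** Returns the error condition of every erroring Detach frame, in trace order. */
    static List<String> detachConditions(List<String> traceLines) {
        List<String> conditions = new ArrayList<>();
        for (String line : traceLines) {
            Matcher matcher = DETACH_ERROR.matcher(line);
            if (matcher.find()) {
                conditions.add(matcher.group(1));
            }
        }
        return conditions;
    }

    /** True if any Detach frame in the trace was closed with the stolen link error. */
    static boolean sawLinkStolen(List<String> traceLines) {
        return detachConditions(traceLines).contains("amqp:link:stolen");
    }
}
```

Because the trace logs each frame once per direction, the same condition can appear twice in the returned list. For the trace above, the only Detach condition found is `amqp:internal-error`, and `sawLinkStolen` returns false.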
      

          People

            rh-ee-ataylor Andy Taylor
            jdanek@redhat.com Jiri Daněk