Details
-
Story
-
Resolution: Won't Do
-
Minor
-
None
-
AMQ 7.4.0.CR2
-
None
Description
AMQP 1.0 says
[...] link can be globally identified by the (ordered) tuple (A,B,<name>). Consequently, a link can only be active in one connection at a time. If an attempt is made to attach the link subsequently when it is not suspended, then the link can be 'stolen', i.e., the second attach succeeds and the first attach MUST then be closed with a link error of stolen. This behavior ensures that in the event of a connection failure occurring and being noticed by one party, that re-establishment has the desired effect.
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-idp298752
This is not what happens. First attach is not closed with a link error of stolen, instead, if there is a consumer limit set, the broker disallows the second attach with AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue.
This can be reproduced by the following broker test, which is taken from ENTMQCL-1540, a bug that can be considered as being caused by this one. The test below passes for Core JMS, and fails with Qpid JMS.
package org.apache.activemq.artemis.tests.integration.crossprotocol; import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import java.util.Arrays; import static org.apache.activemq.artemis.tests.util.CFUtil.createTopicConnectionFactory; @RunWith(Parameterized.class) public class UnsubscribeDurableSubTest extends OpenWireTestBase { private String protocol; private TopicConnectionFactory connectionFactory; public UnsubscribeDurableSubTest(String protocol) { this.protocol = protocol; } @Parameterized.Parameters(name = "protocol={0}") public static Iterable<Object[]> data() { return Arrays.asList(new Object[][]{ {"OPENWIRE"}, {"CORE"}, {"AMQP"}, }); } @Before public void setupCF() { connectionFactory = createTopicConnectionFactory(protocol, urlString); } @Before public void setupServer() throws Exception { Wait.assertTrue(server::isStarted); Wait.assertTrue(server::isActive); } @Test public void testUnsubscribingWithUnackedMessage() throws JMSException { final String aDurableSubscriber = "aDurableSubscriber"; TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) connectionFactory; TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(); topicConnection.setClientID("sUnitTestClientID345674498"); TopicSession topicSession = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); Topic topic = topicSession.createTopic("topic.sample"); TopicPublisher topicPublisher = topicSession.createPublisher(topic); TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(topic, aDurableSubscriber); TextMessage textMessage = topicSession.createTextMessage("aTextMessage"); topicPublisher.publish(textMessage); topicConnection.start(); TextMessage receivedTextMessage = (TextMessage) topicSubscriber.receive(3000); // don't ack this, important for the reproducer topicConnection.stop(); topicSubscriber.close(); topicSession.unsubscribe(aDurableSubscriber); topicSession.close(); topicConnection.close(); } }
Output with PN_TRACE_FRM=1 env. variable set
[...] [865446877:1] -> Attach{name='aDurableSubscriber', handle=1, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1421646142:1] <- Attach{name='aDurableSubscriber', handle=1, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1421646142:1] -> Attach{name='aDurableSubscriber', handle=1, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [865446877:1] <- Attach{name='aDurableSubscriber', handle=1, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='topic.sample', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=copy, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[topic]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [865446877:1] -> Flow{nextIncomingId=1, incomingWindow=2047, nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null} [1421646142:1] <- Flow{nextIncomingId=1, incomingWindow=2047, nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null} [865446877:1] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage" [1421646142:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage" [1421646142:1] -> Transfer{handle=1, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage" [1421646142:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false} [865446877:1] <- Transfer{handle=1, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (161) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x01\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00S\x00\x00\x00\x0a\xa1/ID:fbf39778-494a-4973-a908-0bf97f37a260:1:1:1-1@\xa1\x0ctopic.sample@@@@@@\x83\x00\x00\x01l\x04t\xc1y\x00Sw\xa1\x0caTextMessage" [865446877:1] <- Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false} [865446877:1] -> Flow{nextIncomingId=2, incomingWindow=2047, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1, linkCredit=999, available=null, drain=true, echo=false, properties=null} [1421646142:1] <- Flow{nextIncomingId=2, incomingWindow=2047, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1, linkCredit=999, available=null, drain=true, echo=false, properties=null} [1421646142:1] -> Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1000, linkCredit=0, available=null, drain=true, echo=false, properties=null} [865446877:1] <- Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=2, outgoingWindow=2147483647, handle=1, deliveryCount=1000, linkCredit=0, available=null, drain=true, echo=false, properties=null} [865446877:0] -> Attach{name='aDurableSubscriber', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1421646142:0] <- Attach{name='aDurableSubscriber', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1421646142:0] -> Attach{name='aDurableSubscriber', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [1421646142:0] -> Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue:(address=topic.sample,queue=sUnitTestClientID345674498.aDurableSubscriber)', info=null}} [865446877:0] <- Attach{name='aDurableSubscriber', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} [865446877:0] <- Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='AMQ119005: error creating consumer, AMQ229200: Maximum Consumer Limit Reached on Queue:(address=topic.sample,queue=sUnitTestClientID345674498.aDurableSubscriber)', info=null}} [865446877:0] -> Detach{handle=0, closed=true, error=null} [1421646142:0] <- Detach{handle=0, closed=true, error=null} [1421646142:0] -> Close{error=Error{condition=amqp:connection:forced, description='null', info=null}} [865446877:0] <- Close{error=Error{condition=amqp:connection:forced, description='null', info=null}} [865446877:0] -> Close{error=null}