Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-2333

Temporary Queue Address Leak In Clustered Configuration

    XMLWordPrintable

Details

    • Bug
    • Resolution: Won't Do
    • Major
    • None
    • AMQ 7.2.2.GA, AMQ 7.2.3.GA
    • broker-core, clustering
    • None
    • Hide
      Previously, if you had a temporary queue replicated across cluster nodes, closing connections to the temporary queue on a given node did not purge the server resources associated with the queue, i.e., the queue had an address leak. This issue is resolved. You can now use the address-queue-scan-period parameter to specify how often the broker scans queues for potential deletion.
      Show
      Previously, if you had a temporary queue replicated across cluster nodes, closing connections to the temporary queue on a given node did not purge the server resources associated with the queue, i.e., the queue had an address leak. This issue is resolved. You can now use the address-queue-scan-period parameter to specify how often the broker scans queues for potential deletion.
    • Hide

      Attaching example broker.xml for 2-node cluster, along with a producer using temporary queues for reply addresses. Code looks like:

      connection = factory.createConnection();
                  connection.setClientID(clientPrefix + "." + destination + "-" + Integer.toString(threadNum));
                  connection.start();
                  if (transacted) {
                      session = connection.createSession(transacted, Session.SESSION_TRANSACTED);
                  } else {
                      session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
                  }
      
                  if (dynamic) {
                      producer = session.createProducer(null);
                  } else {
                      producer = session.createProducer(destination);
                  }
                  producer.setTimeToLive(ttl);
                  if (persistent) {
                      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                  } else {
                      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                  }
      
                  if (replyTo) {
                      tempDest = session.createTemporaryQueue();
                      responseConsumer = session.createConsumer(tempDest);
                      responseConsumer.setMessageListener(this);
                  }
      ...
      
          @Override
          public void onMessage(Message message) {
              String messageText = null;
              try {
                  if (message instanceof TextMessage) {
                      TextMessage textMessage = (TextMessage) message;
                      messageText = textMessage.getText();
                      LOG.info("messageText = " + messageText);
                  }
              } catch (JMSException e) {
                  //Handle the exception appropriately
              }
          }
      

      On the consumer side, I tried using the same session and spinning up a producer to produce to the replyTo address, and also tried it with a new session for each response. Behavior was the same, either way:

                          Message message = consumer.receive(messageTimeoutMs);
      
                          if (message != null) {
                              msgsRecd++;
                              LOG.info("Thread {}: {} : Got message {}; total: {} path: {}.", this.threadNum, System.currentTimeMillis(), message.getJMSMessageID(), msgsRecd, ((ActiveMQMessage) message).getBrokerPath());
      
                              if (message.getJMSReplyTo() != null) {    
                                  replySession = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
                                  MessageProducer producer = replySession.createProducer(message.getJMSReplyTo());
                                  producer.setTimeToLive(1000);
                                  String response = "Received: " + message.getJMSMessageID();
                                  LOG.info("Sending Response: " + response);
                                  Message responseMsg = new ActiveMQTextMessage();
                                  ((ActiveMQTextMessage)responseMsg).setText(response);
                                  producer.send(responseMsg);
                                  producer.close(); 
                                  replySession.close();
                                  replySession = null;
                              }
      ...
      

      To reproduce the issue:

      1. Deploy a new AMQ 7.6.0 instance host1 and one to host2
      2. Copy the broker-node1.xml file to the host1/$AMQ_INSTANCE/etc/ directory
      3. Copy the broker-node2.xml file to the host2/$AMQ_INSTANCE/etc/ directory
      4. Update cluster configuration as needed for new environment
      5. Edit bootstrap.xml and jolokia-access.xml to enable remote access
      6. Extract threaded-consumer-2.1.0-SNAPSHOT.zip, edit jndi.properties java.naming.provider.url to point to your host2 instance
      7. Extract threaded-producer-2.1.0-SNAPSHOT.zip, edit jndi.properties java.naming.provider.url to point to your host1 instance
      8. Start the consumer application and verify that it connects to the host2 AMQ instance
      9. Start the producer application and verify it connects to the host1 instance
      10. Watch for the creation of temporary reply queues on each instance
      11. Interrupt the producer application with CTRL-C, leaving the consumer running
      12. Wait for > 30 seconds (I waited 20 minutes)
      13. Observe that temporary queues are not cleaned up, as they should be
      14. Interrupt consumer with CTRL-C
      15. After a few seconds temporary reply queues are now cleaned up

      Show
      Attaching example broker.xml for 2-node cluster, along with a producer using temporary queues for reply addresses. Code looks like: connection = factory.createConnection(); connection.setClientID(clientPrefix + "." + destination + "-" + Integer .toString(threadNum)); connection.start(); if (transacted) { session = connection.createSession(transacted, Session.SESSION_TRANSACTED); } else { session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); } if (dynamic) { producer = session.createProducer( null ); } else { producer = session.createProducer(destination); } producer.setTimeToLive(ttl); if (persistent) { producer.setDeliveryMode(DeliveryMode.PERSISTENT); } else { producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } if (replyTo) { tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); responseConsumer.setMessageListener( this ); } ... @Override public void onMessage(Message message) { String messageText = null ; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); LOG.info( "messageText = " + messageText); } } catch (JMSException e) { //Handle the exception appropriately } } On the consumer side, I tried using the same session and spinning up a producer to produce to the replyTo address, and also tried it with a new session for each response. Behavior was the same, either way: Message message = consumer.receive(messageTimeoutMs); if (message != null ) { msgsRecd++; LOG.info( " Thread {}: {} : Got message {}; total: {} path: {}." , this .threadNum, System .currentTimeMillis(), message.getJMSMessageID(), msgsRecd, ((ActiveMQMessage) message).getBrokerPath()); if (message.getJMSReplyTo() != null ) { replySession = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = replySession.createProducer(message.getJMSReplyTo()); producer.setTimeToLive(1000); String response = "Received: " + message.getJMSMessageID(); LOG.info( "Sending Response: " + response); Message responseMsg = new ActiveMQTextMessage(); ((ActiveMQTextMessage)responseMsg).setText(response); producer.send(responseMsg); producer.close(); replySession.close(); replySession = null ; } ... To reproduce the issue: 1. Deploy a new AMQ 7.6.0 instance host1 and one to host2 2. Copy the broker-node1.xml file to the host1/$AMQ_INSTANCE/etc/ directory 3. Copy the broker-node2.xml file to the host2/$AMQ_INSTANCE/etc/ directory 4. Update cluster configuration as needed for new environment 5. Edit bootstrap.xml and jolokia-access.xml to enable remote access 6. Extract threaded-consumer-2.1.0-SNAPSHOT.zip, edit jndi.properties java.naming.provider.url to point to your host2 instance 7. Extract threaded-producer-2.1.0-SNAPSHOT.zip, edit jndi.properties java.naming.provider.url to point to your host1 instance 8. Start the consumer application and verify that it connects to the host2 AMQ instance 9. Start the producer application and verify it connects to the host1 instance 10. Watch for the creation of temporary reply queues on each instance 11. Interrupt the producer application with CTRL-C, leaving the consumer running 12. Wait for > 30 seconds (I waited 20 minutes) 13. Observe that temporary queues are not cleaned up, as they should be 14. Interrupt consumer with CTRL-C 15. After a few seconds temporary reply queues are now cleaned up

    Description

      When a producer with a replyTo temporary address configuration connects to one node in a broker cluster and the consumer / responder connects to another node, the temporary (reply) queue and address persist in the node where the consumer / responder is attached until the consumer exits (even if a separate session is opened and closed for every reply message). On the producer node, the temporary queues are deleted when the producer exits, but the addresses persist, even after a broker restart. In testing, I have found the following:

      1. In a single-node configuration, all works as expected. If the producer with the replyTo listener exits, then the address and queue are both cleaned up, regardless of whether the consumer / responder has existed or is still listening.

      3. In a clustered configuration with the producer and consumer on the same node, behavior is as with a single-node

      2. In a clustered configuration (2 nodes, static connectors) with the producer and consumer connected to separate nodes, when the producer exits, I see the address remaining on the producer / replyTo node and the replyTo address and queue remain on the consumer node. When the consumer / responder exits, I do see the address and queue cleaned up on the consumer node, but the replyTo address remains on the producer node. The address persists, even after the broker is restarted.

      Attachments

        1. broker-node1.xml
          9 kB
        2. broker-node1.xml
          9 kB
        3. broker-node2.xml
          9 kB
        4. broker-node2.xml
          9 kB
        5. threaded-consumer-2.1.0-SNAPSHOT.zip
          24.84 MB
        6. threaded-consumer-2.1.0-SNAPSHOT-bin.zip
          24.87 MB
        7. threaded-producer-2.1.0-SNAPSHOT.zip
          18.91 MB
        8. threaded-producer-2.1.0-SNAPSHOT-bin.zip
          18.94 MB

        Activity

          People

            dbruscin Domenico Francesco Bruscino
            rhn-support-dhawkins Duane Hawkins
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: