JBoss A-MQ / ENTMQ-2405

Possible Race Condition When Unsubscribing MQTT Clients in a Network of Brokers


      TBD:

      The issue occurs in a network of brokers with many consumers that subscribe with setCleanSession(false) and then unsubscribe and resubscribe to drop the associated queues.


      When mqtt-virtual-topic-subscriptions is used to map MQTT subscriptions to virtual-topic consumer queues in a network of brokers, we have seen that under load, unsubscribing MQTT subscribers can cause an async error that restarts the network bridge:

      2020-08-13 08:24:29,662 | WARN  | pool-16-thread-1 | Service                          | ivemq.broker.TransportConnection  304 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Async error occurred: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
      2020-08-13 08:24:29,663 | INFO  | Task-310 | DemandForwardingBridgeSupport    | rk.DemandForwardingBridgeSupport  908 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Network connection between vm://AMQ1 and ssl:///10.0.0.102:62021 shutdown due to a local error: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
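The queue name in these log lines suggests how the strategy lays out its per-subscription queues: `Consumer.<clientId>:<QoS>.VirtualTopic.<topic>`, with the MQTT topic filter converted to ActiveMQ syntax (`/` to `.`, `#` to `>`, `+` to `*`). The sketch below rebuilds that name. The class and method names are hypothetical, and the layout (including adding the `VirtualTopic.` prefix when the client omits it) is inferred from this one log line rather than taken from the broker source, so treat it as an assumption.

```java
import java.util.Objects;

/**
 * Hypothetical helper: rebuilds the per-subscription queue name that the
 * log suggests mqtt-virtual-topic-subscriptions creates. The layout is an
 * assumption inferred from the log, not copied from the broker source.
 */
final class VirtualTopicQueueName {

    private VirtualTopicQueueName() {}

    /** Converts an MQTT topic filter to ActiveMQ destination syntax. */
    static String toActiveMQTopic(String mqttFilter) {
        Objects.requireNonNull(mqttFilter, "mqttFilter");
        StringBuilder out = new StringBuilder(mqttFilter.length());
        for (char c : mqttFilter.toCharArray()) {
            switch (c) {
                case '/': out.append('.'); break; // level separator
                case '#': out.append('>'); break; // multi-level wildcard
                case '+': out.append('*'); break; // single-level wildcard
                default:  out.append(c);
            }
        }
        return out.toString();
    }

    /** Assumed queue name for one durable (cleanSession=false) subscription. */
    static String queueName(String clientId, String qos, String mqttFilter) {
        String topic = toActiveMQTopic(mqttFilter);
        if (!topic.startsWith("VirtualTopic.")) {
            topic = "VirtualTopic." + topic; // assumption: prefix added when absent
        }
        return "Consumer." + clientId + ":" + qos + "." + topic;
    }

    public static void main(String[] args) {
        System.out.println(queueName("340601550021719", "AT_LEAST_ONCE", "TEST/1/#"));
    }
}
```

Under this assumption every client ID and QoS combination gets its own queue, so many clients unsubscribing at once means many queue removals racing with whatever else still references those queues.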
       

      This appears to be intentional: in DemandForwardingBridgeSupport.serviceLocalException, the bridge tolerates only a missing temporary destination and disposes itself on any other local error:

          public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
              LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
              if (!disposed.get()) {
                  if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
                      // not a reason to terminate the bridge - temps can disappear with
                      // pending sends as the demand sub may outlive the remote dest
                      if (messageDispatch != null) {
                          LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
                          try {
                              MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
                              poisonAck.setPoisonCause(error);
                              localBroker.oneway(poisonAck);
                          } catch (IOException ioe) {
                              LOG.error("Failed to posion ack message following forward failure: ", ioe);
                          }
                          fireFailedForwardAdvisory(messageDispatch, error);
                      } else {
                          LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
                      }
                      return;
                  }
      
                  LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
                  LOG.debug("The local Exception was: {}", error, error);
      
                  brokerService.getTaskRunnerFactory().execute(new Runnable() {
                      @Override
                      public void run() {
                          ServiceSupport.dispose(getControllingService());
                      }
                  });
                  fireBridgeFailed(error);
              }
          }
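The branch structure of serviceLocalException reduces to a small decision: a vanished temporary destination is poison-acked or ignored, and every other error disposes the bridge. The sketch below is a hypothetical, simplified model that returns the action instead of performing it; `MissingDestination` stands in for `DestinationDoesNotExistException`, and none of these names are ActiveMQ API.

```java
/**
 * Hypothetical model of the decision made in serviceLocalException.
 * It only classifies the error; it does not touch a real bridge.
 */
final class BridgeErrorPolicy {

    enum Action { NONE, POISON_ACK_AND_CONTINUE, IGNORE_AND_CONTINUE, DISPOSE_BRIDGE }

    /** Stand-in for DestinationDoesNotExistException (hypothetical type). */
    static final class MissingDestination extends Exception {
        final boolean temporary;

        MissingDestination(String message, boolean temporary) {
            super(message);
            this.temporary = temporary;
        }
    }

    static Action onLocalError(boolean disposed, boolean hasDispatch, Throwable error) {
        if (disposed) {
            return Action.NONE; // bridge already disposed: the method does nothing
        }
        if (error instanceof MissingDestination && ((MissingDestination) error).temporary) {
            // Only a missing temporary destination is tolerated.
            return hasDispatch ? Action.POISON_ACK_AND_CONTINUE : Action.IGNORE_AND_CONTINUE;
        }
        // Any other error, including "still has an active subscription", stops the bridge.
        return Action.DISPOSE_BRIDGE;
    }

    public static void main(String[] args) {
        Exception stillActive = new Exception(
                "Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>");
        System.out.println(onLocalError(false, false, stillActive));
    }
}
```

Seen this way, the JMSException from the log falls into the catch-all branch, which is why a failed queue removal is enough to restart the network bridge.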
      

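      The error itself is thrown by removeDestination when the broker tries to remove the queue associated with a mapped subscription; with a timeout of 0, the removal fails if any current subscription still matches the destination: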

          @Override
          public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
                  throws Exception {
      
              // No timeout.. then try to shut down right way, fails if there are
              // current subscribers.
              if (timeout == 0) {
                  for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                      Subscription sub = iter.next();
                      if (sub.matches(destination) ) {
                          throw new JMSException("Destination still has an active subscription: " + destination);
                      }
                  }
              }
      
              if (timeout > 0) {
                  // TODO: implement a way to notify the subscribers that we want to
                  // take the down
                  // the destination and that they should un-subscribe.. Then wait up
                  // to timeout time before
                  // dropping the subscription.
              }
      
              LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);
      
              destinationsLock.writeLock().lock();
              try {
                  Destination dest = destinations.remove(destination);
                  if (dest != null) {
                      // timeout<0 or we timed out, we now force any remaining
                      // subscriptions to un-subscribe.
                      for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                          Subscription sub = iter.next();
                          if (sub.matches(destination)) {
                              dest.removeSubscription(context, sub, 0l);
                          }
                      }
                      destinationMap.unsynchronizedRemove(destination, dest);
                      dispose(context, dest);
                      DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                      if (destinationInterceptor != null) {
                          destinationInterceptor.remove(dest);
                      }
      
                  } else {
                      LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
                  }
              } finally {
                  destinationsLock.writeLock().unlock();
              }
          }
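With a timeout of 0, whether the removal succeeds depends only on whether some subscription still matches the destination at that instant, so the outcome is order dependent. The sketch below is a hypothetical, heavily simplified model of that guard (it matches by exact destination name, whereas the broker uses `Subscription.matches`); the subscription IDs, including the assumed bridge consumer, are illustrative only.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical model of the timeout == 0 guard in removeDestination.
 * Subscriptions "match" by exact destination name here.
 */
final class RegionModel {

    static final class StillSubscribedException extends Exception {
        StillSubscribedException(String destination) {
            super("Destination still has an active subscription: " + destination);
        }
    }

    private final Set<String> destinations = ConcurrentHashMap.newKeySet();
    private final Map<String, String> subscriptions = new ConcurrentHashMap<>(); // subId -> destination

    void addDestination(String destination) { destinations.add(destination); }
    void subscribe(String subId, String destination) { subscriptions.put(subId, destination); }
    void unsubscribe(String subId) { subscriptions.remove(subId); }
    boolean hasDestination(String destination) { return destinations.contains(destination); }

    /** Mirrors the "no timeout: fail if anything still matches" branch. */
    void removeDestination(String destination) throws StillSubscribedException {
        for (String subscribed : subscriptions.values()) {
            if (subscribed.equals(destination)) {
                throw new StillSubscribedException(destination);
            }
        }
        destinations.remove(destination);
    }

    public static void main(String[] args) throws Exception {
        String queue = "Consumer.c1:AT_LEAST_ONCE.VirtualTopic.TEST.1.>";
        RegionModel region = new RegionModel();
        region.addDestination(queue);
        region.subscribe("mqtt-client", queue);
        region.subscribe("bridge-demand", queue); // hypothetical bridge consumer on the same queue

        region.unsubscribe("mqtt-client");
        try {
            region.removeDestination(queue); // bridge consumer not yet gone: rejected
        } catch (StillSubscribedException e) {
            System.out.println(e.getMessage());
        }

        region.unsubscribe("bridge-demand"); // once that consumer is removed,
        region.removeDestination(queue);     // the same call succeeds
        System.out.println("removed: " + !region.hasDestination(queue));
    }
}
```

The same call fails or succeeds purely depending on whether the other consumer has been torn down yet, which is the kind of ordering a race between unsubscribe processing and queue removal would produce.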
      

      My guess is that the removal fails because a network bridge consumer is still subscribed to the queue at that moment, but this is still under investigation.

              gtully@redhat.com Gary Tully
              rhn-support-dhawkins Duane Hawkins
              Votes: 0
              Watchers: 2
