Bug
Resolution: Done
Major
None
When using the mqtt-virtual-topic-subscriptions strategy to map MQTT subscriptions to virtual-topic consumer queues in a network of brokers, we have seen that, under load, unsubscribing MQTT subscribers can trigger an async error that causes the network bridge to restart:
2020-08-13 08:24:29,662 | WARN | pool-16-thread-1 | Service | ivemq.broker.TransportConnection 304 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Async error occurred: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
2020-08-13 08:24:29,663 | INFO | Task-310 | DemandForwardingBridgeSupport | rk.DemandForwardingBridgeSupport 908 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Network connection between vm://AMQ1 and ssl:///10.0.0.102:62021 shutdown due to a local error: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
This appears to be intentional behavior on the part of the bridge in response to an unhandled error:
    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
        if (!disposed.get()) {
            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
                // not a reason to terminate the bridge - temps can disappear with
                // pending sends as the demand sub may outlive the remote dest
                if (messageDispatch != null) {
                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
                    try {
                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
                        poisonAck.setPoisonCause(error);
                        localBroker.oneway(poisonAck);
                    } catch (IOException ioe) {
                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
                    }
                    fireFailedForwardAdvisory(messageDispatch, error);
                } else {
                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
                }
                return;
            }
            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
            LOG.debug("The local Exception was: {}", error, error);
            brokerService.getTaskRunnerFactory().execute(new Runnable() {
                @Override
                public void run() {
                    ServiceSupport.dispose(getControllingService());
                }
            });
            fireBridgeFailed(error);
        }
    }
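To make the branching in that handler easier to follow, here is a minimal sketch of its decision logic only. It is not ActiveMQ code: `BridgeErrorPolicySketch`, `Action`, `decide`, and `MissingDestinationException` are hypothetical stand-ins, and the real method performs logging, poison acks, and advisories that are omitted here. The point it illustrates is that only a missing temporary destination is tolerated. Any other local error, including the "still has an active subscription" JMSException, falls through to the bridge shutdown path.

```java
// Simplified, hypothetical model of the bridge's error handling; not ActiveMQ code.
class BridgeErrorPolicySketch {

    // Possible outcomes of handling a local error (names are illustrative).
    enum Action { NONE, POISON_ACK, IGNORE, SHUT_DOWN_BRIDGE }

    // Stand-in for DestinationDoesNotExistException (assumption: only isTemporary() matters here).
    static class MissingDestinationException extends Exception {
        private final boolean temporary;

        MissingDestinationException(String message, boolean temporary) {
            super(message);
            this.temporary = temporary;
        }

        boolean isTemporary() {
            return temporary;
        }
    }

    // Mirrors the branch structure of serviceLocalException.
    static Action decide(boolean disposed, boolean hasDispatch, Throwable error) {
        if (disposed) {
            return Action.NONE; // bridge already disposed, nothing to do
        }
        if (error instanceof MissingDestinationException
                && ((MissingDestinationException) error).isTemporary()) {
            // Temp destinations may vanish with pending sends: not fatal.
            return hasDispatch ? Action.POISON_ACK : Action.IGNORE;
        }
        // Every other error terminates the bridge.
        return Action.SHUT_DOWN_BRIDGE;
    }

    public static void main(String[] args) {
        Throwable activeSub = new Exception(
                "Destination still has an active subscription: queue://Consumer.example");
        Throwable tempGone = new MissingDestinationException("temp-queue://example", true);

        System.out.println(decide(false, false, activeSub));
        System.out.println(decide(false, true, tempGone));
        System.out.println(decide(false, false, tempGone));
    }
}
```

Under this model, the error from the log above has no dedicated branch, which would be consistent with the bridge restart we observe.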
And the error is thrown when trying to remove a queue associated with a mapped subscription:
    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        // No timeout.. then try to shut down right way, fails if there are
        // current subscribers.
        if (timeout == 0) {
            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                Subscription sub = iter.next();
                if (sub.matches(destination) ) {
                    throw new JMSException("Destination still has an active subscription: " + destination);
                }
            }
        }
        if (timeout > 0) {
            // TODO: implement a way to notify the subscribers that we want to
            // take the down
            // the destination and that they should un-subscribe.. Then wait up
            // to timeout time before
            // dropping the subscription.
        }
        LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);
        destinationsLock.writeLock().lock();
        try {
            Destination dest = destinations.remove(destination);
            if (dest != null) {
                // timeout<0 or we timed out, we now force any remaining
                // subscriptions to un-subscribe.
                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                    Subscription sub = iter.next();
                    if (sub.matches(destination)) {
                        dest.removeSubscription(context, sub, 0l);
                    }
                }
                destinationMap.unsynchronizedRemove(destination, dest);
                dispose(context, dest);
                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                if (destinationInterceptor != null) {
                    destinationInterceptor.remove(dest);
                }
            } else {
                LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
            }
        } finally {
            destinationsLock.writeLock().unlock();
        }
    }
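The failure mode can be reproduced in a toy model of the check above. This is only a sketch under stated assumptions: `RemoveDestinationSketch` and its methods are hypothetical, subscriptions are plain strings matched by equality (the real `sub.matches(destination)` also handles wildcards), and the two subscriber names are invented for illustration. It shows that with `timeout == 0`, removal fails as long as any second subscription still matches the queue after the first one has gone.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the timeout == 0 check in removeDestination; not ActiveMQ code.
class RemoveDestinationSketch {
    private final Set<String> destinations = new HashSet<>();
    // subscription id -> destination name (simplification: exact match only)
    private final Map<String, String> subscriptions = new ConcurrentHashMap<>();

    void addDestination(String name) {
        destinations.add(name);
    }

    void addSubscription(String id, String destination) {
        subscriptions.put(id, destination);
    }

    void removeSubscription(String id) {
        subscriptions.remove(id);
    }

    // Returns true if the destination existed and was removed.
    boolean removeDestination(String destination, long timeout) throws Exception {
        if (timeout == 0) {
            // Immediate removal is refused while any subscription still matches.
            for (String subscribedTo : subscriptions.values()) {
                if (subscribedTo.equals(destination)) {
                    throw new Exception(
                            "Destination still has an active subscription: " + destination);
                }
            }
        }
        // Non-zero timeout: force remaining subscriptions off the destination.
        subscriptions.values().removeIf(destination::equals);
        return destinations.remove(destination);
    }

    public static void main(String[] args) throws Exception {
        RemoveDestinationSketch region = new RemoveDestinationSketch();
        String queue = "queue://Consumer.example.VirtualTopic.TEST";
        region.addDestination(queue);
        region.addSubscription("mqtt-client", queue);   // hypothetical MQTT subscriber
        region.addSubscription("bridge-demand", queue); // hypothetical bridge consumer

        region.removeSubscription("mqtt-client");       // MQTT client unsubscribes
        try {
            region.removeDestination(queue, 0);
        } catch (Exception e) {
            System.out.println("failed: " + e.getMessage());
        }

        region.removeSubscription("bridge-demand");     // bridge consumer goes away
        System.out.println("removed: " + region.removeDestination(queue, 0));
    }
}
```

Whether the lingering subscription in the real broker is actually a bridge consumer is still unconfirmed; the sketch only shows that any remaining matching subscription is enough to produce this exception.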
My guess is that the removal fails because a bridge consumer is still connected to the queue, but I am still investigating.