Bug
Resolution: Done
Major
None
When using the mqtt-virtual-topic-subscriptions strategy to map MQTT subscriptions to virtual-topic consumer queues in a network of brokers, we have seen that, under load, unsubscribing MQTT subscribers can trigger an async error that causes the network bridge to restart:
2020-08-13 08:24:29,662 | WARN | pool-16-thread-1 | Service | ivemq.broker.TransportConnection 304 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Async error occurred: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
2020-08-13 08:24:29,663 | INFO | Task-310 | DemandForwardingBridgeSupport | rk.DemandForwardingBridgeSupport 908 | 162 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-630377-10 | Network connection between vm://AMQ1 and ssl:///10.0.0.102:62021 shutdown due to a local error: javax.jms.JMSException: Destination still has an active subscription: queue://Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>
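As a reading aid for the log: the consumer queue name appears to follow the pattern Consumer.<clientId>:<QoS>.VirtualTopic.<topic filter>, with the MQTT topic filter converted to ActiveMQ wildcard syntax. The helper below is hypothetical (it is not ActiveMQ's API) and uses a simplified conversion ('/' to '.', '+' to '*', '#' to '>'). It assumes the client subscribed to TEST/1/# with QoS 1 (AT_LEAST_ONCE); that filter is inferred from the queue name, not stated in the report.

```java
// Hypothetical helper, NOT ActiveMQ's API: reproduces the queue-name shape seen in the log.
// Assumption: a simplified MQTT -> ActiveMQ wildcard conversion ('/'->'.', '+'->'*', '#'->'>').
final class VirtualTopicQueueNames {
    private VirtualTopicQueueNames() {}

    /** Converts an MQTT topic filter to ActiveMQ wildcard syntax (simplified). */
    static String toActiveMQFilter(String mqttFilter) {
        StringBuilder sb = new StringBuilder(mqttFilter.length());
        for (char c : mqttFilter.toCharArray()) {
            switch (c) {
                case '/': sb.append('.'); break;  // level separator
                case '+': sb.append('*'); break;  // single-level wildcard
                case '#': sb.append('>'); break;  // multi-level wildcard
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Builds Consumer.<clientId>:<qos>.VirtualTopic.<filter>, as observed in the log. */
    static String consumerQueue(String clientId, String qos, String mqttFilter) {
        return "Consumer." + clientId + ":" + qos + ".VirtualTopic." + toActiveMQFilter(mqttFilter);
    }

    public static void main(String[] args) {
        // Assumed subscription: client 340601550021719, QoS 1, filter TEST/1/#
        System.out.println(consumerQueue("340601550021719", "AT_LEAST_ONCE", "TEST/1/#"));
    }
}
```

Note that the resulting queue name itself ends in the '>' wildcard, so the per-client consumer queue is a wildcard destination name.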
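This appears to be intentional behavior on the part of the bridge. In DemandForwardingBridgeSupport.serviceLocalException(), the only local error that is tolerated is a DestinationDoesNotExistException for a temporary destination; any other error shuts the bridge down: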
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
    LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
    if (!disposed.get()) {
        if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
            // not a reason to terminate the bridge - temps can disappear with
            // pending sends as the demand sub may outlive the remote dest
            if (messageDispatch != null) {
                LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
                try {
                    MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
                    poisonAck.setPoisonCause(error);
                    localBroker.oneway(poisonAck);
                } catch (IOException ioe) {
                    LOG.error("Failed to posion ack message following forward failure: ", ioe);
                }
                fireFailedForwardAdvisory(messageDispatch, error);
            } else {
                LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
            }
            return;
        }
        LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
        LOG.debug("The local Exception was: {}", error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            @Override
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed(error);
    }
}
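The branching in this method can be reduced to a small decision model. This is a sketch, not ActiveMQ code: the class, enum, and exception names are hypothetical, and logging, the poison ack itself, the advisories, and the asynchronous dispose are omitted. It shows that only a temporary DestinationDoesNotExistException is tolerated, so the JMSException raised during destination removal falls through to the dispose branch.

```java
// Hypothetical, simplified model of the branch logic in serviceLocalException().
// Names are illustrative; the real method also logs, sends the poison ack,
// fires advisories and disposes the bridge asynchronously.
final class BridgeErrorPolicy {
    enum Action { NO_OP, POISON_ACK_AND_CONTINUE, IGNORE_AND_CONTINUE, DISPOSE_BRIDGE }

    /** Stand-in for DestinationDoesNotExistException and its isTemporary() flag. */
    static final class MissingDestinationException extends Exception {
        private final boolean temporary;

        MissingDestinationException(String message, boolean temporary) {
            super(message);
            this.temporary = temporary;
        }

        boolean isTemporary() { return temporary; }
    }

    static Action decide(boolean disposed, boolean hasMessageDispatch, Throwable error) {
        if (disposed) {
            return Action.NO_OP; // bridge already disposed: nothing to do
        }
        if (error instanceof MissingDestinationException
                && ((MissingDestinationException) error).isTemporary()) {
            // Only a vanished temporary destination keeps the bridge running.
            return hasMessageDispatch ? Action.POISON_ACK_AND_CONTINUE : Action.IGNORE_AND_CONTINUE;
        }
        // Any other local error, e.g. "Destination still has an active subscription", stops the bridge.
        return Action.DISPOSE_BRIDGE;
    }

    public static void main(String[] args) {
        Exception stillSubscribed = new Exception("Destination still has an active subscription");
        System.out.println(decide(false, false, stillSubscribed));
        System.out.println(decide(false, true, new MissingDestinationException("temp queue gone", true)));
    }
}
```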
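The error itself is thrown when the broker tries to remove the queue associated with a mapped subscription. In removeDestination() (AbstractRegion), a call with timeout == 0 refuses to remove a destination while any registered subscription still matches it: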
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
        throws Exception {
    // No timeout.. then try to shut down right way, fails if there are
    // current subscribers.
    if (timeout == 0) {
        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
            Subscription sub = iter.next();
            if (sub.matches(destination) ) {
                throw new JMSException("Destination still has an active subscription: " + destination);
            }
        }
    }
    if (timeout > 0) {
        // TODO: implement a way to notify the subscribers that we want to
        // take the down the destination and that they should un-subscribe..
        // Then wait up to timeout time before dropping the subscription.
    }
    LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);
    destinationsLock.writeLock().lock();
    try {
        Destination dest = destinations.remove(destination);
        if (dest != null) {
            // timeout<0 or we timed out, we now force any remaining
            // subscriptions to un-subscribe.
            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                Subscription sub = iter.next();
                if (sub.matches(destination)) {
                    dest.removeSubscription(context, sub, 0l);
                }
            }
            destinationMap.unsynchronizedRemove(destination, dest);
            dispose(context, dest);
            DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
            if (destinationInterceptor != null) {
                destinationInterceptor.remove(dest);
            }
        } else {
            LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
        }
    } finally {
        destinationsLock.writeLock().unlock();
    }
}
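The timeout == 0 guard can be modelled in isolation to show how a single leftover subscription blocks removal. In this hypothetical sketch a subscription is just an owner label plus an ActiveMQ-style filter string, and the matcher is a simplified stand-in for Subscription.matches() ('*' matches one segment, '>' matches whatever remains). The leftover network-bridge demand subscription on the same queue name is an assumption used to illustrate the hypothesis that a bridge consumer is still attached; nothing in the logs confirms it. The sketch throws IllegalStateException instead of javax.jms.JMSException to stay self-contained.

```java
import java.util.List;

// Hypothetical, simplified model of the timeout == 0 check in removeDestination().
// Real ActiveMQ calls Subscription.matches(ActiveMQDestination); here a subscription
// is an owner label plus a wildcard filter string, matched with a simplified rule.
final class RemoveDestinationCheck {
    record Sub(String owner, String filter) {}

    /** Simplified wildcard match: '*' matches one segment, '>' matches everything that remains. */
    static boolean matches(String filter, String destination) {
        String[] f = filter.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals(">")) {
                return true;
            }
            if (i >= d.length || (!f[i].equals("*") && !f[i].equals(d[i]))) {
                return false;
            }
        }
        return f.length == d.length;
    }

    /** Mirrors the timeout == 0 guard; the real code throws javax.jms.JMSException. */
    static void removeDestination(List<Sub> subscriptions, String destination, long timeout) {
        if (timeout == 0) {
            for (Sub sub : subscriptions) {
                if (matches(sub.filter(), destination)) {
                    throw new IllegalStateException("Destination still has an active subscription: " + destination);
                }
            }
        }
        // The real method would now remove the destination and force-unsubscribe any matches.
    }

    public static void main(String[] args) {
        String queue = "Consumer.340601550021719:AT_LEAST_ONCE.VirtualTopic.TEST.1.>";
        List<Sub> subs = List.of(
                new Sub("other MQTT client (hypothetical)", "Consumer.42:AT_LEAST_ONCE.VirtualTopic.TEST.2.>"),
                new Sub("network bridge demand sub (hypothetical)", queue));
        try {
            removeDestination(subs, queue, 0);
            System.out.println("removed");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With only the other client's subscription registered, removal would proceed; the single matching subscription is enough to make the zero-timeout call fail.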
My guess is that the removal fails because a network bridge consumer is still subscribed to the queue, but I am still investigating.