In org.jgroups.protocols.Executing, there is a race condition between the handling of ExecutorEvent.TASK_SUBMIT in down() and handleConsumerFoundResponse().
This code in handleConsumerFoundResponse() can throw an NPE:
    final Long requestId = _requestId.get(runnable);
    owner = new Owner(address, requestId);
The problem occurs when the _requestId map has no entry for the given runnable: requestId is then null, and an NPE is thrown when it is unboxed to be passed to the Owner constructor.
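A minimal, self-contained illustration of the unboxing failure follows. It assumes a hypothetical Owner stand-in whose constructor takes a primitive long (as the NPE implies for the real class) and uses a String in place of the JGroups Address type; UnboxingNpeDemo is likewise a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Executing.Owner: the constructor takes a
// primitive long, so passing a null Long triggers auto-unboxing of null.
final class Owner {
    final String address;   // assumption: String instead of org.jgroups.Address
    final long requestId;

    Owner(String address, long requestId) {
        this.address = address;
        this.requestId = requestId;
    }
}

final class UnboxingNpeDemo {
    // Returns true if building an Owner from a missing map entry throws an NPE.
    static boolean ownerFromMissingEntryThrows() {
        Map<Runnable, Long> requestIds = new HashMap<>();
        Runnable runnable = () -> { };
        final Long requestId = requestIds.get(runnable); // null: no entry was ever put
        try {
            new Owner("node-A", requestId); // unboxing null -> NullPointerException
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(ownerFromMissingEntryThrows()); // prints "true"
    }
}
```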
We have seen this exception just twice in a test system where many hundreds of millions of items have been processed, so it's pretty rare.
Looking at the code, the most likely scenario under which this can happen is:
- A node has an empty queue of runnables
- It receives a CONSUMER_FOUND message (discussed below)
- Just as it does this, a new runnable is enqueued in down(), and before the corresponding requestId has been added to the map...
- handleConsumerFoundResponse() takes the runnable from the queue but finds no requestId for it in the map
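The interleaving listed above can be replayed deterministically on a single thread. This is a simplified, hypothetical model: RaceReplay and its fields are illustrative names standing in for _awaitingConsumer and _requestId, and it assumes the handler removes the runnable with a poll()-style call.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the two structures touched by TASK_SUBMIT.
final class RaceReplay {
    final Queue<Runnable> awaitingConsumer = new ConcurrentLinkedQueue<>();
    final Map<Runnable, Long> requestIds = new ConcurrentHashMap<>();
    final AtomicLong counter = new AtomicLong();

    // Replays the problematic ordering step by step and returns the
    // requestId that the consumer-found handler would have observed.
    Long replay() {
        Runnable task = () -> { };
        // Submitter (down(), TASK_SUBMIT): the runnable is queued first...
        awaitingConsumer.add(task);
        // ...then the consumer-found handler runs before the id is stored.
        Runnable polled = awaitingConsumer.poll();
        Long seen = requestIds.get(polled); // null here; unboxing it later throws
        // Submitter resumes and stores the id, but too late.
        requestIds.put(task, counter.getAndIncrement());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(new RaceReplay().replay()); // prints "null"
    }
}
```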
It seems like it should not be possible for a node to receive a CONSUMER_FOUND when it has no runnables (since it hasn't asked for any), but the code does handle this case:
handleConsumerFoundResponse() says:
    if (runnable == null) {
        owner = new Owner(address, threadId);
        // For some reason we don't have a runnable anymore
        // so we have to send back to the coordinator that
        // the consumer is still available. The runnable
        // would be removed on a cancel
        sendToCoordinator(Type.CONSUMER_READY, owner.getRequestId(), owner.getAddress());
    }
We do not believe any tasks were cancelled in the cases where we have seen the NPE, so perhaps there is some other way it can happen.
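In any case, the solution should be quite simple: reorder the two insertions so that the requestId is put into the _requestId map before the runnable is added to the _awaitingConsumer queue. The current TASK_SUBMIT handling in down() is: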
    case ExecutorEvent.TASK_SUBMIT:
        Runnable runnable = (Runnable)evt.getArg();
        _awaitingConsumer.add(runnable);
        // We are limited to a number of concurrent request id's
        // equal to 2^63-1. This is quite large and if it
        // overflows it will still be positive
        long requestId = Math.abs(counter.getAndIncrement());
        if(requestId == Long.MIN_VALUE) {
            counter.set(0);
            requestId = Math.abs(counter.getAndIncrement());
        }
        _requestId.put(runnable, requestId);
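The proposed reordering can be sketched in a simplified model. ReorderedSubmit, submit() and takeNext() are hypothetical names, and the sketch assumes both collections are thread-safe concurrent collections; it does not model cancellation or any other path that removes entries from the map.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of TASK_SUBMIT with the two insertions swapped.
final class ReorderedSubmit {
    final Queue<Runnable> awaitingConsumer = new ConcurrentLinkedQueue<>();
    final Map<Runnable, Long> requestIds = new ConcurrentHashMap<>();
    final AtomicLong counter = new AtomicLong();

    // Same id allocation as the original branch, but the id is published in
    // the map BEFORE the runnable becomes visible in the queue.
    long submit(Runnable runnable) {
        long requestId = Math.abs(counter.getAndIncrement());
        if (requestId == Long.MIN_VALUE) {
            counter.set(0);
            requestId = Math.abs(counter.getAndIncrement());
        }
        requestIds.put(runnable, requestId); // 1. publish the id
        awaitingConsumer.add(runnable);      // 2. only then make the task pollable
        return requestId;
    }

    // Simplified consumer side: returns the id of the next queued runnable,
    // or null if the queue is empty.
    Long takeNext() {
        Runnable runnable = awaitingConsumer.poll();
        return runnable == null ? null : requestIds.get(runnable);
    }
}
```

With concurrent collections, the put() happens-before the add() in the submitting thread, and the add() happens-before a poll() that retrieves the runnable, so any runnable the consumer can take already has its id in the map.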
We're seeing the problem in 3.2.7, but the code appears to be the same in the master branch.