Uploaded image for project: 'Infinispan'
  1. Infinispan
  2. ISPN-1541

NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 5.1.0.CR1
    • 5.0.1.FINAL, 5.1.0.BETA4
    • Core
    • None

      Invoking Future#get from a FutureListener attached to a NotifyingNotifiableFuture passed as a parameter to RpcManager#invokeRemotelyInFuture throws InterruptedException! Sounds more complicated than it really is! In another words, the following code throws InterruptedException

      NotifyingFutureImpl f = ...
            f.attachListener(new FutureListener<Object>() {
               
               @Override
               public void futureDone(Future<Object> future) {
                  try {      
                     future.get();
                  } catch (Exception e) {   
                     e.printStackTrace();
                  }             
               }
            });
            CommandsFactory cf = cache1.getAdvancedCache().getComponentRegistry().getComponent(CommandsFactory.class);
            cache1.getAdvancedCache().getRpcManager().invokeRemotelyInFuture(null, cf.buildPutKeyValueCommand("k","v", -1, -1, null), f);
      

      the reason why we get InterruptedException is that we invoke notifyDone which in turn invokes futureDone from the same thread invoking the future's callable. In effect we invoke future#get on the same the same thread that is running the callable for the associated future!

       public final void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l, final long timeout) {
            if (trace) log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc, recipients);
            final CountDownLatch futureSet = new CountDownLatch(1);
            Callable<Object> c = new Callable<Object>() {
               public Object call() throws Exception {
                  Object result = null;
                  try {
                     result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
                  } finally {
                     try {
                        futureSet.await();
                     } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                     } finally {
                        l.notifyDone();
                     }
                  }
                  return result;
               }
            };
            l.setNetworkFuture(asyncExecutor.submit(c));
            futureSet.countDown();      
         }
      

      The solution is to wait for callable to finish and invoke notifyDone from the other thread. We have two choices here:

      a) invoke notifyDone from the same thread invoking invokeRemotelyInFuture. This in turn changes invokeRemotelyInFuture to be less async than it should be

      b) invoke callback on another thread

      public final void invokeRemotelyInFuture(final Collection<Address> recipients,
                  final ReplicableCommand rpc, final boolean usePriorityQueue,
                  final NotifyingNotifiableFuture<Object> l, final long timeout) {
            if (trace)
               log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc,
                        recipients);
            final CountDownLatch callableCompleted = new CountDownLatch(1);
            Callable<Object> c = new Callable<Object>() {
               public Object call() throws Exception {
                  Object result = null;
                  try {
                     result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
                  } finally {
                     callableCompleted.countDown();
                  }
                  return result;
               }
            };
            l.setNetworkFuture(asyncExecutor.submit(c));
            asyncExecutor.submit(new Runnable() {
               
               @Override
               public void run() {
                  try {
                     callableCompleted.await();
                  } catch (Exception e) {
                  } finally {
                     l.notifyDone();
                  }       
               }
            });     
         }
      

      We can of course, leave things as they are right now but in such case we have to warn the implementer of FutureListener that he/she can not call future.get() from a FutureListener!

              vblagoje Vladimir Blagojevic (Inactive)
              vblagoje Vladimir Blagojevic (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

                Created:
                Updated:
                Resolved: