Index: core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java =================================================================== --- core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java (revision 1521) +++ core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java (revision ) @@ -9,6 +9,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Collection; import java.util.Collections; import java.util.Set; @@ -32,9 +33,8 @@ private volatile boolean isMarkedForRollback; - private transient volatile Object lockIntention; + private transient volatile Collection lockIntention; - + - public DeadlockDetectingGlobalTransaction() { } @@ -158,15 +158,13 @@ /** * Returns the key this transaction intends to lock. */ - public Object getLockIntention() { + public Collection getLockIntention() { return lockIntention; } - /** - * Sets the lock this transaction intends to lock. - */ - public void setLockIntention(Object lockIntention) { - this.lockIntention = lockIntention; + public void setLockIntention(Collection lockIntentionSet) { + if (log.isTraceEnabled()) log.trace("Setting lock intention to: " + lockIntentionSet); + this.lockIntention = lockIntentionSet; } public static class Externalizer extends GlobalTransaction.Externalizer { Index: core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java =================================================================== --- core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java (revision ) +++ core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java (revision ) @@ -0,0 +1,138 @@ +package org.infinispan.tx; + +import org.infinispan.config.Configuration; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.PerCacheExecutorThread; +import org.infinispan.test.TestingUtil; +import org.infinispan.util.concurrent.TimeoutException; +import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager; +import org.testng.annotations.Test; + +import javax.transaction.Status; + +/** + * @author Mircea.Markus@jboss.com + * @since 4.1 + */ +@Test(groups = "functional", testName = "tx.EagerTxDeadlockDetectionTest") +public class EagerTxDeadlockDetectionTest extends MultipleCacheManagersTest { + private PerCacheExecutorThread ex1; + private PerCacheExecutorThread ex2; + private DeadlockDetectingLockManager lm0; + private DeadlockDetectingLockManager lm1; + + @Override + protected void createCacheManagers() throws Throwable { + Configuration configuration = getConfiguration(); + createClusteredCaches(2, configuration); + ex1 = new PerCacheExecutorThread(cache(0), 0); + ex2 = new PerCacheExecutorThread(cache(1), 1); + lm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(0)); + lm0.setExposeJmxStats(true); + lm1 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(1)); + lm1.setExposeJmxStats(true); + } + + protected Configuration getConfiguration() throws Exception { + Configuration configuration = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true); + configuration.setUseEagerLocking(true); + configuration.setEnableDeadlockDetection(true); + configuration.setUseLockStriping(false); + return configuration; + } + + public void testDeadlock() throws Exception { + ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX); + ex2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX); + + ex1.setKeyValue("k1", "v1_1"); + ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + ex2.setKeyValue("k2", "v2_2"); + ex2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + assert lm0.isLocked("k1"); + assert lm0.isLocked("k2"); + assert lm1.isLocked("k1"); + assert lm1.isLocked("k2"); + + log.trace("After first set of puts"); + + ex1.clearResponse(); + ex2.clearResponse(); + ex2.setKeyValue("k1", "v1_2"); + ex2.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + ex1.setKeyValue("k2", "v2_1"); + ex1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + ex1.waitForResponse(); + ex2.waitForResponse(); + + + boolean b1 = ex1.lastResponse() instanceof Exception; + boolean b2 = ex2.lastResponse() instanceof Exception; + assert xor(b1, b2) : "Both are " + (b1 || b2); + + assert xor(ex1.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK, + ex2.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK); + + + ex1.execute(PerCacheExecutorThread.Operations.COMMIT_TX); + ex2.execute(PerCacheExecutorThread.Operations.COMMIT_TX); + + + assert cache(0).get("k1") != null; + assert cache(0).get("k2") != null; + assert cache(1).get("k1") != null; + assert cache(1).get("k2") != null; + + long totalDeadlocks = lm0.getLocallyInterruptedTransactions() + lm1.getLocallyInterruptedTransactions(); + assert totalDeadlocks == 1 : "Expected 1 but received " + totalDeadlocks; + } + + public void testTransactionNotAffectedByTimeout() throws Exception { + ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX); + ex2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX); + + ex1.setKeyValue("k1", "v1"); + ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + + assert lm0.isLocked("k1"); + assert lm1.isLocked("k2"); + + ex2.setKeyValue("k1", "v2"); + ex2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + + assert ex2.lastResponse() instanceof Exception; + + log.trace("After first set of puts"); + + ex1.clearResponse(); + ex2.clearResponse(); + ex2.setKeyValue("k1", "v1_2"); + ex2.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + ex1.setKeyValue("k2", "v2_1"); + ex1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE); + ex1.waitForResponse(); + ex2.waitForResponse(); + + + boolean b1 = ex1.lastResponse() instanceof Exception; + boolean b2 = ex2.lastResponse() instanceof Exception; + assert xor(b1, b2) : "Both are " + (b1 || b2); + + assert xor(ex1.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK, + ex2.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK); + + + ex1.execute(PerCacheExecutorThread.Operations.COMMIT_TX); + ex2.execute(PerCacheExecutorThread.Operations.COMMIT_TX); + + + assert cache(0).get("k1") != null; + assert cache(0).get("k2") != null; + assert cache(1).get("k1") != null; + assert cache(1).get("k2") != null; + + long totalDeadlocks = lm0.getLocallyInterruptedTransactions() + lm1.getLocallyInterruptedTransactions(); + assert totalDeadlocks == 1 : "Expected 1 but received " + totalDeadlocks; + + } +} Index: core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java =================================================================== --- core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java (revision 1521) +++ core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java (revision ) @@ -1,6 +1,8 @@ package org.infinispan.interceptors; import org.infinispan.commands.DataCommand; +import org.infinispan.commands.VisitableCommand; +import org.infinispan.commands.control.LockControlCommand; import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.RollbackCommand; import org.infinispan.commands.write.PutKeyValueCommand; @@ -18,8 +20,11 @@ import org.infinispan.util.concurrent.locks.DeadlockDetectedException; import org.infinispan.util.concurrent.locks.LockManager; +import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; /** @@ -60,59 +65,83 @@ } } - private Object handleDataCommand(InvocationContext ctx, DataCommand command) throws Throwable { + private Object handleDataCommand(InvocationContext ctx, VisitableCommand command, Collection lockIntention) throws Throwable { if (ctx.isInTxScope()) { DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner(); - gtx.setLockIntention(command.getKey()); + gtx.setLockIntention(lockIntention); gtx.setProcessingThread(Thread.currentThread()); } try { return invokeNextInterceptor(ctx, command); - } catch (InterruptedException ie) { + } catch (InterruptedException ie) {//this means that the tx was interrupted on this node + releaseLocks(ctx); + throw ie; + } catch (DeadlockDetectedException dde) { //this mean that I gave myself a shutdown request + releaseLocks(ctx); + throw dde; + } catch (Exception ex) {//now look for a remote interruption + Throwable throwable = ex; + while (throwable.getCause() != null) { + if (throwable.getCause() instanceof InterruptedException) { + releaseLocks(ctx); + } + throwable = throwable.getCause(); + } + if (log.isTraceEnabled()) { + log.trace("Skipping exception:" + ex); + } + throw ex; + } + } + + private void releaseLocks(InvocationContext ctx) throws SystemException, InterruptedException { - if (ctx.isInTxScope()) { - lockManager.releaseLocks(ctx); - if (ctx.isOriginLocal()) { - Transaction transaction = txManager.getTransaction(); - if (trace) - log.trace("Marking the transaction for rollback! : " + transaction); - if (transaction == null) { - throw new IllegalStateException("We're running in a local transaction, there MUST be one " + + if (ctx.isInTxScope()) { + lockManager.releaseLocks(ctx); + if (ctx.isOriginLocal()) { + Transaction transaction = txManager.getTransaction(); + if (trace) + log.trace("Marking the transaction for rollback! : " + transaction); + if (transaction == null) { + throw new IllegalStateException("We're running in a local transaction, there MUST be one " + - "associated witht the local thread but none found! (null)"); + "associated with the local thread but none found! (null)"); - } - transaction.setRollbackOnly(); - txTable.removeLocalTransaction(transaction); - throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction + - "; it was marked for rollback"); - } else { - DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner(); - gtx.setMarkedForRollback(true); - throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx + - "; it was marked for rollback"); - } - } else { - if (trace) + } + transaction.setRollbackOnly(); + txTable.removeLocalTransaction(transaction); + throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction + + "; it was marked for rollback"); + } else { + DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner(); + gtx.setMarkedForRollback(true); + throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx + + "; it was marked for rollback"); + } + } else { + if (trace) - log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack", ie); - throw ie; + log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack"); - } - } + } + } - } @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { - return handleDataCommand(ctx, command); + return handleDataCommand(ctx, command, Collections.singletonList(command.getKey())); } @Override public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { - return handleDataCommand(ctx, command); + return handleDataCommand(ctx, command, Collections.singletonList(command.getKey())); } @Override public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { - return handleDataCommand(ctx, command); + return handleDataCommand(ctx, command, Collections.singletonList(command.getKey())); } @Override + public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable { + return handleDataCommand(ctx, command, command.getKeys()); + } + + @Override public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { DeadlockDetectingGlobalTransaction globalTransaction = (DeadlockDetectingGlobalTransaction) ctx.getGlobalTransaction(); globalTransaction.setProcessingThread(Thread.currentThread()); Index: core/src/test/resources/log4j.xml =================================================================== --- core/src/test/resources/log4j.xml (revision 1944) +++ core/src/test/resources/log4j.xml (revision ) @@ -45,7 +45,7 @@ - + @@ -65,7 +65,7 @@ - + Index: core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java =================================================================== --- core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java (revision 1536) +++ core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java (revision ) @@ -91,9 +91,9 @@ private void localVsLocalDld(InvocationContext ctx, DeadlockDetectingGlobalTransaction lockOwnerTx) { if (trace) log.trace("Looking for local vs local deadlocks"); DeadlockDetectingGlobalTransaction thisThreadsTx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner(); - boolean weOwnLock = ownsLock(lockOwnerTx.getLockIntention(), thisThreadsTx); + boolean weOwnLock = ownsLock_(lockOwnerTx, thisThreadsTx); if (trace) { - log.trace("Other owner's intention is " + lockOwnerTx.getLockIntention() + ". Do we(" + thisThreadsTx + ") own lock for it? " + weOwnLock + ". Lock owner is " + getOwner(lockOwnerTx.getLockIntention())); + log.trace("Other owner's intention is " + lockOwnerTx.getLockIntention() + ". Do we(" + thisThreadsTx + ") own lock for it? " + weOwnLock); } if (weOwnLock) { boolean iShouldInterrupt = thisThreadsTx.thisWillInterrupt(lockOwnerTx); @@ -106,6 +106,19 @@ } } + private boolean ownsLock_(DeadlockDetectingGlobalTransaction lockOwnerTx, DeadlockDetectingGlobalTransaction thisThreadsTx) { + for (Object o : lockOwnerTx.getLockIntention()) { + boolean ownsLock = ownsLock(o, thisThreadsTx); + if (log.isTraceEnabled()) { + log.trace("Do we[ " + thisThreadsTx + " ] own lock on '" + o + "'? " + ownsLock + (ownsLock ? "" : " Owner is: " + getOwner(o))); + } + if (ownsLock) { + return true; + } + } + return false; + } + private boolean remoteVsRemoteDld(Object key, InvocationContext ctx, long lockTimeout, long start, long now, DeadlockDetectingGlobalTransaction lockOwnerTx) throws InterruptedException { TxInvocationContext remoteTxContext = (TxInvocationContext) ctx; Address origin = remoteTxContext.getGlobalTransaction().getAddress(); Index: core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java =================================================================== --- core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java (revision 604) +++ core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java (revision ) @@ -35,8 +35,7 @@ } @Override - public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) - throws Throwable { + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal(); if (localTxScope) { lockEagerly(ctx, Collections.singleton(command.getKey())); Index: core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java =================================================================== --- core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java (revision 1848) +++ core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java (revision ) @@ -133,22 +133,18 @@ @Override public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand c) throws Throwable { boolean localTxScope = ctx.isOriginLocal() && ctx.isInTxScope(); - boolean shouldInvokeOnCluster = false; try { if (localTxScope) { c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner()); } + Object result = null; for (Object key : c.getKeys()) { - if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) { - //if even one key is unlocked we need to invoke this lock command cluster wide... - shouldInvokeOnCluster = true; + if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner()) || c.isExplicit()) { + result = invokeNextInterceptor(ctx, c); } entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false); } - if (shouldInvokeOnCluster || c.isExplicit()) - return invokeNextInterceptor(ctx, c); - else - return null; + return result; } finally { if (ctx.isInTxScope()) { doAfterCall(ctx); Index: core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java =================================================================== --- core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java (revision 612) +++ core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java (revision ) @@ -4,6 +4,7 @@ import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; +import javax.transaction.Transaction; import javax.transaction.TransactionManager; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -24,6 +25,7 @@ private BlockingQueue toExecute = new ArrayBlockingQueue(1); private volatile Object response; private CountDownLatch responseLatch = new CountDownLatch(1); + private volatile Transaction ongoingTransaction; private volatile Object key, value; @@ -75,6 +77,7 @@ TransactionManager txManager = TestingUtil.getTransactionManager(cache); try { txManager.begin(); + ongoingTransaction = txManager.getTransaction(); setResponse(OperationsResult.BEGGIN_TX_OK); } catch (Exception e) { log.trace("Failure on beggining tx", e); @@ -86,6 +89,7 @@ TransactionManager txManager = TestingUtil.getTransactionManager(cache); try { txManager.commit(); + ongoingTransaction = null; setResponse(OperationsResult.COMMIT_TX_OK); } catch (Exception e) { log.trace("Exception while committing tx", e); @@ -210,6 +214,9 @@ */ public static enum OperationsResult { BEGGIN_TX_OK, COMMIT_TX_OK, PUT_KEY_VALUE_OK, REMOVE_KEY_OK, REPLACE_KEY_VALUE_OK, STOP_THREAD_OK + } + public Transaction getOngoingTransaction() { + return ongoingTransaction; } } Index: cachestore/remote/src/main/java/org/infinispan/loaders/remote/InternalCacheEntryMarshaller.java =================================================================== --- cachestore/remote/src/main/java/org/infinispan/loaders/remote/InternalCacheEntryMarshaller.java (revision 1850) +++ cachestore/remote/src/main/java/org/infinispan/loaders/remote/InternalCacheEntryMarshaller.java (revision ) @@ -5,6 +5,7 @@ import org.infinispan.marshall.Marshaller; import java.io.IOException; +import java.util.Properties; /** * Marshaller used internally by the remote cache store. @@ -21,7 +22,12 @@ } @Override - public byte[] marshallObject(Object toMarshall) { + public void init(Properties config) { + //ignore + } + + @Override + public byte[] marshallObject(Object toMarshall, boolean isKeyHint) { try { return marshaller.objectToByteBuffer(toMarshall); } catch (IOException e) {