Index: core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java
===================================================================
--- core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java (revision 1521)
+++ core/src/main/java/org/infinispan/transaction/xa/DeadlockDetectingGlobalTransaction.java (revision )
@@ -9,6 +9,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Collection;
import java.util.Collections;
import java.util.Set;
@@ -32,9 +33,8 @@
private volatile boolean isMarkedForRollback;
- private transient volatile Object lockIntention;
+ private transient volatile Collection lockIntention;
-
+
-
public DeadlockDetectingGlobalTransaction() {
}
@@ -158,15 +158,13 @@
/**
* Returns the key this transaction intends to lock.
*/
- public Object getLockIntention() {
+ public Collection getLockIntention() {
return lockIntention;
}
- /**
- * Sets the lock this transaction intends to lock.
- */
- public void setLockIntention(Object lockIntention) {
- this.lockIntention = lockIntention;
+ public void setLockIntention(Collection lockIntentionSet) {
+ if (log.isTraceEnabled()) log.trace("Setting lock intention to: " + lockIntentionSet);
+ this.lockIntention = lockIntentionSet;
}
public static class Externalizer extends GlobalTransaction.Externalizer {
Index: core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java
===================================================================
--- core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java (revision )
+++ core/src/test/java/org/infinispan/tx/EagerTxDeadlockDetectionTest.java (revision )
@@ -0,0 +1,138 @@
+package org.infinispan.tx;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.PerCacheExecutorThread;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.DeadlockDetectingLockManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.Status;
+
+/**
+ * @author Mircea.Markus@jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "tx.EagerTxDeadlockDetectionTest")
+public class EagerTxDeadlockDetectionTest extends MultipleCacheManagersTest {
+ private PerCacheExecutorThread ex1;
+ private PerCacheExecutorThread ex2;
+ private DeadlockDetectingLockManager lm0;
+ private DeadlockDetectingLockManager lm1;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration configuration = getConfiguration();
+ createClusteredCaches(2, configuration);
+ ex1 = new PerCacheExecutorThread(cache(0), 0);
+ ex2 = new PerCacheExecutorThread(cache(1), 1);
+ lm0 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(0));
+ lm0.setExposeJmxStats(true);
+ lm1 = (DeadlockDetectingLockManager) TestingUtil.extractLockManager(cache(1));
+ lm1.setExposeJmxStats(true);
+ }
+
+ protected Configuration getConfiguration() throws Exception {
+ Configuration configuration = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+ configuration.setUseEagerLocking(true);
+ configuration.setEnableDeadlockDetection(true);
+ configuration.setUseLockStriping(false);
+ return configuration;
+ }
+
+ public void testDeadlock() throws Exception {
+ ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+
+ ex1.setKeyValue("k1", "v1_1");
+ ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex2.setKeyValue("k2", "v2_2");
+ ex2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ assert lm0.isLocked("k1");
+ assert lm0.isLocked("k2");
+ assert lm1.isLocked("k1");
+ assert lm1.isLocked("k2");
+
+ log.trace("After first set of puts");
+
+ ex1.clearResponse();
+ ex2.clearResponse();
+ ex2.setKeyValue("k1", "v1_2");
+ ex2.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.setKeyValue("k2", "v2_1");
+ ex1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.waitForResponse();
+ ex2.waitForResponse();
+
+
+ boolean b1 = ex1.lastResponse() instanceof Exception;
+ boolean b2 = ex2.lastResponse() instanceof Exception;
+ assert xor(b1, b2) : "Both are " + (b1 || b2);
+
+ assert xor(ex1.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK,
+ ex2.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK);
+
+
+ ex1.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+
+
+ assert cache(0).get("k1") != null;
+ assert cache(0).get("k2") != null;
+ assert cache(1).get("k1") != null;
+ assert cache(1).get("k2") != null;
+
+ long totalDeadlocks = lm0.getLocallyInterruptedTransactions() + lm1.getLocallyInterruptedTransactions();
+ assert totalDeadlocks == 1 : "Expected 1 but received " + totalDeadlocks;
+ }
+
+ public void testTransactionNotAffectedByTimeout() throws Exception {
+ ex1.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.BEGGIN_TX);
+
+ ex1.setKeyValue("k1", "v1");
+ ex1.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+
+ assert lm0.isLocked("k1");
+ assert lm1.isLocked("k2");
+
+ ex2.setKeyValue("k1", "v2");
+ ex2.execute(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+
+ assert ex2.lastResponse() instanceof Exception;
+
+ log.trace("After first set of puts");
+
+ ex1.clearResponse();
+ ex2.clearResponse();
+ ex2.setKeyValue("k1", "v1_2");
+ ex2.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.setKeyValue("k2", "v2_1");
+ ex1.executeNoResponse(PerCacheExecutorThread.Operations.PUT_KEY_VALUE);
+ ex1.waitForResponse();
+ ex2.waitForResponse();
+
+
+ boolean b1 = ex1.lastResponse() instanceof Exception;
+ boolean b2 = ex2.lastResponse() instanceof Exception;
+ assert xor(b1, b2) : "Both are " + (b1 || b2);
+
+ assert xor(ex1.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK,
+ ex2.getOngoingTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK);
+
+
+ ex1.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+ ex2.execute(PerCacheExecutorThread.Operations.COMMIT_TX);
+
+
+ assert cache(0).get("k1") != null;
+ assert cache(0).get("k2") != null;
+ assert cache(1).get("k1") != null;
+ assert cache(1).get("k2") != null;
+
+ long totalDeadlocks = lm0.getLocallyInterruptedTransactions() + lm1.getLocallyInterruptedTransactions();
+ assert totalDeadlocks == 1 : "Expected 1 but received " + totalDeadlocks;
+
+ }
+}
Index: core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java
===================================================================
--- core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java (revision 1521)
+++ core/src/main/java/org/infinispan/interceptors/DeadlockDetectingInterceptor.java (revision )
@@ -1,6 +1,8 @@
package org.infinispan.interceptors;
import org.infinispan.commands.DataCommand;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
@@ -18,8 +20,11 @@
import org.infinispan.util.concurrent.locks.DeadlockDetectedException;
import org.infinispan.util.concurrent.locks.LockManager;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
/**
@@ -60,59 +65,83 @@
}
}
- private Object handleDataCommand(InvocationContext ctx, DataCommand command) throws Throwable {
+ private Object handleDataCommand(InvocationContext ctx, VisitableCommand command, Collection lockIntention) throws Throwable {
if (ctx.isInTxScope()) {
DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- gtx.setLockIntention(command.getKey());
+ gtx.setLockIntention(lockIntention);
gtx.setProcessingThread(Thread.currentThread());
}
try {
return invokeNextInterceptor(ctx, command);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {//this means that the tx was interrupted on this node
+ releaseLocks(ctx);
+ throw ie;
+ } catch (DeadlockDetectedException dde) { //this mean that I gave myself a shutdown request
+ releaseLocks(ctx);
+ throw dde;
+ } catch (Exception ex) {//now look for a remote interruption
+ Throwable throwable = ex;
+ while (throwable.getCause() != null) {
+ if (throwable.getCause() instanceof InterruptedException) {
+ releaseLocks(ctx);
+ }
+ throwable = throwable.getCause();
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Skipping exception:" + ex);
+ }
+ throw ex;
+ }
+ }
+
+ private void releaseLocks(InvocationContext ctx) throws SystemException, InterruptedException {
- if (ctx.isInTxScope()) {
- lockManager.releaseLocks(ctx);
- if (ctx.isOriginLocal()) {
- Transaction transaction = txManager.getTransaction();
- if (trace)
- log.trace("Marking the transaction for rollback! : " + transaction);
- if (transaction == null) {
- throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
+ if (ctx.isInTxScope()) {
+ lockManager.releaseLocks(ctx);
+ if (ctx.isOriginLocal()) {
+ Transaction transaction = txManager.getTransaction();
+ if (trace)
+ log.trace("Marking the transaction for rollback! : " + transaction);
+ if (transaction == null) {
+ throw new IllegalStateException("We're running in a local transaction, there MUST be one " +
- "associated witht the local thread but none found! (null)");
+ "associated with the local thread but none found! (null)");
- }
- transaction.setRollbackOnly();
- txTable.removeLocalTransaction(transaction);
- throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction +
- "; it was marked for rollback");
- } else {
- DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- gtx.setMarkedForRollback(true);
- throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx +
- "; it was marked for rollback");
- }
- } else {
- if (trace)
+ }
+ transaction.setRollbackOnly();
+ txTable.removeLocalTransaction(transaction);
+ throw new DeadlockDetectedException("Deadlock request was detected for locally originated tx " + transaction +
+ "; it was marked for rollback");
+ } else {
+ DeadlockDetectingGlobalTransaction gtx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
+ gtx.setMarkedForRollback(true);
+ throw new DeadlockDetectedException("Deadlock request was detected for remotely originated tx " + gtx +
+ "; it was marked for rollback");
+ }
+ } else {
+ if (trace)
- log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack", ie);
- throw ie;
+ log.trace("Received an interrupt request, but we're not running within the scope of a transaction, so passing it up the stack");
- }
- }
+ }
+ }
- }
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
- return handleDataCommand(ctx, command);
+ return handleDataCommand(ctx, command, Collections.singletonList(command.getKey()));
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
- return handleDataCommand(ctx, command);
+ return handleDataCommand(ctx, command, Collections.singletonList(command.getKey()));
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
- return handleDataCommand(ctx, command);
+ return handleDataCommand(ctx, command, Collections.singletonList(command.getKey()));
}
@Override
+ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
+ return handleDataCommand(ctx, command, command.getKeys());
+ }
+
+ @Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
DeadlockDetectingGlobalTransaction globalTransaction = (DeadlockDetectingGlobalTransaction) ctx.getGlobalTransaction();
globalTransaction.setProcessingThread(Thread.currentThread());
Index: core/src/test/resources/log4j.xml
===================================================================
--- core/src/test/resources/log4j.xml (revision 1944)
+++ core/src/test/resources/log4j.xml (revision )
@@ -45,7 +45,7 @@
-
+
@@ -65,7 +65,7 @@
-
+
Index: core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java
===================================================================
--- core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java (revision 1536)
+++ core/src/main/java/org/infinispan/util/concurrent/locks/DeadlockDetectingLockManager.java (revision )
@@ -91,9 +91,9 @@
private void localVsLocalDld(InvocationContext ctx, DeadlockDetectingGlobalTransaction lockOwnerTx) {
if (trace) log.trace("Looking for local vs local deadlocks");
DeadlockDetectingGlobalTransaction thisThreadsTx = (DeadlockDetectingGlobalTransaction) ctx.getLockOwner();
- boolean weOwnLock = ownsLock(lockOwnerTx.getLockIntention(), thisThreadsTx);
+ boolean weOwnLock = ownsLock_(lockOwnerTx, thisThreadsTx);
if (trace) {
- log.trace("Other owner's intention is " + lockOwnerTx.getLockIntention() + ". Do we(" + thisThreadsTx + ") own lock for it? " + weOwnLock + ". Lock owner is " + getOwner(lockOwnerTx.getLockIntention()));
+ log.trace("Other owner's intention is " + lockOwnerTx.getLockIntention() + ". Do we(" + thisThreadsTx + ") own lock for it? " + weOwnLock);
}
if (weOwnLock) {
boolean iShouldInterrupt = thisThreadsTx.thisWillInterrupt(lockOwnerTx);
@@ -106,6 +106,19 @@
}
}
+ private boolean ownsLock_(DeadlockDetectingGlobalTransaction lockOwnerTx, DeadlockDetectingGlobalTransaction thisThreadsTx) {
+ for (Object o : lockOwnerTx.getLockIntention()) {
+ boolean ownsLock = ownsLock(o, thisThreadsTx);
+ if (log.isTraceEnabled()) {
+ log.trace("Do we[ " + thisThreadsTx + " ] own lock on '" + o + "'? " + ownsLock + (ownsLock ? "" : " Owner is: " + getOwner(o)));
+ }
+ if (ownsLock) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private boolean remoteVsRemoteDld(Object key, InvocationContext ctx, long lockTimeout, long start, long now, DeadlockDetectingGlobalTransaction lockOwnerTx) throws InterruptedException {
TxInvocationContext remoteTxContext = (TxInvocationContext) ctx;
Address origin = remoteTxContext.getGlobalTransaction().getAddress();
Index: core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java
===================================================================
--- core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java (revision 604)
+++ core/src/main/java/org/infinispan/interceptors/ImplicitEagerLockingInterceptor.java (revision )
@@ -35,8 +35,7 @@
}
@Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command)
- throws Throwable {
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
boolean localTxScope = ctx.isInTxScope() & ctx.isOriginLocal();
if (localTxScope) {
lockEagerly(ctx, Collections.singleton(command.getKey()));
Index: core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java (revision 1848)
+++ core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java (revision )
@@ -133,22 +133,18 @@
@Override
public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand c) throws Throwable {
boolean localTxScope = ctx.isOriginLocal() && ctx.isInTxScope();
- boolean shouldInvokeOnCluster = false;
try {
if (localTxScope) {
c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner());
}
+ Object result = null;
for (Object key : c.getKeys()) {
- if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) {
- //if even one key is unlocked we need to invoke this lock command cluster wide...
- shouldInvokeOnCluster = true;
+ if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner()) || c.isExplicit()) {
+ result = invokeNextInterceptor(ctx, c);
}
entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
}
- if (shouldInvokeOnCluster || c.isExplicit())
- return invokeNextInterceptor(ctx, c);
- else
- return null;
+ return result;
} finally {
if (ctx.isInTxScope()) {
doAfterCall(ctx);
Index: core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java
===================================================================
--- core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java (revision 612)
+++ core/src/test/java/org/infinispan/test/PerCacheExecutorThread.java (revision )
@@ -4,6 +4,7 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -24,6 +25,7 @@
private BlockingQueue