package dev.cache.infinispan; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.transaction.TransactionManager; import junit.framework.TestCase; import junit.framework.TestSuite; import junit.textui.TestRunner; import org.infinispan.Cache; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.eviction.EvictionStrategy; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.transaction.LockingMode; import org.infinispan.transaction.TransactionMode; import org.infinispan.util.concurrent.locks.LockManager; /** * Tests for Infinispan locks that use provider's features only. * * @author dudalov */ public class LockTest extends TestCase { private static String CLASSNAME = LockTest.class.getCanonicalName(); private DefaultCacheManager _manager; public LockTest() { } protected void setUp() throws Exception { super.setUp(); _manager = new DefaultCacheManager(); /* _manager = new DefaultCacheManager( GlobalConfigurationBuilder.defaultClusteredBuilder() .globalJmxStatistics().enable() .transport().addProperty("configurationFile", "jgroups-udp.xml") .build(), new ConfigurationBuilder() .clustering().cacheMode(CacheMode.REPL_SYNC) .build() ); */ } protected void tearDown() throws Exception { _manager.stop(); super.tearDown(); } private Cache getCache(String cacheName, int capacity) { EvictionStrategy evictionStrategy = EvictionStrategy.LRU; /* org.infinispan.CacheException: Explicit locking is not allowed with optimistic caches! at org.infinispan.interceptors.locking.OptimisticLockingInterceptor.visitLockControlCommand(OptimisticLockingInterceptor.java:166) at org.infinispan.commands.control.LockControlCommand.acceptVisitor(LockControlCommand.java:129) */ Configuration config = new ConfigurationBuilder().eviction() .maxEntries(capacity).strategy(evictionStrategy) //.expiration().wakeUpInterval(5000L).maxIdle(120000L) //.clustering().cacheMode(CacheMode.REPL_SYNC) .locking().lockAcquisitionTimeout(10000L).useLockStriping(false) .transaction().transactionMode(TransactionMode.TRANSACTIONAL).lockingMode(LockingMode.PESSIMISTIC) .build(); _manager.defineConfiguration(cacheName, config); Cache infinispanCache = _manager.getCache(cacheName); return infinispanCache; } /** * It creates several Tasks that are trying to modify the same objects. * It verifies that all tasks deal with consistent data. * * @throws Exception */ public void testLock() throws Exception { // do not try to be too smart - unlocking the Object // increases the possibility of getting // org.infinispan.util.concurrent.TimeoutException: Unable to acquire lock after [10 seconds] // The tests show that only first two Threads could get through // all others will fail with that exception. // Strangely enough, but most of Infinispan unit tests do not run more than two Threads int nThreads = 4; long stepDelayMsec = 10; doTestLock(true, nThreads, stepDelayMsec); } /** * Same as previous but only with 2 Tasks. * * @throws Exception */ public void testLockTwoTasks() throws Exception { int nThreads = 2; long stepDelayMsec = 10; doTestLock(true, nThreads, stepDelayMsec); } /** * It creates several Tasks that are trying to modify the same objects. * It verifies that all tasks deal with consistent data. * * Similar to the previous test but it doesn't explicitly unlock locked resource * The resource is unlocked by the transaction * * @throws Exception */ public void testLockNoExplicitUnlock() throws Exception { int nThreads = 10; long stepDelayMsec = 10; // each process takes 30 * stepDelayMsec msecs doTestLock(false, nThreads, stepDelayMsec); } public void testLockNoExplicitUnlockTwoTasks() throws Exception { int nThreads = 2; long stepDelayMsec = 10; doTestLock(false, nThreads, stepDelayMsec); } /** * LongProcess - longer than lockAcquisitionTimeout */ public void __testLockNoExplicitUnlockLongProcess() throws Exception { int nThreads = 10; long stepDelayMsec = 500; // 30 * 500 = 15000 longer than lockAcquisitionTimeout doTestLock(false, nThreads, stepDelayMsec); } public void __testLockNoExplicitUnlockTwoTasksLongProcess() throws Exception { int nThreads = 2; long stepDelayMsec = 5000; doTestLock(false, nThreads, stepDelayMsec); } /** * As long as every task is no longer than lockAcquisitionTimeout it works fine * and nobody fails with org.infinispan.util.concurrent.TimeoutException * * But if it gets longer then the threads will start falling with that exception * * The total time of one task is * 10 * stepDelayMsec * 3 * where 10 is number of objects * * * @param withUnlock * @throws Exception */ private void doTestLock(boolean withUnlock, int nThreads, long stepDelayMsec) throws Exception { String cacheName = "FooLookupCache"; int cacheSize = 64; Cache cache = getCache(cacheName, cacheSize); int numObjects = 10; // Add to the cache entries: ("1", "value"), ("2", "value"), .. for (int key=1; key<=numObjects; key++) { cache.put("" + key, "value"); } //int nThreads = 10; List> results = new ArrayList>(nThreads); ExecutorService executor = Executors.newFixedThreadPool(nThreads); for (int i=1; i<=nThreads; i++) { results.add( executor.submit(new Worker(i, cache, numObjects, withUnlock, stepDelayMsec)) ); } for ( Future next : results ) { Boolean success = next.get(); assertTrue("All worker should complete without exceptions", success); } System.out.format("TEST COMPLETED SUCESSFULY (withUnlock=%s, nThreads=%d, stepDelayMsec=%d)\n", withUnlock, nThreads, stepDelayMsec); } /** * * A task that "renames" all cached objects from 1 to numObjects. * The task verifies that all object have the same name (==consistent) before and after that renaming. * The consistency is provided by Infinispan locks. * */ private static class Worker implements Callable { private static String lockKey = "0"; // there is no cached Object with such key private final Cache cache; private final int numObjects; private final boolean withUnlock; private final long stepDelayMsec; private final int index; private final String title; public Worker(int index, final Cache cache, int numObjects, boolean withUnlock, long stepDelayMsec) { this.index = index; this.cache = cache; this.numObjects = numObjects; this.withUnlock = withUnlock; this.stepDelayMsec = stepDelayMsec; title = "Worker_" + index + "_unlock=" + withUnlock; } protected void debug(String method, String msg) { //log(method, msg); } protected void log(String method, String msg) { System.out.format("%s [%s] : %s : %s\n", new Date(), title, method, msg); } @Override public Boolean call() throws Exception { boolean success = false; try { doRun(); success = true; } catch (Throwable t) { t.printStackTrace(); success = false; } return new Boolean(success); } private void doRun() throws Exception { final String methodName = "run"; TransactionManager mgr = cache.getAdvancedCache().getTransactionManager(); if ( null == mgr ) { throw new UnsupportedOperationException("TransactionManager was not configured for the cache " + cache.getName()); } mgr.begin(); try { if ( acquireLock() ) { log(methodName, "acquired lock"); validateCache("before"); // renaming all Objects from 1 to numObjects String newName = "value-" + index; debug(methodName, "Changing value to " + newName); for (int key=1; key<=numObjects; key++) { cache.put("" + key, newName); Thread.sleep(stepDelayMsec); } validateCache("after"); if (withUnlock) { unlock(lockKey); } } else { log(methodName, "Failed to acquired lock"); } mgr.commit(); } catch (Exception t ) { mgr.rollback(); throw t; } //log(methodName, "Done"); } private boolean acquireLock() { return cache.getAdvancedCache().lock(lockKey); } private boolean unlock(String resourceId) { LockManager lockManager = cache.getAdvancedCache().getLockManager(); Object lockOwner = lockManager.getOwner(resourceId); Collection keys = Collections.singletonList(resourceId); lockManager.unlock(keys, lockOwner); return true; } /** * Checks if all cache entries are consistent * @param step * @throws InterruptedException */ private void validateCache(String step) throws InterruptedException { String methodName = "validateCache_" + step; String value = getCachedValue(1); for (int key=1; key<=numObjects; key++) { String nextValue = getCachedValue(key); if ( ! value.equals(nextValue) ) { String msg = String.format("Cache inconsistent: value=%s, nextValue=%s", value, nextValue); log(methodName, msg); throw new ConcurrentModificationException(msg); } Thread.sleep(stepDelayMsec); } debug(methodName, "passed: " + value); } private String getCachedValue(int index) { String value = cache.get("" + index); if ( null == value ) { throw new ConcurrentModificationException("Missed entry for " + index); } return value; } } /** * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { (new TestRunner()).doRun(new TestSuite(LockTest.class)); } }