Index: lucene-directory/src/test/java/org/infinispan/lucene/testutils/SingleThreadScopingCacheManagerFactory.java =================================================================== --- lucene-directory/src/test/java/org/infinispan/lucene/testutils/SingleThreadScopingCacheManagerFactory.java (revision 0) +++ lucene-directory/src/test/java/org/infinispan/lucene/testutils/SingleThreadScopingCacheManagerFactory.java (revision 0) @@ -0,0 +1,126 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.lucene.testutils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; + +import org.infinispan.config.Configuration; +import org.infinispan.manager.CacheManager; +import org.infinispan.test.fwk.TestCacheManagerFactory; + +/** + * A factory for CacheManagers, making sure all CacheManager are created by the same thread even if invokers are in different threads. + * They all share a Configuration. + * + * This class is only needed as a workaround for ISPN-261 + * + * @author Sanne Grinovero + * @since 4.0 + */ +public final class SingleThreadScopingCacheManagerFactory { + + private final BlockingQueue requests = new SynchronousQueue(); + private final BlockingQueue results = new SynchronousQueue(); + private final ExecutorService executor = Executors.newFixedThreadPool(1); + private final Configuration cfg; + private boolean started = false; + private boolean stopped = false; + + /** + * Create a new SingleThreadScopingCacheManagerFactory. + * + * @param cfg defines the default configuration + */ + public SingleThreadScopingCacheManagerFactory(Configuration cfg) { + if (cfg==null) throw new IllegalArgumentException("configuration is mandatory"); + this.cfg = cfg; + } + + /** + * Create a new clustered CacheManager using the configuration given to constructor + * @return + * @throws InterruptedException + */ + public synchronized CacheManager createClusteredCacheManager() throws InterruptedException { + checkRunning(); + requests.put(cfg); + return results.take(); + } + + /** + * Create a new clustered CacheManager using the specified configuration + * + * @param cfg the configuration to use + * @return + * @throws InterruptedException + */ + public synchronized CacheManager createClusteredCacheManager(Configuration cfg) throws InterruptedException { + checkRunning(); + requests.put(cfg); + return results.take(); + } + + public synchronized void start() { + if (started) + throw new IllegalStateException("was already started"); + if (stopped) + throw new IllegalStateException("was already stopped"); + started = true; + executor.execute(new Worker()); + } + + public synchronized void stop() { + if (stopped) + throw new IllegalStateException("was already stopped"); + stopped = true; + executor.shutdownNow(); + } + + private void checkRunning() { + if (!started) + throw new IllegalStateException("was not started"); + if (stopped) + throw new IllegalStateException("was already stopped"); + } + + private class Worker implements Runnable { + + @Override + public void run() { + boolean running = true; + while (running) { + try { + Configuration configuration = requests.take(); + CacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(configuration); + results.put(cacheManager); + } catch (InterruptedException e) { + running = false; + } + } + } + + } + +} Property changes on: lucene-directory/src/test/java/org/infinispan/lucene/testutils/SingleThreadScopingCacheManagerFactory.java ___________________________________________________________________ Added: svn:keywords + Id Revision Added: svn:eol-style + LF Index: lucene-directory/src/test/java/org/infinispan/lucene/Try.java =================================================================== --- lucene-directory/src/test/java/org/infinispan/lucene/Try.java (revision 0) +++ lucene-directory/src/test/java/org/infinispan/lucene/Try.java (revision 0) @@ -0,0 +1,138 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.lucene; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.infinispan.Cache; +import org.infinispan.lucene.testutils.SingleThreadScopingCacheManagerFactory; +import org.infinispan.manager.CacheManager; +import org.infinispan.test.TestingUtil; +import org.infinispan.test.fwk.TestCacheManagerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Try it. The first test should pass, proving the configuration is fine; + * the second test fails, the different caches are not joining (confirmed by jgroups trace). + * You can enable a "workaround" as in comments of createCache() + * + * @author Sanne Grinovero + * @since 4.0 + */ +public class Try { + + private static final int CACHES_NUMBER = 3; + private final SingleThreadScopingCacheManagerFactory cacheFactory = new SingleThreadScopingCacheManagerFactory(CacheTestSupport.createTestConfiguration()); + private final List> caches = Collections.synchronizedList(new ArrayList>()); + private final List cacheManagers = Collections.synchronizedList(new ArrayList()); + + @Test + public void inThreadInizializationTest() throws InterruptedException { + Cache cache = null; + for (int i = 0; i < CACHES_NUMBER; i++) { + cache = createCache(); + } + assert countMembersSeen(cache) == CACHES_NUMBER; + } + + @Test + public void parallelInizializationTest() throws InterruptedException, ExecutionException { + ExecutorService executorService = Executors.newFixedThreadPool(CACHES_NUMBER); + List> futures = new ArrayList>(CACHES_NUMBER); + for (int i = 0; i < CACHES_NUMBER; i++) { + futures.add(executorService.submit(new ConcurrentManagerInitialize())); + } + executorService.shutdown(); + for (Future future : futures) { + int res = future.get().intValue(); + assert res == CACHES_NUMBER; + } + } + + private Cache createCache() throws InterruptedException { + + // when using the cacheFactory both tests work: + // CacheManager manager = cacheFactory.createClusteredCacheManager(); + + // when creating the CacheManager directly, the second test fails: + CacheManager manager = TestCacheManagerFactory.createClusteredCacheManager(CacheTestSupport.createTestConfiguration()); + + cacheManagers.add(manager); + Cache cache = manager.getCache(); + caches.add(cache); + return cache; + } + + private static int countMembersSeen(Cache cache) { + int members = 0; + CacheManager manager = cache.getCacheManager(); + for (int i = 0; i < 8; i++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + System.err.println("Interrupted"); + } + members = Math.max(members, manager.getMembers().size()); + System.out.println("\tmembers:\t" + manager.getMembers() + "\taddress:\t" + manager.getAddress()); + } + return members; + } + + private class ConcurrentManagerInitialize implements Callable { + + @Override + public Integer call() throws Exception { + return countMembersSeen(createCache()); + } + + } + + @AfterMethod + public void cleanup() { + try { + TestingUtil.killCaches(caches); + } finally { + TestingUtil.killCacheManagers(cacheManagers); + } + } + + @BeforeClass + public void beforeTest() { + cacheFactory.start(); + } + + @AfterClass + public void afterTest() { + cacheFactory.stop(); + } + +} Property changes on: lucene-directory/src/test/java/org/infinispan/lucene/Try.java ___________________________________________________________________ Added: svn:keywords + Id Revision Added: svn:eol-style + LF