Index: core/src/test/java/org/infinispan/tx/SizeTest.java =================================================================== --- core/src/test/java/org/infinispan/tx/SizeTest.java (revision ) +++ core/src/test/java/org/infinispan/tx/SizeTest.java (revision ) @@ -0,0 +1,279 @@ +package org.infinispan.tx; + +import org.infinispan.Cache; +import org.infinispan.manager.DefaultCacheManager; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.infinispan.tx.obj.BigObject; +import org.infinispan.tx.obj.SmallObject; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.testng.Assert.assertEquals; + +/** + * // TODO: Document this + * + * @author Mircea.Markus@jboss.com + * @since 4.2 + */ +@Test(testName = "tx.SizeTest") +public class SizeTest extends MultipleCacheManagersTest { + + private static Log logger = LogFactory.getLog(SizeTest.class); + + @Override + protected void createCacheManagers() throws Throwable { + registerCacheManager((EmbeddedCacheManager) new DefaultCacheManager("test-cluster.xml")); + registerCacheManager((EmbeddedCacheManager) new DefaultCacheManager("test-cluster.xml")); + registerCacheManager((EmbeddedCacheManager) new DefaultCacheManager("test-cluster.xml")); + registerCacheManager((EmbeddedCacheManager) new DefaultCacheManager("test-cluster.xml")); + manager(0).getCache("cache_nojta_async"); + manager(1).getCache("cache_nojta_async"); + manager(2).getCache("cache_nojta_async"); + manager(3).getCache("cache_nojta_async"); + TestingUtil.blockUntilViewsReceived(10000, manager(0).getCache("cache_nojta_async"), manager(1).getCache("cache_nojta_async"), manager(2).getCache("cache_nojta_async"), manager(3).getCache("cache_nojta_async")); + } + + @Override + protected void assertSupportedConfig() { + //ignore + } + + public void testNoTxAsync() { + final int objectNumber = 10000; + String prefix = "some"; + boolean isSmallObject = false; + Cache cache = cache(0, "cache_nojta_async"); + + int startNumber = 1; + int endNumber = objectNumber; + int number = objectNumber + 1; + logger.debug(" ------- [" + startNumber + "] [" + endNumber + "] --------"); + + List list = createTobeCachedObjects(isSmallObject, prefix, startNumber, endNumber); + + long startTime = System.currentTimeMillis(); + logger.debug("before ingest objects, startTime: " + startTime); + + final Set names = new HashSet(); + if (isSmallObject) { + for (Object obj : list) { + String oName = ((SmallObject) obj).getName(); + cache.put(oName, obj); + names.add(oName); + } + } else { + for (Object obj : list) { + String oName = ((BigObject) obj).getName(); + names.add(oName); + cache.put(oName, obj);; + } + } + assertEquals(objectNumber, names.size()); + long now = System.currentTimeMillis(); + long ingestUsedTime = now - startTime; + logger.debug("after ingest objects, endTime: " + now + ", usedTime: " + ingestUsedTime); + + System.out.println(" ingestUsedTime = " + ingestUsedTime); + + eventually(new Condition() { + public boolean isSatisfied() throws Exception { + int cache0 = cache(0, "cache_nojta_async").size(); + System.out.println("cache0 = " + cache0); + int cache1 = cache(1, "cache_nojta_async").size(); + System.out.println("cache1 = " + cache1); + int cache2 = cache(2, "cache_nojta_async").size(); + System.out.println("cache2 = " + cache2); + int cache3 = cache(3, "cache_nojta_async").size(); + System.out.println("cache3 = " + cache3); + int sum = cache0 + cache1 + cache2 + cache3; + System.out.println("sum = " + sum); + return sum == 3 * objectNumber; + } + }); + + assertAllKeysExist(names, 3, 3); + + TestingUtil.killCacheManagers(manager(3)); + TestingUtil.blockForMemberToFail(1000, manager(0), manager(1), manager(2)); + eventually(new Condition() { + public boolean isSatisfied() throws Exception { + return assertAllKeysExist(names, 3, 2); + } + }); + + eventually(new Condition() { + public boolean isSatisfied() throws Exception { + int cache0 = cache(0, "cache_nojta_async").size(); + System.out.println("cache0 = " + cache0); + int cache1 = cache(1, "cache_nojta_async").size(); + System.out.println("cache1 = " + cache1); + int cache2 = cache(2, "cache_nojta_async").size(); + System.out.println("cache2 = " + cache2); + int sum = cache0 + cache1 + cache2; + System.out.println("sum = " + sum); + return sum == 3 * objectNumber; + } + }); + + assertAllKeysExist(names, 3, 2); + + TestingUtil.killCacheManagers(manager(2)); + TestingUtil.blockForMemberToFail(1000, manager(0), manager(1)); + + + eventually(new Condition() { + public boolean isSatisfied() throws Exception { + int cache0 = cache(0, "cache_nojta_async").size(); + System.out.println("cache0 = " + cache0); + int cache1 = cache(1, "cache_nojta_async").size(); + System.out.println("cache1 = " + cache1); + int sum = cache0 + cache1; + System.out.println("sum = " + sum); + return sum == 2 * objectNumber; + } + }); + + TestingUtil.killCacheManagers(manager(1)); + TestingUtil.blockForMemberToFail(1000, manager(0)); + + + eventually(new Condition() { + public boolean isSatisfied() throws Exception { + int cache0 = cache(0, "cache_nojta_async").size(); + System.out.println("cache0 = " + cache0); + return cache0 == objectNumber; + } + }); + + } + + private boolean assertAllKeysExist(Set names, int nrCopies, int maxIndex) { + for (String name : names) { + int count = 0; + String appear = name + " does not appear on: "; + for (int index = 0; index <= maxIndex; index++) { + boolean contains = advancedCache(index, "cache_nojta_async").getDataContainer().containsKey(name); + if (contains) { + count ++; + } else { + appear += advancedCache(index, "cache_nojta_async").getRpcManager().getAddress() + " "; + } + } + if (count != nrCopies) { + System.out.println(appear); + return false; + } + } + return true; + } + + + public List createTobeCachedObjects(boolean isSmallObject, String prefix, int startNum, int endNum) { + long startTime = System.currentTimeMillis(); + logger.info("before create objects, startTime: " + startTime); + List list = new ArrayList(endNum + 1 - startNum); + for (int i = startNum; i <= endNum; i++) { + list.add(createTobeCachedObject(isSmallObject, prefix, i)); + } + long now = System.currentTimeMillis(); + long createUsedTime = now - startTime; + logger.info("after create objects, endTime: " + now + ", usedTime: " + createUsedTime); + + return list; + } + + private Object createTobeCachedObject(boolean isSmallObject, String prefix, int num) { + if (isSmallObject) { + SmallObject obj = new SmallObject(); + //obj.setName("sname" + "-" + prefix + "-" + num); + obj.setName("<" + num + "|" + prefix + "|" + (num * 3) + "|" + (num * 7) + ">"); + obj.setValue("value" + num); + + return obj; + } else { + BigObject obj = new BigObject(); +// obj.setName("bname" + "-" + prefix + "-" + num); + obj.setName("[" + num + "|" + prefix + "|" + (num * 3) + "|" + (num * 7) + "]"); + obj.setValue("* [ISPN-307] - Lucene Directory not supporting Lucene's ConcurrentMergeScheduler\n" + num); + obj.setValue2("* [ISPN-514] - replace(Async) with eager locking\n" + num); + obj.setValue3("* [ISPN-546] - Lazy deserialization not working for replace() calls\n" + num); + obj.setValue4("* [ISPN-548] - QueryInterceptor needs to be able to differentiate update to the indexes from an add to the indexes.\n" + num); + obj.setValue5("* [ISPN-550] - Unnecessary parameter invalidation in server Main class leading to errors\n" + num); + obj.setValue6("* [ISPN-551] - ReplicationQueue might attempt to use a closed JGroupsTransport\n" + num); + obj.setValue7("* [ISPN-552] - Tree facade caches cannot be configured with lazy deserialization\n" + num); + obj.setValue8("* [ISPN-560] - Unable to address bytes above 2GB in Lucene Directory\n" + num); + obj.setValue9("* [ISPN-562] - HotRod Bulk Get response operation code is incorrect\n" + num); + obj.setValue10("* [ISPN-564] - ManagedConnectionFactory releaseConnection can throw NullPointerException\n" + num); + obj.setValue11("* [ISPN-565] - SKIP_LOCKING with cache loader and calling remove results in java.lang.ClassCastException\n" + num); + obj.setValue12("* [ISPN-568] - Lucene needs boundary checks in buffer copy of readBytes()\n" + num); + obj.setValue13("* [ISPN-569] - IllegalMonitorStateException thrown when releasing the lock\n" + num); + obj.setValue14("* [ISPN-570] - Locks not properly cleaned up when using explicit lock API, marshalled values and custom key types\n" + num); + obj.setValue15("* [ISPN-572] - Update RemoteCacheStore javadoc\n" + num); + obj.setValue16("* [ISPN-574] - Marshaller to stop before Transport to avoid NullPointerException\n" + num); + obj.setValue17("* [ISPN-577] - State transfer and replication queue ordering issue\n" + num); + obj.setValue18( + "* [ISPN-578] - BDB store logs license information relating to JBoss Cache instead of Infinispan\n" + + "* [ISPN-579] - JdbcStringBasedCacheStore preloads data using wrong keys (it shouldn't preload)\n" + + "* [ISPN-580] - neverending AsyncStore.awaitNotEmpty during stop() of AsyncStore\n" + + "* [ISPN-581] - SKIP_CACHE_STORE is ignored by CacheLoaderInterceptor\n" + + "* [ISPN-582] - Eviction with passivation introduces a race where entries are invisible to callers\n" + + "* [ISPN-583] - Lucene readlocks have to avoid being persisted in a store\n" + + "* [ISPN-586] - ManagedConnectionPool doesn't work in a transactional context\n" + + "* [ISPN-588] - Explicit locking has no effect in distributed mode\n" + + "* [ISPN-589] - The useEagerLocking config setting has no effect in distributed mode\n" + + "* [ISPN-590] - Lucene's segment files are flushed twice using SKIP_LOCK, implement proper index commit()\n" + + "* [ISPN-596] - Deadlock detection and explicit locking should be disallowed.\n" + + "* [ISPN-598] - Fix LIRS with passivation.\n" + + "* [ISPN-600] - Embedded JBossTS and TreeCache don't work together\n" + + "* [ISPN-601] - Second transactional read on a key with FORCE_WRITE_LOCK and cache store leads to NullPointerException\n" + + "* [ISPN-602] - need to store the chunk size of Lucene segments in the metadata\n" + + "* [ISPN-605] - Cannot add custom interceptor to the Cache programmatically\n" + + "* [ISPN-606] - asyncMarshalling should be turned off by default to avoid reordering\n" + num); + obj.setValue19( + "ÒI have had to tell everyone in my lab that when they feed their cells tomorrow morning, they better use media that has not been funded by the federal government,Ó said Dr." + + "George Q. Daley, director of the stem cell transplantation program at ChildrenÕs Hospital Boston, referring to food given to cells. ÒThis ruling means an immediate disruption " + + "of dozens of labs doing this work since the Obama administration made its order.Ó In his ruling, Chief Judge Royce C. Lamberth of Federal District Court for the District of " + + "Columbia wrote that his temporary injunction returned federal policy to the Òstatus quo,Ó but few officials, scientists or lawyers in the case were sure Monday night what that meant." + + "Dr. Daley was among those who said they believed that it meant that work financed under the new rules had to stop immediately; others said it meant that the health institutes had to " + + "use Bush administration rules for future grants. Steven H. Aden, senior counsel for the Alliance Defense Fund, which sued to stop the Obama administration rules, said the judgeÕs " + + "ruling Òmeans that for now the N.I.H. cannot issue funding grants to embryonic stem cell research projects without any further order from the court.Ó Officials at the health " + + "institutes said that lawyers at the Department of Justice would interpret the ruling for them. Tracy Schmaler, a spokeswoman for the Justice Department, wrote in an e-mail, ÒWeÕre " + + "reviewing the decision.Ó The judge ruled that the Obama administrationÕs policy was illegal because the administrationÕs distinction between work that leads to the destruction of " + + "embryos Ñ which cannot be financed by the federal government under the current policy Ñ and the financing of work using stem cells created through embryonic destruction was meaningless. " + + "In his ruling, he referred to embryonic stem cell research as E.S.C. ÒIf one step or Ôpiece of researchÕ of an E.S.C. research project results in the destruction of an embryo, " + + "the entire project is precluded from receiving federal funding,Ó wrote Judge Lamberth, who was appointed to the federal bench in 1987 by President Ronald Reagan. In other words, " + + "the neat lines that the government had drawn between the process of embryonic destruction and the results of that destruction are not valid, the judge ruled. For scientists, " + + "the problem with the judgeÕs reasoning is that it may render all scientific work regarding embryonic stem cells illegal Ñ including work allowed under the more restrictive policy " + + "adopted by President George W. Bush in 2001. For years, private financing has been used to create embryonic stem cell lines, mostly from discarded embryos from fertility clinics." + num); + obj.setValue20( + "The crowd that gathered was pro-Rangel, too. At one point, Jonathan Tasini, a candidate and a labor activist, explained matter-of-factly that Mr. Rangel had accepted large " + + "sums of money from political action committees. The crowd erupted in boos and jeers. A woman in a straw hat stood up and wagged her finger at Mr. Tasini. But Mr. RangelÕs " + + "problems kept intruding. Mr. Tasini told the crowd that, despite Mr. RangelÕs best intentions, he had fallen victim to a culture in Washington that was awash with corporate " + + "money and lobbyists. ÒThe corruption that Congressman Rangel is a part of is being in Congress for 40 years,Ó Mr. Tasini said. Mr. RangelÕs foes took pains to honor his " + + "40-year career in Congress. But they made clear they thought it was time for a change. ÒYes, he has done some good. He has a legacy,Ó Mr. Powell said. ÒBut he is no longer " + + "the chairman of the powerful Ways and Means Committee. He is no longer perceived as somebody that people in Congress want to work with.Ó Mr. Rangel infused his own remarks " + + "with his typical sarcasm and combativeness, taking a jab at those who had called on him to step aside, especially Mr. Powell. ÒAdam, is he here?Ó Mr. Rangel said, quickly " + + "surveying the room. ÒHe truly believes that I should resign, so that somebody else should take my place.Ó The crowd interrupted: ÒNo!Ó ÒHe is the only one to say this,Ó Mr. " + + "Rangel continued. ÒI think itÕs creative. ÒBut if itÕs O.K. with my doctor, I am going to serve the next two years.Ó Mr. Rangel also railed against the wars in Afghanistan " + + "and Iraq, saying American soldiers would not be there if not for the countryÕs voracious appetite for oil, and mocked Republicans in Congress for trying to block the presidentÕs " + + "agenda. And while some Democrats have said that he is creating an embarrassment for his party by refusing to step down, Mr. Rangel declared, ÒI wonÕt step aside when the people " + + "of my district believe this is what I should be doing.Ó While Mr. Powell, whose father once held the seat, was aggressive, the other candidates challenging Mr. Rangel were more " + + "subtle, though they echoed his essential message: Mr. RangelÕs era was over. ÒEveryone has their time,Ó said Joyce S. Johnson, a former field director in New York for Mr. " + + "ObamaÕs presidential campaign in 2008. ÒThere is a need for a new perspective, out-of-the-box thinking.Ó" + num); + + return obj; + + } + } + + +} Index: core/src/test/java/org/infinispan/commands/MyCustomVisitableCommand.java =================================================================== --- core/src/test/java/org/infinispan/commands/MyCustomVisitableCommand.java (revision ) +++ core/src/test/java/org/infinispan/commands/MyCustomVisitableCommand.java (revision ) @@ -0,0 +1,77 @@ +package org.infinispan.commands; + +import org.infinispan.context.InvocationContext; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.interceptors.base.CommandInterceptor; +import org.infinispan.marshall.Ids; +import org.infinispan.marshall.Marshallable; +import org.infinispan.marshall.exts.ReplicableCommandExternalizer; +import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Address; + +import java.io.Serializable; +import java.util.Map; + +/** + * // TODO: Document this + * + * @author Mircea.Markus@jboss.com + * @since 4.2 + */ +@Marshallable(externalizer = ReplicableCommandExternalizer.class, id = Ids.GENERIC_VISITABLE_COMMAND) +public class MyCustomVisitableCommand implements CustomVisitableCommand, Serializable { + public static final byte COMMAND_ID = Ids.GENERIC_VISITABLE_COMMAND; + private boolean shouldInvoke; + private Map params; + private RpcManager rpcManager; + + public MyCustomVisitableCommand(boolean shouldInvoke, Map params) { + this.shouldInvoke = shouldInvoke; + this.params = params; + } + + public MyCustomVisitableCommand() { + // TODO: Customise this generated block + } + + public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable { + if (visitor instanceof MyCustomVisitor) { + System.out.println("rpcManager = " + rpcManager.getAddress()); + return ((MyCustomVisitor) visitor).visitMyCustomVisitableCommand(ctx, this); + } + CommandInterceptor ci = (CommandInterceptor) visitor; + if (ci != null) { + return ci.invokeNextInterceptor(ctx, this); + } + return null; + } + + public boolean shouldInvoke(InvocationContext ctx) { + return shouldInvoke; + } + + public Object perform(InvocationContext ctx) throws Throwable { + return null; + } + + public byte getCommandId() { + return Ids.GENERIC_VISITABLE_COMMAND; + } + + public Map getParams() { + return params; + } + + public Object[] getParameters() { + return new Object[]{shouldInvoke, params}; + } + + public void setParameters(int commandId, Object[] parameters) { + this.shouldInvoke = (Boolean)parameters[0]; + this.params = (Map) parameters[1]; + } + + public void init(ComponentRegistry cr) { + rpcManager = cr.getComponent(RpcManager.class); + } +} Index: core/src/main/java/org/infinispan/marshall/Ids.java =================================================================== --- core/src/main/java/org/infinispan/marshall/Ids.java (revision 2372) +++ core/src/main/java/org/infinispan/marshall/Ids.java (revision ) @@ -117,4 +117,6 @@ static final byte BYTE_ARRAY_KEY = 57; static final byte TOPOLOGY_ADDRESS = 58; static final byte TOPOLOGY_VIEW = 59; + static final byte GENERIC_VISITABLE_COMMAND = 60; + } Index: core/src/test/java/org/infinispan/commands/GenericVisitableCommandTest.java =================================================================== --- core/src/test/java/org/infinispan/commands/GenericVisitableCommandTest.java (revision ) +++ core/src/test/java/org/infinispan/commands/GenericVisitableCommandTest.java (revision ) @@ -0,0 +1,53 @@ +package org.infinispan.commands; + +import org.infinispan.config.Configuration; +import org.infinispan.context.InvocationContext; +import org.infinispan.marshall.Externalizer; +import org.infinispan.marshall.Ids; +import org.infinispan.marshall.StreamingMarshaller; +import org.infinispan.marshall.exts.ReplicableCommandExternalizer; +import org.infinispan.marshall.jboss.ConstantObjectTable; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.jboss.marshalling.Marshaller; +import org.jboss.marshalling.ObjectTable; +import org.jboss.marshalling.Unmarshaller; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * @author Mircea.Markus@jboss.com + * @since 4.2 + */ +@Test(testName = "commands.GenericVisitableCommandTest", groups = "functional") +public class GenericVisitableCommandTest extends MultipleCacheManagersTest { + private FooInterceptor interceptor; + + @Override + protected void createCacheManagers() throws Throwable { + addClusterEnabledCacheManagers(Configuration.CacheMode.REPL_SYNC, 2); + interceptor = new FooInterceptor(); + advancedCache(1).addInterceptor(interceptor, 1); + } + + public void testSuccessfulReplication() { + Map params = Collections.singletonMap("k","v"); + + advancedCache(0).getRpcManager().broadcastRpcCommand(new MyCustomVisitableCommand(true, params), true); + assert interceptor.received.get("k").equals("v"); + } + + public static class FooInterceptor extends MyCustomVisitor { + Map received; + + @Override + public Object visitMyCustomVisitableCommand(InvocationContext ctx, MyCustomVisitableCommand mcvc) throws Throwable { + received = mcvc.getParams(); + return invokeNextInterceptor(ctx, mcvc); + } + } + +} Index: core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java =================================================================== --- core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java (revision 2372) +++ core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java (revision ) @@ -54,6 +54,7 @@ import org.infinispan.context.InvocationContextContainer; import org.infinispan.distribution.ConsistentHash; import org.infinispan.distribution.DistributionManager; +import org.infinispan.factories.ComponentRegistry; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.InterceptorChain; @@ -69,6 +70,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; /** @@ -97,11 +99,12 @@ private InvocationContextContainer icc; private TransactionTable txTable; private Configuration configuration; + private ComponentRegistry componentRegistry; @Inject public void setupDependencies(DataContainer container, CacheNotifier notifier, Cache cache, InterceptorChain interceptorChain, DistributionManager distributionManager, - InvocationContextContainer icc, TransactionTable txTable, Configuration configuration) { + InvocationContextContainer icc, TransactionTable txTable, Configuration configuration, ComponentRegistry cr) { this.dataContainer = container; this.notifier = notifier; this.cache = cache; @@ -110,6 +113,7 @@ this.icc = icc; this.txTable = txTable; this.configuration = configuration; + this.componentRegistry = cr; } @Start(priority = 1) @@ -311,6 +315,9 @@ rcc.init(distributionManager, configuration, dataContainer, this); break; default: + if (c instanceof CustomVisitableCommand) { + ((CustomVisitableCommand) c).init(componentRegistry); + } if (trace) log.trace("Nothing to initialize for command: " + c); } Index: core/src/main/java/org/infinispan/commands/CustomVisitableCommand.java =================================================================== --- core/src/main/java/org/infinispan/commands/CustomVisitableCommand.java (revision ) +++ core/src/main/java/org/infinispan/commands/CustomVisitableCommand.java (revision ) @@ -0,0 +1,13 @@ +package org.infinispan.commands; + +import org.infinispan.factories.ComponentRegistry; + +/** + * // TODO: Document this + * + * @author Mircea.Markus@jboss.com + * @since 4.2 + */ +public interface CustomVisitableCommand extends VisitableCommand { + public void init(ComponentRegistry cr); +} Index: core/src/test/java/org/infinispan/commands/MyCustomVisitor.java =================================================================== --- core/src/test/java/org/infinispan/commands/MyCustomVisitor.java (revision ) +++ core/src/test/java/org/infinispan/commands/MyCustomVisitor.java (revision ) @@ -0,0 +1,15 @@ +package org.infinispan.commands; + +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.CommandInterceptor; + +/** + * // TODO: Document this + * + * @author Mircea.Markus@jboss.com + * @since 4.2 + */ +public abstract class MyCustomVisitor extends CommandInterceptor { + + public abstract Object visitMyCustomVisitableCommand(InvocationContext ctx, MyCustomVisitableCommand mcvc) throws Throwable; +}