import java.io.IOException; import java.net.InetAddress; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import org.jgroups.Global; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.blocks.MessageDispatcher; import org.jgroups.blocks.RequestHandler; import org.jgroups.blocks.RequestOptions; import org.jgroups.blocks.locking.LockService; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; import org.jgroups.protocols.CENTRAL_LOCK; import org.jgroups.protocols.DISCARD; import org.jgroups.protocols.MERGE2; import org.jgroups.protocols.PING; import org.jgroups.protocols.RSVP; import org.jgroups.protocols.SHARED_LOOPBACK; import org.jgroups.protocols.UNICAST2; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.ProtocolStack; import org.jgroups.util.DefaultSocketFactory; import org.jgroups.util.DefaultThreadFactory; import org.jgroups.util.RspList; import org.jgroups.util.SocketFactory; import org.jgroups.util.ThreadFactory; import org.jgroups.util.TimeScheduler; import org.jgroups.util.TimeScheduler2; import org.jgroups.util.Util; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = Global.FUNCTIONAL, sequential = true) public class RpcLockingTest { protected static final int NUM = 2; // number of members protected final JChannel[] channels = new JChannel[NUM]; protected final MessageDispatcher[] dispatchers = new MessageDispatcher[NUM]; protected MyDiagnosticsHandler handler; protected ThreadPoolExecutor oob_thread_pool; protected ThreadPoolExecutor thread_pool; protected Lock[] locks = new Lock[NUM]; @BeforeMethod void setUp() throws Exception { handler = new MyDiagnosticsHandler( InetAddress.getByName("224.0.75.75"), 7500, LogFactory.getLog(DiagnosticsHandler.class), new DefaultSocketFactory(), new DefaultThreadFactory("", false)); handler.start(); TimeScheduler timer = new TimeScheduler2(new DefaultThreadFactory( "Timer", true, true), 5, 20, 3000, 5000, "abort"); oob_thread_pool = new ThreadPoolExecutor(5, Math.max(5, NUM / 4), 3000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(NUM * NUM)); oob_thread_pool .setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); thread_pool = new ThreadPoolExecutor(5, Math.max(5, NUM / 4), 3000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(NUM * NUM)); thread_pool .setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); System.out.print("Connecting channels: "); for (int i = 0; i < NUM; i++) { SHARED_LOOPBACK shared_loopback = (SHARED_LOOPBACK) new SHARED_LOOPBACK() .setValue("enable_bundling", false); shared_loopback.setLoopback(false); shared_loopback.setTimer(timer); shared_loopback.setOOBThreadPool(oob_thread_pool); shared_loopback.setDefaultThreadPool(thread_pool); shared_loopback.setDiagnosticsHandler(handler); channels[i] = Util.createChannel( shared_loopback, new DISCARD(), new PING().setValue("timeout", 1000) .setValue("num_initial_members", NUM) .setValue("force_sending_discovery_rsps", true), new MERGE2().setValue("min_interval", 1000).setValue( "max_interval", 3000), new NAKACK2().setValue("use_mcast_xmit", false) .setValue("discard_delivered_msgs", true) .setValue("log_discard_msgs", false) .setValue("log_not_found_msgs", false), new UNICAST2().setValue("xmit_table_num_rows", 5).setValue( "xmit_interval", 300), new RSVP().setValue("timeout", 10000).setValue( "throw_exception_on_timeout", true), new GMS().setValue("print_local_addr", false) .setValue("leave_timeout", 100) .setValue("log_view_warnings", false) .setValue("view_ack_collection_timeout", 2000) .setValue("log_collect_msgs", false), new CENTRAL_LOCK()); channels[i].setName(String.valueOf((i + 1))); dispatchers[i] = new MessageDispatcher(channels[i], null, null); channels[i].connect("MessageDispatcherRSVPTest"); System.out.print(i + 1 + " "); if (i == 0) Util.sleep(1000); } Util.waitUntilAllChannelsHaveSameSize(30000, 1000, channels); System.out.println(""); } @AfterMethod void tearDown() throws Exception { for (int i = NUM - 1; i >= 0; i--) { ProtocolStack stack = channels[i].getProtocolStack(); String cluster_name = channels[i].getClusterName(); stack.stopStack(cluster_name); stack.destroy(); } handler.destroy(); } protected void init() { LockService[] lockServices = new LockService[NUM]; for (int i = 0; i < NUM; i++) { lockServices[i] = new LockService(channels[i]); locks[i] = lockServices[i].getLock("lock"); } dispatchers[0].setRequestHandler(new RequestHandler() { @Override public Object handle(Message arg0) throws Exception { System.out .println("Channel0 received a message, will now try to lock the lock"); if (locks[0].tryLock()) { Assert.fail("Should not be able to lock the lock here"); System.out .println("Channel0 aquired the lock, this shouldn't be possible"); } else { System.out .println("The lock was already locked, as it should be"); } return "Hello"; } }); dispatchers[1].setRequestHandler(new RequestHandler() { @Override public Object handle(Message arg0) throws Exception { System.out .println("Channel1 received a message, will now try to lock the lock"); if (locks[1].tryLock()) { Assert.fail("Should not be able to lock the lock here"); System.out .println("Channel1 aquired the lock, this shouldn't be possible"); } else { System.out .println("The lock already was locked, as it should be"); } return "Hello"; } }); // Print who is the coordinator if (channels[1].getView().getMembers().get(0) .equals(channels[1].getAddress())) { System.out.println("channel1 is the coordinator"); } else { System.out.println("channel0 is the coordinator"); } System.out.println(""); } /** * If the coordinator of the lock locks the lock and then send a message, * the receiver will wait for ever in tryLock. However, castMessage will * return after a while because of the default settings of * RequestOptions.SYNC(). * * @throws Exception */ public void testCoordSendFirst() throws Exception { init(); System.out.println("Running testCoordSendFirst"); // =========================================================================== // Channel0 locks the lock if (locks[0].tryLock()) { try { System.out .println("Channel0 aquired the lock, about to send message to Channel1"); RspList rsp = dispatchers[0].castMessage( Collections.singleton(channels[1].getAddress()), new Message(channels[1].getAddress(), "bla"), RequestOptions.SYNC()); if (rsp.getResults().isEmpty()) { System.err.println("ERROR: didn't return correctly"); Assert.fail("Didn't return correctly"); } else { System.out.println("Returned: " + rsp.getResults().get(0)); } } finally { // Channel0 unlocks the lock locks[0].unlock(); } } else { Assert.fail("The lock was already locked"); System.out.println("Channel0 failed to aquire the lock"); } // =========================================================================== System.out.println(); } /** * If the node that isn't the coordinator is the one who sends the message * it works, but later when the coordinator sends the message, the receiver, * will wait forever in tryLock. * * @throws Exception */ public void testCoordReceiveFirst() throws Exception { init(); System.out.println("Running testCoordReceiveFirst"); // =========================================================================== // Channel1 locks the lock if (locks[1].tryLock()) { try { System.out .println("Channel1 aquired the lock, about to send message to Channel0"); RspList rsp = dispatchers[1].castMessage( Collections.singleton(channels[0].getAddress()), new Message(channels[0].getAddress(), "bla"), RequestOptions.SYNC()); if (rsp.getResults().isEmpty()) { System.err.println("ERROR: didn't return correctly"); Assert.fail("Didn't return correctly"); } else { System.out.println("Returned: " + rsp.getResults().get(0)); } } finally { // Channel1 unlocks the lock locks[1].unlock(); } } else { Assert.fail("The lock was already locked"); System.out.println("Channel1 failed to aquire the lock"); } // =========================================================================== // =========================================================================== // Channel0 locks the lock if (locks[0].tryLock()) { try { System.out .println("Channel0 aquired the lock, about to send message to Channel1"); RspList rsp = dispatchers[0].castMessage( Collections.singleton(channels[1].getAddress()), new Message(channels[1].getAddress(), "bla"), RequestOptions.SYNC()); if (rsp.getResults().isEmpty()) { System.err.println("ERROR: didn't return correctly"); Assert.fail("Didn't return correctly"); } else { System.out.println("Returned: " + rsp.getResults().get(0)); } } finally { // Channel0 unlocks the lock locks[0].unlock(); } } else { Assert.fail("The lock was already locked"); System.out.println("Channel0 failed to aquire the lock"); } // =========================================================================== System.out.println(); } protected static class MyDiagnosticsHandler extends DiagnosticsHandler { protected MyDiagnosticsHandler(InetAddress diagnostics_addr, int diagnostics_port, Log log, SocketFactory socket_factory, ThreadFactory thread_factory) { super(diagnostics_addr, diagnostics_port, log, socket_factory, thread_factory); } public void start() throws IOException { super.start(); } public void stop() { } public void destroy() { super.stop(); } } }