package org.jbpm.job.executor; import java.util.Collections; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.TreeSet; import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hibernate.Session; import org.jbpm.JbpmConfiguration; import org.jbpm.JbpmContext; import org.jbpm.graph.def.ActionHandler; import org.jbpm.graph.def.ProcessDefinition; import org.jbpm.graph.exe.ExecutionContext; import org.jbpm.graph.exe.ProcessInstance; /** This test verifies that a Node with an action handler can propagate execution to the next node * that is an async="true" node that also has an action handler. * * An instance of the process below will run successfully if the action handler is removed from Node1, * or Node2 is not async="true". But the combination with Node1 * having and action handler that propagates the execution to the next node that is async="true" * fails with the exception described below. * * @author jpjohnson@digitalriver.com * * Originally this test case failed in jBPM 3.2.1 with the following exception * org.jbpm.JbpmException: token '1' can't be locked by 'job[1]' cause it's already locked by 'token[1]' at org.jbpm.graph.exe.Token.lock(Token.java:646) at org.jbpm.graph.def.Node.enter(Node.java:316) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157) at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.enter() at org.jbpm.graph.def.Transition.take(Transition.java:151) at org.jbpm.graph.def.Node.leave(Node.java:393) at org.jbpm.graph.def.Node.leave(Node.java:357) at org.jbpm.graph.exe.ExecutionContext.leaveNode(ExecutionContext.java:120) at org.jbpm.job.executor.SimpleAsyncProcessTest$AsyncAction.execute(SimpleAsyncProcessTest.java:57) at org.jbpm.graph.def.Action.execute(Action.java:122) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157) at org.jbpm.graph.def.Action$$EnhancerByCGLIB$$4852cc95.execute() at org.jbpm.graph.def.GraphElement.executeAction(GraphElement.java:255) at org.jbpm.graph.def.Node.execute(Node.java:338) at org.jbpm.graph.def.Node.enter(Node.java:318) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157) at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.enter() at org.jbpm.graph.def.Transition.take(Transition.java:151) at org.jbpm.graph.def.Node.leave(Node.java:393) at org.jbpm.graph.node.StartState.leave(StartState.java:70) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157) at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.leave() at org.jbpm.graph.exe.Token.signal(Token.java:194) at org.jbpm.graph.exe.Token.signal(Token.java:139) at org.jbpm.graph.exe.ProcessInstance.signal(ProcessInstance.java:270) at org.jbpm.job.executor.SimpleAsyncProcessTest.launchProcess(SimpleAsyncProcessTest.java:112) at org.jbpm.job.executor.SimpleAsyncProcessTest.testConsecutiveAycnActionHandlers(SimpleAsyncProcessTest.java:69) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at junit.framework.TestCase.runTest(TestCase.java:154) at junit.framework.TestCase.runBare(TestCase.java:127) at junit.framework.TestResult$1.protect(TestResult.java:106) at junit.framework.TestResult.runProtected(TestResult.java:124) at junit.framework.TestResult.run(TestResult.java:109) at junit.framework.TestCase.run(TestCase.java:118) at junit.framework.TestSuite.runTest(TestSuite.java:208) at junit.framework.TestSuite.run(TestSuite.java:203) at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196) */ public class SimpleAsyncProcessTest extends TestCase { private static final long serialVersionUID = 1L; static int nbrOfConcurrentProcessExecutions = 20; static int maxWaitTime = 20000; static Set collectedResults = Collections .synchronizedSet(new TreeSet()); protected static JbpmConfiguration jbpmConfiguration = JbpmConfiguration .getInstance("org/jbpm/jbpm.test.cfg.xml"); static { jbpmConfiguration.getJobExecutor().nbrOfThreads = 5; } protected JobExecutor jobExecutor; public static class AsyncAction implements ActionHandler { private static final long serialVersionUID = 1L; public void execute(ExecutionContext executionContext) throws Exception { // Normal would do stuff here // Leave via the default transition // When this test case was written, this method would throw when // it tried to enter Node2 because it would try to lock the token but // it is already locked. executionContext.leaveNode(); } } /** This test verifies that a process with two consecutive nodes with async="true" * and an action handler specified actually completes. * This test was initially written against jBPM 3.2.1. It would fail in Token.lock() when * attempting to propagate execution to Node2 */ public void testConsecutiveAycnActionHandlers() { jbpmConfiguration.createSchema(); deployProcess(); long processID = launchProcess(); processJobs(maxWaitTime); assertTrue(hasProcessInstanceEnded(processID)); jbpmConfiguration.createSchema(); } public void deployProcess() { ProcessDefinition processDefinition = ProcessDefinition .parseXmlString("" + "" + "" + "" + "" + "" + "" + "" + "" + "" + ""+ "" + ""+ ""+ ""); JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); try { jbpmContext.deployProcessDefinition(processDefinition); } finally { jbpmContext.close(); } } /** Create a new process instance * * @return The process instance ID */ public long launchProcess() { JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); try { ProcessInstance processInstance = jbpmContext .newProcessInstanceForUpdate("StandaloneSample"); processInstance.signal(); return processInstance.getId(); } finally { jbpmContext.close(); } } public boolean hasProcessInstanceEnded(long id) { JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); try { ProcessInstance processInstance = jbpmContext.getProcessInstance(id); return processInstance.hasEnded(); } finally { jbpmContext.close(); } } protected void startJobExecutor() { jobExecutor = jbpmConfiguration.getJobExecutor(); jobExecutor.start(); } private void processAllJobs(final long maxWait) { boolean jobsAvailable = true; // install a timer that will interrupt if it takes too long // if that happens, it will lead to an interrupted exception and the test // will fail TimerTask interruptTask = new TimerTask() { Thread testThread = Thread.currentThread(); public void run() { log .debug("test " + getName() + " took too long. going to interrupt..."); testThread.interrupt(); } }; Timer timer = new Timer(); timer.schedule(interruptTask, maxWait); try { while (jobsAvailable) { log .debug("going to sleep for 200 millis, waiting for the job executor to process more jobs"); Thread.sleep(200); jobsAvailable = areJobsAvailable(); } jobExecutor.stopAndJoin(); } catch (InterruptedException e) { fail("test execution exceeded treshold of " + maxWait + " milliseconds"); } finally { timer.cancel(); } } private int getNbrOfJobsAvailable() { int nbrOfJobsAvailable = 0; JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(); try { Session session = jbpmContext.getSession(); Number jobs = (Number) session .createQuery("select count(*) from org.jbpm.job.Job").uniqueResult(); log.debug("there are '" + jobs + "' jobs currently in the job table"); if (jobs != null) { nbrOfJobsAvailable = jobs.intValue(); } } finally { jbpmContext.close(); } return nbrOfJobsAvailable; } protected boolean areJobsAvailable() { return (getNbrOfJobsAvailable() > 0); } protected void processJobs(long maxWait) { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } startJobExecutor(); try { processAllJobs(maxWait); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } finally { stopJobExecutor(); } } protected void stopJobExecutor() { if (jobExecutor != null) { try { jobExecutor.stopAndJoin(); } catch (InterruptedException e) { throw new RuntimeException( "waiting for job executor to stop and join got interrupted", e); } } } private static Log log = LogFactory.getLog(JobExecutorDbTest.class); }