package com.sample; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseFactory; import org.drools.common.AbstractRuleBase; import org.drools.impl.InternalKnowledgeBase; import org.drools.process.core.datatype.impl.type.IntegerDataType; import org.drools.process.core.datatype.impl.type.ObjectDataType; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.process.ProcessContext; import org.jbpm.bpmn2.xml.XmlBPMNProcessSerializer; import org.jbpm.process.instance.impl.Action; import org.jbpm.ruleflow.core.RuleFlowProcess; import org.jbpm.ruleflow.core.RuleFlowProcessFactory; import org.jbpm.workflow.core.WorkflowProcess; import org.jbpm.workflow.core.node.Split; public class CronusTest { KnowledgeBase m_kbase; StatefulKnowledgeSession m_ksession; static final String DEPLOY_PROCESS = "cronus.deploy.process"; static final String SYNC_MANIFEST = "cronus.sync.manifest"; static final String DEPLOY_MANIFEST = "cronus.deploy.manifest"; private static class WorkingSet { ArrayList servers = new ArrayList(); String manifest; } void registerProcess(WorkflowProcess process) { ((AbstractRuleBase) ((InternalKnowledgeBase) m_kbase).getRuleBase()).addProcess(process); } void init() { m_kbase = KnowledgeBaseFactory.newKnowledgeBase(); m_ksession = m_kbase.newStatefulKnowledgeSession(); } private static RuleFlowProcess getDeployProcess() { final RuleFlowProcessFactory factory = RuleFlowProcessFactory .createProcess(DEPLOY_PROCESS); factory // header .name(DEPLOY_PROCESS) .packageName(DEPLOY_PROCESS) // process variable(s) .variable("servers",new ObjectDataType(Collection.class.getCanonicalName())) .variable("workingSets",new ObjectDataType(Collection.class.getCanonicalName())) // Process-scope variables .variable("shouldActivate",new ObjectDataType(Boolean.class.getCanonicalName())) // start and other nodes .startNode(1).name("StartDeploy").done() // Identify new deployment which returns Collection .actionNode(10).action( new Action() {public void execute(ProcessContext context) throws Exception { ArrayList servers = new ArrayList(); servers.add("1"); servers.add("2"); servers.add("3"); servers.add("4"); servers.add("5"); servers.add("6"); context.setVariable("servers", servers); System.out.println("Identify new deployment: "+servers.size()+" servers."); }}).done() .actionNode(20).action( new Action() {public void execute(ProcessContext context) throws Exception { ArrayList workingSets = new ArrayList(); List servers = (List) context.getVariable("servers"); int i=0; for (Iterator it=servers.iterator(); it.hasNext(); ) { WorkingSet s = new WorkingSet(); s.servers.add(it.next()); s.servers.add(it.next()); s.manifest = "Manifest"+(i++); workingSets.add(s); } context.setVariable("workingSets", workingSets); System.out.println("Identify working sets: "+workingSets.size()); }}).done() // for each working set, call manifest sync workflow serially .forEachNode(30) .collectionExpression("workingSets") .variable("workingSet", new ObjectDataType(WorkingSet.class.getCanonicalName())) // .variable("localShouldActivate", new ObjectDataType(Boolean.class.getCanonicalName())) .waitForCompletion(true) .startNode(301).name("StartForEachWorkingSet").done() .actionNode(4000) .action(new Action() { @Override public void execute(ProcessContext context) throws Exception { ArrayList ws = (ArrayList)context.getVariable("workingSets"); for (WorkingSet s : ws) { System.out.println("--"+s); } System.out.println("got a total of "+ws.size()+" working sets"); System.out.println("+++"+context.getVariable("workingSet")); System.out.println((context.getVariable("shouldActivate")==Boolean.TRUE)?"true": "false"); } }) .done() // call manifest sync process .subProcessNode(302) .name(SYNC_MANIFEST) .processId(SYNC_MANIFEST) .waitForCompletion(true) .inMapping("workingSet", "workingSet") .done() // check if we should do manifest activate // ... .actionNode(1000).name("shouldActivate") .action(new Action() { @Override public void execute(ProcessContext context) throws Exception { System.out.println("Test splitnode on XOR. Intentionally set it to false to go the false branch, but it doesn't seem to work."); context.setVariable("shouldActivate", Boolean.FALSE); // context.setVariable("localShouldActivate", Boolean.TRUE); } }) .done() // should activate? .splitNode(1010).type(Split.TYPE_XOR).name("splitOnShouldActivate") .constraint(305, "should activate", "code", "Java", "(Boolean)(context.getVariable(\"shouldActivate\")) == Boolean.TRUE") .constraint(2000, "should not activate", "code", "Java", "(Boolean)(context.getVariable(\"shouldActivate\")) == Boolean.FALSE") .done() // call manifest activate process .subProcessNode(305) .name(DEPLOY_MANIFEST) .processId(DEPLOY_MANIFEST) // .waitForCompletion(true) .inMapping("workingSet", "workingSet") .done() .joinNode(2000).type(Split.TYPE_XOR).done() // must set terminate to false in the for-each loop .endNode(307).name("EndForEachWorkingSet").terminate(false).done() // split node connection // split on true - continue in the loop .connection(1010, 305) .connection(305, 2000) // split on false - skip over .connection(1010, 2000) // from join to end for each .connection(2000, 307) .connection(301, 4000) .connection(4000, 302) .connection(302, 1000) .connection(1000, 1010) .done() // done for each .endNode(40).name("End").terminate(false).done() .connection(1, 10) .connection(10, 20) .connection(20, 30) .connection(30, 40) ; return toProcess(factory); } private static RuleFlowProcess getSyncManifest() { RuleFlowProcessFactory factory = RuleFlowProcessFactory .createProcess(SYNC_MANIFEST); factory .name(SYNC_MANIFEST).packageName(SYNC_MANIFEST) // Process input variable(s) .variable("localWorkingSet", new ObjectDataType(WorkingSet.class.getCanonicalName())) .variable("percentComplete", new IntegerDataType()) .variable("numFailures", new IntegerDataType()) .variable("manifest",new ObjectDataType()) .variable("servers",new ObjectDataType(Collection.class.getCanonicalName())) .startNode(1).name("Start").done() .actionNode(2).action( new Action() {public void execute(ProcessContext context) throws Exception { WorkingSet workingSet = (WorkingSet) context.getVariable("workingSet"); System.out.println("Start sub-process manifest sync: "+workingSet.manifest); context.setVariable("manifest", new String("manifest")); }}).done() .actionNode(3).action( new Action() {public void execute(ProcessContext context) throws Exception { WorkingSet workingSet = (WorkingSet) context.getVariable("workingSet"); System.out.println("get servers for manifest "+workingSet.manifest); context.setVariable("servers", workingSet.servers); }}).done() // first loop with activation .forEachNode(4) .collectionExpression("servers") .variable("server",new ObjectDataType()) // .waitForCompletion(true) .startNode(401).name("Start ManifestSync::triggerDeployManifest").done() .actionNode(402).action( new Action() {public void execute(ProcessContext context) throws Exception { String server = (String) context.getVariable("server"); System.out.println("Trigger activate manifest for server: "+server); }}).done() .endNode(403).name("End").terminate(false).done() .connection(401, 402) .connection(402, 403) .done() // second loop with waiting for results of activation .forEachNode(5) .collectionExpression("servers") .variable("server",new ObjectDataType()) // .waitForCompletion(true) .startNode(501).done() .actionNode(502).action( new Action() {public void execute(ProcessContext context) throws Exception { String server = (String) context.getVariable("server"); System.out.println("Wait activate manifest for server: "+server); }}).done() .endNode(503).name("End").terminate(false).done() .connection(501, 502) .connection(502, 503) .done() .endNode(6).name("EndForEachServer").done() .connection(1, 2) .connection(2, 3) .connection(3, 4) .connection(4, 5) .connection(5, 6) ; return toProcess(factory); } private static RuleFlowProcess getDeployManifest() { RuleFlowProcessFactory factory = RuleFlowProcessFactory .createProcess(DEPLOY_MANIFEST); factory.name(DEPLOY_MANIFEST).packageName(DEPLOY_MANIFEST) // Process input variable(s) .variable("workingSet", new ObjectDataType(WorkingSet.class.getCanonicalName())) .variable("percentComplete", new IntegerDataType()) .variable("numFailures", new IntegerDataType()) .variable("manifest",new ObjectDataType()) .variable("servers",new ObjectDataType(Collection.class.getCanonicalName())) .startNode(1).name("Start").done() .actionNode(2).action( new Action() {public void execute(ProcessContext context) throws Exception { WorkingSet workingSet = (WorkingSet) context.getVariable("workingSet"); System.out.println("Start sub-process: deploy manifest "+workingSet.manifest); context.setVariable("manifest", new String("manifest")); }}).done() .actionNode(3).action( new Action() {public void execute(ProcessContext context) throws Exception { WorkingSet workingSet = (WorkingSet) context.getVariable("workingSet"); System.out.println("get servers for manifest "+workingSet.manifest); context.setVariable("servers", workingSet.servers); }}).done() // first loop with activation .forEachNode(4) .collectionExpression("servers") .variable("server",new ObjectDataType()) // .waitForCompletion(true) .startNode(401).done() .actionNode(402).action( new Action() {public void execute(ProcessContext context) throws Exception { String server = (String) context.getVariable("server"); System.out.println("Trigger deploy manifest for server "+server); }}).done() .endNode(403).name("End").terminate(false).done() .connection(401, 402) .connection(402, 403) .done() // second loop with waiting for results of activation .forEachNode(5) .collectionExpression("servers") .variable("server",new ObjectDataType()) // .waitForCompletion(true) .startNode(501).done() .actionNode(502).action( new Action() {public void execute(ProcessContext context) throws Exception { String server = (String) context.getVariable("server"); System.out.println("Wait deploy manifest for server "+server); }}).done() .endNode(503).name("End").terminate(false).done() .connection(501, 502) .connection(502, 503) .done() .endNode(6).name("End").done() .connection(1, 2) .connection(2, 3) .connection(3, 4) .connection(4, 5) .connection(5, 6) ; return toProcess(factory); } private void run() { m_ksession.startProcess(DEPLOY_PROCESS); } private static RuleFlowProcess toProcess(RuleFlowProcessFactory f) { RuleFlowProcess process = f.validate().getProcess(); process.setAutoComplete(true); String xml = XmlBPMNProcessSerializer.INSTANCE.dump(process, true); System.out.println(xml); return process; } public static void main(String[] argv) { CronusTest test = new CronusTest(); test.init(); test.registerProcess(getDeployProcess()); test.registerProcess(getSyncManifest()); test.registerProcess(getDeployManifest()); test.run(); } }