/**
 * Fact type inserted into the rule session by the demo program in this file.
 *
 * <p>The class-level annotations declare it as an event ({@code Role.Type.EVENT}),
 * name the {@code timestamp} field as its timestamp attribute, and give it a
 * {@code 30s} expiration.
 *
 * <p>NOTE(review): the imports are not visible here; {@code @Data} is presumably
 * Lombok's and the remaining annotations presumably come from the KIE/Drools
 * API - confirm against the file's import block.
 */
@Data
@Role(Role.Type.EVENT)
@Timestamp("timestamp")
@Expires("30s")
public class DispatchEvent {

    /** Caller-supplied identifier of this event. */
    @Key
    private String id;

    /** Creation time in epoch milliseconds, captured in the constructor. */
    private Long timestamp;

    /**
     * Creates an event carrying the given id and stamped with the current time.
     *
     * @param id identifier stored on the event
     */
    public DispatchEvent(String id) {
        final Instant createdAt = Instant.now();
        this.timestamp = createdAt.toEpochMilli();
        this.id = id;
    }
}
public class RetractExpiredEventNPEMain {
/**
 * DRL source of the initial rule set.
 *
 * <p>Declares package {@code org.example.drools} and a single rule, "Old Rule",
 * that matches every {@code DispatchEvent} arriving on the "events" entry point
 * and prints its id with the prefix "received event in old rule: ".
 */
public static final String DRL1 = "package org.example.drools;\n" +
"\n" +
"import org.example.drools.DispatchEvent;\n" +
"\n" +
"rule \"Old Rule\"\n" +
"when\n" +
" $e : DispatchEvent($id : id) from entry-point \"events\"\n" +
"then\n" +
" System.out.println(\"received event in old rule: \" + $id);\n" +
"end";
/**
 * DRL source of the replacement rule set.
 *
 * <p>Declares package {@code org.example.drools} and a single rule, "New Rule",
 * with the same pattern on the "events" entry point as the rule in {@code DRL1};
 * only the rule name and the printed prefix ("received event in new rule=== ")
 * differ.
 */
public static final String DRL2 = "package org.example.drools;\n" +
"\n" +
"import org.example.drools.DispatchEvent;\n" +
"\n" +
"rule \"New Rule\"\n" +
"when\n" +
" $e : DispatchEvent($id : id) from entry-point \"events\"\n" +
"then\n" +
" System.out.println(\"received event in new rule=== \" + $id);\n" +
"end";
/**
 * Runs the scenario end to end:
 * <ol>
 *   <li>builds a STREAM-mode KIE module containing {@link #DRL1} and opens session "session1";</li>
 *   <li>inserts event "aaa" on the "events" entry point and starts {@code fireUntilHalt()}
 *       on a background thread;</li>
 *   <li>after 3s, rebuilds the same release with {@link #DRL2} and calls
 *       {@code updateToVersion}, printing whether the terminal node captured from "aaa"
 *       still has a left tuple source before and after the update;</li>
 *   <li>inserts "bbb" and "ccc", waits 60s (longer than the 30s {@code @Expires} on
 *       {@code DispatchEvent}), inserts one more event, waits 3s and prints the entry
 *       point's fact count.</li>
 * </ol>
 *
 * <p>The session is halted, the executor shut down and the container disposed on every
 * exit path (normal completion, interruption, or an exception on the main thread), so the
 * non-daemon engine thread can no longer keep the JVM alive.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    KieServices ks = KieServices.Factory.get();
    KieFileSystem kfs = ks.newKieFileSystem();

    KieModuleModel kmodel = ks.newKieModuleModel();
    kmodel.newKieBaseModel("KBase")
            .setDefault(true)
            .addPackage("org.example.drools")
            .setEventProcessingMode(EventProcessingOption.STREAM)
            .newKieSessionModel("session1")
            .setDefault(true);
    kfs.writeKModuleXML(kmodel.toXML());

    ReleaseId releaseId = ks.newReleaseId("org.example.drools", "rules", "1.0.0-SNAPSHOT");
    kfs.generateAndWritePomXML(releaseId);
    String rulePath = "src/main/resources/org/example/drools/rules.drl";
    writeDrl(ks, kfs, rulePath, DRL1);
    KieBuilder initialBuilder = ks.newKieBuilder(kfs, RetractExpiredEventNPEMain.class.getClassLoader());
    initialBuilder.buildAll();

    KieContainer kcontainer = ks.newKieContainer(releaseId);
    KieSession session = kcontainer.newKieSession("session1");
    // Fully qualified: the file's import block is not visible from this method.
    java.util.concurrent.ExecutorService engineExecutor = Executors.newSingleThreadExecutor();
    try {
        EntryPoint events = session.getEntryPoint("events");
        EventFactHandle aaa = (EventFactHandle) events.insert(new DispatchEvent("aaa"));

        System.out.println("fire until halt...");
        engineExecutor.submit(() -> {
            try {
                session.fireUntilHalt();
            } catch (Throwable e) {
                // Any engine failure surfaces here; print it rather than lose it in the Future.
                e.printStackTrace(System.err);
            }
        });
        if (!pauseMillis(3000)) {
            return;
        }

        // Capture the terminal node reached by "aaa" so it can be inspected across the update.
        AbstractTerminalNode sink = aaa.getLinkedTuples().getFirstLeftTuple(0).getTupleSink();
        System.out.println("Tuple source not null: " + (sink.getLeftTupleSource() != null));

        // Rebuild the same release id with the replacement rules and update the container.
        kfs.generateAndWritePomXML(releaseId);
        writeDrl(ks, kfs, rulePath, DRL2);
        KieBuilder updatedBuilder = ks.newKieBuilder(kfs, RetractExpiredEventNPEMain.class.getClassLoader());
        System.out.println("build new rules...");
        updatedBuilder.buildAll();
        System.out.println("update rule version...");
        kcontainer.updateToVersion(releaseId);
        System.out.println("Tuple source not null: " + (sink.getLeftTupleSource() != null));

        System.out.println("insert event bbb");
        events.insert(new DispatchEvent("bbb"));
        System.out.println("insert event ccc");
        events.insert(new DispatchEvent("ccc"));

        // Wait past the 30s expiration declared on DispatchEvent.
        System.out.println("wait 60s ...");
        if (!pauseMillis(60000)) {
            return;
        }

        System.out.println("trigger retract expired facts...");
        events.insert(new DispatchEvent("check expire"));
        if (!pauseMillis(3000)) {
            return;
        }
        System.out.println("FactHandles size=" + events.getFactCount());
    } finally {
        // Halting makes fireUntilHalt() return so the executor thread can finish.
        session.halt();
        engineExecutor.shutdown();
        kcontainer.dispose();
    }
    // Outside the try block: System.exit would otherwise bypass the finally clause.
    System.exit(0);
}

/** Writes {@code drl} to {@code path} in the KIE file system as a UTF-8 encoded DRL resource. */
private static void writeDrl(KieServices ks, KieFileSystem kfs, String path, String drl) {
    byte[] drlBytes = drl.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    kfs.write(path, ks.getResources().newByteArrayResource(drlBytes).setResourceType(ResourceType.DRL));
}

/** Sleeps for {@code millis}; returns {@code false} with the interrupt flag restored if interrupted. */
private static boolean pauseMillis(long millis) {
    try {
        Thread.sleep(millis);
        return true;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
    }
}