package com.drools.restore.reproducer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.time.impl.PseudoClockScheduler;
import org.junit.Test;
import org.kie.api.KieBase;
import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.KieModule;
import org.kie.api.builder.ReleaseId;
import org.kie.api.builder.model.KieBaseModel;
import org.kie.api.builder.model.KieModuleModel;
import org.kie.api.conf.EqualityBehaviorOption;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.marshalling.Marshaller;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.conf.TimedRuleExectionOption;
import org.kie.api.runtime.conf.TimerJobFactoryOption;
import org.kie.api.time.SessionClock;

public class RestoreReproducer {

   public static KieModule createAndDeployJarInStreamMode(KieServices ks, ReleaseId releaseId, String... drls) {
      KieFileSystem kfs = ks.newKieFileSystem();
      kfs.generateAndWritePomXML(releaseId);
      KieModuleModel module = ks.newKieModuleModel();

      KieBaseModel defaultBase = module.newKieBaseModel("kBase1");
      defaultBase.setEventProcessingMode(EventProcessingOption.STREAM).setDefault(true);
      defaultBase.newKieSessionModel("defaultKSession").setDefault(true);
      defaultBase.setEqualsBehavior(EqualityBehaviorOption.EQUALITY);
      kfs.writeKModuleXML(module.toXML());

      for (int i = 0; i < drls.length; i++) {
         kfs.write("src/main/resources/rules" + i + ".drl", drls[i]);
      }

      KieBuilder kb = ks.newKieBuilder(kfs);
      kb.buildAll();
      if (kb.getResults().hasMessages(org.kie.api.builder.Message.Level.ERROR)) {
         System.out.println(kb.getResults().toString());
      }
      return kb.getKieModule();
   }

   @Test
   @SuppressWarnings("rawtypes")
   public void testSnapshotRecoveryScheduledRulesPlain() throws Exception {

      StringBuffer prefix = new StringBuffer();
      prefix.append("package com.drools.restore.reproducer\n");
      prefix.append("global java.util.List list;\n");
      prefix.append("global java.util.List list2;\n");

      StringBuffer drlR1 = new StringBuffer();
      drlR1.append(prefix.toString());
      drlR1.append("rule R1\n");
      drlR1.append(" timer (int: 20s)\n");
      drlR1.append(" when\n");
      drlR1.append("   $m : Message( message == \"Hello World1\" )\n");
      drlR1.append(" then\n");
      drlR1.append("   list.add( $m );\n");
      drlR1.append("   retract( $m );\n");
      drlR1.append("end\n");

      StringBuffer drlR2 = new StringBuffer();
      drlR2.append(prefix.toString());
      drlR2.append("rule R2\n");
      drlR2.append(" timer (int: 30s)\n");
      drlR2.append(" when\n");
      drlR2.append("   $m : Message( message == \"Hello World2\" )\n");
      drlR2.append(" then\n");
      drlR2.append("   list2.add( $m );\n");
      drlR2.append("   retract( $m );\n");
      drlR2.append("end\n");

      KieServices ks = KieServices.Factory.get();
      ReleaseId releaseId1 = ks.newReleaseId("org.kie", "test-upgrade", "1.0.0");
      KieModule km = createAndDeployJarInStreamMode(ks, releaseId1, drlR1.toString(), drlR2.toString());

      KieContainer kc = ks.newKieContainer(km.getReleaseId());

      KieSessionConfiguration ksconf = ks.newKieSessionConfiguration();
      ksconf.setOption(TimedRuleExectionOption.YES);
      ksconf.setOption(TimerJobFactoryOption.get("trackable"));
      ksconf.setOption(ClockTypeOption.get("pseudo"));

      KieBase kBase = kc.getKieBase();

      KieSession ksession = kBase.newKieSession(ksconf, null);

      PseudoClockScheduler timeService = (PseudoClockScheduler) ksession.<SessionClock> getSessionClock();

      List list = new ArrayList();
      ksession.setGlobal("list", list);
      List list2 = new ArrayList();
      ksession.setGlobal("list2", list2);

      Message m1 = new Message("Hello World1");
      Message m2 = new Message("Hello World2");

      ksession.insert(m1);
      ksession.insert(m2);

      ksession.fireAllRules();
      timeService.advanceTime(10500, TimeUnit.MILLISECONDS);

      // Recover engine
      byte[] snapshot = this.getSnapshot(kBase, ksession);

      KieServices ks2 = KieServices.Factory.get();
      ReleaseId releaseId2 = ks.newReleaseId("org.kie", "test-upgrade", "4.2.3");
      KieModule km2 = createAndDeployJarInStreamMode(ks2, releaseId2, drlR1.toString(), drlR2.toString());

      KieContainer kc2 = ks2.newKieContainer(km2.getReleaseId());

      KieSessionConfiguration ksconf2 = ks2.newKieSessionConfiguration();
      ksconf2.setOption(TimedRuleExectionOption.YES);
      ksconf2.setOption(TimerJobFactoryOption.get("trackable"));
      ksconf2.setOption(ClockTypeOption.get("pseudo"));

      KieBase kBase2 = kc2.getKieBase();
      ksession.halt();
      ksession.destroy();

      List list3 = new ArrayList();
      List list4 = new ArrayList();

      KieSession ksession2 = kBase2.newKieSession(ksconf2, null);
      ksession2.setGlobal("list", list3);
      ksession2.setGlobal("list2", list4);
      PseudoClockScheduler timeService2 = (PseudoClockScheduler) ksession2.<SessionClock> getSessionClock();
      this.loadSnapshot(kBase2, ksession2, snapshot);
      ksession2.fireAllRules();

      long accumulatedSleepTime = 0;
      for (int i = 0; i < 8; i++) {
         timeService2.advanceTime(5050, TimeUnit.MILLISECONDS);
         accumulatedSleepTime += 5050;
         System.out.println("Accumulated sleep time: " + accumulatedSleepTime);
         System.out.println(list.size() + "," + list2.size() + "," + list3.size() + "," + list4.size());
      }

   }

   public byte[] getSnapshot(KieBase kBase, KieSession kSession) throws IOException {

      final Marshaller marshaller = KieServices.Factory.get().getMarshallers().newMarshaller(kBase);
      final ByteArrayOutputStream baos = new ByteArrayOutputStream();

      marshaller.marshall(baos, kSession);
      return baos.toByteArray();
   }

   private void loadSnapshot(KieBase kBase, KieSession kSession, byte[] bytes) throws ClassNotFoundException, IOException {

      Marshaller marshaller = KieServices.Factory.get().getMarshallers().newMarshaller(kBase);

      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
      marshaller.unmarshall(bais, kSession);

      bais.close();

   }

}
