Uploaded image for project: 'Kogito'
  1. Kogito
  2. KOGITO-6287

Improve jbpm engine Message and Signal (forward port)

XMLWordPrintable

    • Icon: Feature Request Feature Request
    • Resolution: Done
    • Icon: Major Major
    • 1.20.0.Final
    • None
    • None
    • None

      h1: conversations:
      for now seems out of the scope of the bapl. In any case the infrastructure is there for further improvements.

      h1: correlation subscriptions 1:1 peer communication

      what is it?

      So far the engine has no differentiation whether a signal is being used or message. This works tries to introduce a way to add correlations key in a spec way.
      This will allow systems like kafka to route specific messages based on content and collaboration among process that share common knowledge without knowing the existence of each other.

       
      The work is based on the EventTypeFilter adding a new level of filter based on the content of the message. There are two possible ways of working this correlations subscriptions

      1. Static (Message start event)
      2. Dynamic (Message Intermediate Catch event, Message Boundary event)

      As the correlation subscription is based on retrieval expression.

      1. Static can only based on constant based expression. It is not possible to access variables of a process has not been created (cannot subscribe)
      2. Dynamic Can be based on constants expression or based on expression using process state variables

      Use Cases

      For static use case we can picture an event coming from a thirst party (either a process or another subsystem like payment gateway or camel route). An event is generated

      {
         orderId : 1
         payment: ok
      }
      

      We can plan the static correlation like:

      Correlation key (OkOrderCorrelationKey) Message properties (orderId) Process expression (orderId)
      status payment "ok"

      This would work as follow. We created a subscription for a certain process with the correlation key OkOrderCorrelationKey. Now for every incoming message:

      1. We compute the correlation key instance based on the incoming message. We extract payment value and fill the status property of the correlation key derived from there -> OKOrderCorrelationKey {status = ok}
        # We compute the correlation subscription based on the static expression "ok" and filling the property status -> OkOrderCorrelationKey {status = ok}

      As there is a match we would start a new process with that message. if the message does not match the system will reject the starting.

      For dynamic correlation key it is very similar. Let say we have another correlation key:

      Correlation key (OrderCorrelationKey) Message properties (orderId) Process expression (orderId)
      id orderId ProcessVarOrderId

      As we had in the same way with static now we compute both correlation keys.

      We start a process with

      {ProcessVarOrderId = 1}
      1. We compute the correlation key instance based on the incoming message. We extract payment value and fill the status property of the correlation key derived from there -> OKOrderCorrelationKey {status = 1}
      2. We compute the correlation subscription based on the dynamic expression based on process variables "ProcessVarOderId" which is 1 at the moment of computing and filling the property status -> OkOrderCorrelationKey {id = 1}

      As we had the same way with static now we have a match. As it is dynamic a change in the ProcessVarOrderId may vary in time. it should (but not limited) to be final variable (once it is set. IT should not be modified, but it is not limited to that)

      1. IMPORTANT NOTE: The constraints of the process is that none property expression of the correlation key might be evaluated to null at the moment of computed the correlation key instance. The system will rise and exception
      2. Only supports MVEL expression language (language attribute in expressions)

      bpmn2 technicalities

      	<xsd:element name="correlationProperty" type="tCorrelationProperty" substitutionGroup="rootElement"/>
      	<xsd:complexType name="tCorrelationProperty">
      		<xsd:complexContent>
      			<xsd:extension base="tRootElement">
      				<xsd:sequence>
      					<xsd:element ref="correlationPropertyRetrievalExpression" minOccurs="1" maxOccurs="unbounded"/>
      				</xsd:sequence>
      				<xsd:attribute name="name" type="xsd:string" use="optional"/>
      				<xsd:attribute name="type" type="xsd:QName"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      
      	<xsd:element name="correlationPropertyRetrievalExpression" type="tCorrelationPropertyRetrievalExpression"/>
      	<xsd:complexType name="tCorrelationPropertyRetrievalExpression">
      		<xsd:complexContent>
      			<xsd:extension base="tBaseElement">
      				<xsd:sequence>
      					<xsd:element name="messagePath" type="tFormalExpression" minOccurs="1" maxOccurs="1"/>
      				</xsd:sequence>
      				<xsd:attribute name="messageRef" type="xsd:QName" use="required"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      

      collaboration needed

      	<xsd:element name="collaboration" type="tCollaboration" substitutionGroup="rootElement"/>
      	<xsd:complexType name="tCollaboration">
      		<xsd:complexContent>
      			<xsd:extension base="tRootElement">
      				<xsd:sequence>
      					<xsd:element ref="participant" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="messageFlow" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="artifact" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="conversationNode" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="conversationAssociation" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="participantAssociation" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="messageFlowAssociation" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="correlationKey" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element name="choreographyRef" type="xsd:QName" minOccurs="0" maxOccurs="unbounded"/>
      					<xsd:element ref="conversationLink" minOccurs="0" maxOccurs="unbounded"/>
      				</xsd:sequence>
      				<xsd:attribute name="name" type="xsd:string"/>
      				<xsd:attribute name="isClosed" type="xsd:boolean" default="false"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      
      	<xsd:element name="correlationKey" type="tCorrelationKey"/>
      	<xsd:complexType name="tCorrelationKey">
      		<xsd:complexContent>
      			<xsd:extension base="tBaseElement">
      				<xsd:sequence>
      					<xsd:element name="correlationPropertyRef" type="xsd:QName" minOccurs="0" maxOccurs="unbounded"/>
      				</xsd:sequence>
      				<xsd:attribute name="name" type="xsd:string" use="optional"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      	
      	
      
      

      subscription

      	<xsd:element name="correlationSubscription" type="tCorrelationSubscription"/>
      	<xsd:complexType name="tCorrelationSubscription">
      		<xsd:complexContent>
      			<xsd:extension base="tBaseElement">
      				<xsd:sequence>
      					<xsd:element ref="correlationPropertyBinding" minOccurs="0" maxOccurs="unbounded"/>
      				</xsd:sequence>
      				<xsd:attribute name="correlationKeyRef" type="xsd:QName" use="required"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      
      	<xsd:element name="correlationPropertyBinding" type="tCorrelationPropertyBinding"/>
      	<xsd:complexType name="tCorrelationPropertyBinding">
      		<xsd:complexContent>
      			<xsd:extension base="tBaseElement">
      				<xsd:sequence>
      					<xsd:element name="dataPath" type="tFormalExpression" minOccurs="1" maxOccurs="1"/>
      				</xsd:sequence>
      				<xsd:attribute name="correlationPropertyRef" type="xsd:QName" use="required"/>
      			</xsd:extension>
      		</xsd:complexContent>
      	</xsd:complexType>
      
      

      Example as it is:

      <?xml version="1.0" encoding="UTF-8"?>
      <bpmn2:definitions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.omg.org/bpmn20" xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:bpsim="http://www.bpsim.org/schemas/1.0" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:drools="http://www.jboss.org/drools" id="_5wA6AMKrEeujE7stQnOanQ" xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd http://www.jboss.org/drools drools.xsd http://www.bpsim.org/schemas/1.0 bpsim.xsd http://www.omg.org/spec/DD/20100524/DC DC.xsd http://www.omg.org/spec/DD/20100524/DI DI.xsd " exporter="jBPM Process Modeler" exporterVersion="2.0" targetNamespace="http://www.omg.org/bpmn20">
      
        <bpmn2:itemDefinition id="collaborationType" structureRef="org.jbpm.test.functional.collaboration.Message"/>
        <bpmn2:itemDefinition id="collaborationPropertyType" structureRef="String"/>
        <bpmn2:itemDefinition id="subscriptionCollaborationPropertyType" structureRef="String"/>
      
        <bpmn2:message id="_2QDl6sHdEeur348LfIUqUw" itemRef="collaborationType" name="collaboration"/>
      
        <bpmn2:correlationProperty id="CorrelationPropertyId" name="Correlation Property 1" type="collaborationPropertyType">
            <bpmn2:correlationPropertyRetrievalExpression id="CorrelationPropertyRetrievalExpressionId" messageRef="_2QDl6sHdEeur348LfIUqUw">
            <bpmn2:messagePath xsi:type="bpmn2:tFormalExpression" id="FormalExpressionId" evaluatesToTypeRef="collaborationType" language="http://www.java.com/java">id</bpmn2:messagePath>
          </bpmn2:correlationPropertyRetrievalExpression>
        </bpmn2:correlationProperty>
      
        <bpmn2:collaboration id="DefaultCollaborationId" name="Default Collaboration">
          <bpmn2:participant id="ParticipantId" name="Pool Participant" processRef="collaboration.StartMessage"/>
          <bpmn2:correlationKey id="CorrelationKeyId" name="Correlation Key Message">
            <bpmn2:correlationPropertyRef>CorrelationPropertyId</bpmn2:correlationPropertyRef>
          </bpmn2:correlationKey>
        </bpmn2:collaboration>
      
        <bpmn2:process id="collaboration.StartMessage" drools:packageName="com.test.test" drools:version="1.0" drools:adHoc="false" name="Start" isExecutable="true" processType="Public">
          <bpmn2:property id="MessageId" itemSubjectRef="subscriptionCollaborationPropertyType"/>
          <bpmn2:correlationSubscription id="CorrelationSubscription_Id" correlationKeyRef="CorrelationKeyId">
            <bpmn2:correlationPropertyBinding id="CorrelationPropertyBindingId" correlationPropertyRef="CorrelationPropertyId">
              <bpmn2:dataPath xsi:type="bpmn2:tFormalExpression" id="FormalExpression_2" evaluatesToTypeRef="collaborationType" language="http://www.java.com/java">"1"</bpmn2:dataPath>
            </bpmn2:correlationPropertyBinding>
          </bpmn2:correlationSubscription>
      
          <bpmn2:sequenceFlow id="_281A3494-8772-4208-B166-7C3791208FBC" sourceRef="_48E6D148-AD42-47C7-9BA3-09E0493D83F2" targetRef="_629DCCD1-4FF4-4EFD-803F-09B7BBE7F03F">
            <bpmn2:extensionElements>
              <drools:metaData name="isAutoConnection.source">
                <drools:metaValue><![CDATA[true]]></drools:metaValue>
              </drools:metaData>
              <drools:metaData name="isAutoConnection.target">
                <drools:metaValue><![CDATA[true]]></drools:metaValue>
              </drools:metaData>
            </bpmn2:extensionElements>
          </bpmn2:sequenceFlow>
          <bpmn2:endEvent id="_629DCCD1-4FF4-4EFD-803F-09B7BBE7F03F" name="End">
            <bpmn2:extensionElements>
              <drools:metaData name="elementname">
                <drools:metaValue><![CDATA[End]]></drools:metaValue>
              </drools:metaData>
            </bpmn2:extensionElements>
            <bpmn2:incoming>_281A3494-8772-4208-B166-7C3791208FBC</bpmn2:incoming>
          </bpmn2:endEvent>
          <bpmn2:startEvent id="_48E6D148-AD42-47C7-9BA3-09E0493D83F2" name="Message">
            <bpmn2:extensionElements>
              <drools:metaData name="elementname">
                <drools:metaValue><![CDATA[Message]]></drools:metaValue>
              </drools:metaData>
            </bpmn2:extensionElements>
            <bpmn2:outgoing>_281A3494-8772-4208-B166-7C3791208FBC</bpmn2:outgoing>
            <bpmn2:messageEventDefinition id="_2QDl6cHdEeur348LfIUqUw" drools:msgref="collaboration" messageRef="_2QDl6sHdEeur348LfIUqUw"/>
          </bpmn2:startEvent>
        </bpmn2:process>
      </bpmn2:definitions>
      

              rhn-support-egonzale Enrique Gonzalez Martinez (Inactive)
              rhn-support-egonzale Enrique Gonzalez Martinez (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

                Created:
                Updated:
                Resolved: