  1. jBPM
  2. JBPM-9541

Kafka extension to support custom format


    • Type: Enhancement
    • Resolution: Done
    • Priority: Major
    • 7.49.0.Final
    • 2020 Week 52-03 (from Dec 21), 2021 Week 04-06 (from Jan 25)

      Currently, the KIE Kafka extension expects the value of each received Kafka record to conform to the cloud event specification and, conversely, publishes records whose values follow the same specification.

      This Jira adds an additional implementation that consumes and publishes the object directly as JSON, and provides an extension mechanism so that customers can supply their own value format if needed.

      Engineering Solution

      Users have two options for customizing the Kafka record format: use the default implementation of KafkaEventProcessFactory, or implement their own.

      When using the default factory, the user can specify, per topic, a custom implementation of the KafkaEventReader interface (used to extract messages from Kafka) or the KafkaEventWriter interface (used to put messages into Kafka) by setting system properties of the form org.kie.server.jbpm-kafka.ext.topics.<topicName>.eventReaderClass and org.kie.server.jbpm-kafka.ext.topics.<topicName>.eventWriterClass. If all topics use the same conversion mechanism, the default cloud event implementation can be replaced for all topics using org.kie.server.jbpm-kafka.ext.eventReaderClass and org.kie.server.jbpm-kafka.ext.eventWriterClass.

      For example, if the user wants to consume or publish the message POJO as JSON for all topics (rather than as cloud events), this is the configuration to use:

       

      org.kie.server.jbpm-kafka.ext.eventReaderClass=org.kie.server.services.jbpm.kafka.RawJsonEventReader
      org.kie.server.jbpm-kafka.ext.eventWriterClass=org.kie.server.services.jbpm.kafka.RawJsonEventWriter
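
      The property lookup described above could be sketched as follows. This is only an illustration, not the extension's actual code: the class EventClassResolver, the topic names orders and payments, the class com.example.OrdersReader and the default placeholder string are all hypothetical, and the precedence shown (per-topic property first, then the global property, then the cloud event default) is an assumption inferred from the description.

```java
import java.util.Properties;

// Hypothetical helper, NOT part of jBPM: shows one plausible resolution order.
final class EventClassResolver {
    static final String PREFIX = "org.kie.server.jbpm-kafka.ext.";
    // Assumption: placeholder standing in for the built-in cloud event reader.
    static final String DEFAULT_READER = "<default cloud event reader>";

    // Assumed precedence: per-topic property, then global property, then default.
    static String readerClassFor(String topic, Properties props) {
        String perTopic = props.getProperty(PREFIX + "topics." + topic + ".eventReaderClass");
        if (perTopic != null) {
            return perTopic;
        }
        return props.getProperty(PREFIX + "eventReaderClass", DEFAULT_READER);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Global setting, taken from the configuration example in this issue.
        props.setProperty(PREFIX + "eventReaderClass",
                "org.kie.server.services.jbpm.kafka.RawJsonEventReader");
        // Hypothetical per-topic override for a topic named "orders".
        props.setProperty(PREFIX + "topics.orders.eventReaderClass",
                "com.example.OrdersReader");

        System.out.println(readerClassFor("orders", props));   // per-topic value
        System.out.println(readerClassFor("payments", props)); // global value
    }
}
```

      In a real deployment the values would come from JVM system properties (for example via -D options) rather than a hand-built Properties object.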
      

       

      A KafkaEventReader implementation needs a public constructor that accepts a ClassLoader instance. A KafkaEventWriter implementation only needs a public default constructor.
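
      The constructor requirements make sense if the classes are instantiated reflectively from the configured class names. The sketch below illustrates that idea using only java.lang.reflect. SampleReader, SampleWriter and ReflectiveInstantiation are hypothetical stand-ins that do not implement the real KafkaEventReader and KafkaEventWriter interfaces, and the actual instantiation code in the extension may differ.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a reader: public constructor taking a ClassLoader.
class SampleReader {
    final ClassLoader classLoader;

    public SampleReader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }
}

// Hypothetical stand-in for a writer: public no-argument constructor.
class SampleWriter {
    public SampleWriter() {
    }
}

// Hypothetical helper, NOT the extension's code.
final class ReflectiveInstantiation {
    // Looks up the public (ClassLoader) constructor and passes the loader in.
    static Object newReader(String className, ClassLoader cl) {
        try {
            Class<?> type = Class.forName(className, true, cl);
            Constructor<?> ctor = type.getConstructor(ClassLoader.class);
            return ctor.newInstance(cl);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create reader " + className, e);
        }
    }

    // Looks up the public default constructor.
    static Object newWriter(String className, ClassLoader cl) {
        try {
            return Class.forName(className, true, cl).getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create writer " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = ReflectiveInstantiation.class.getClassLoader();
        Object reader = newReader(SampleReader.class.getName(), cl);
        Object writer = newWriter(SampleWriter.class.getName(), cl);
        System.out.println(reader.getClass().getSimpleName());
        System.out.println(writer.getClass().getSimpleName());
    }
}
```

      If either constructor is missing or not public, getConstructor throws NoSuchMethodException, which this sketch surfaces as an IllegalStateException.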

      The default factory implementation caches reader instances per class name and classloader (so if two topics share the same property value and the same KIE deployment classloader, they share the same reader instance) and caches writer instances per class name (so if two topics share the same property value, they share the same writer instance).
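
      The caching scheme just described can be sketched with two maps. This is a minimal illustration under stated assumptions, not the default factory's real code: EventProcessorCache, ReaderKey, the Supplier-based creation and the class names com.example.MyReader and com.example.MyWriter are all hypothetical, and plain Object instances stand in for readers and writers.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch, NOT the default factory's code.
final class EventProcessorCache {
    // Readers are keyed by class name AND classloader identity.
    private static final class ReaderKey {
        private final String className;
        private final ClassLoader classLoader;

        ReaderKey(String className, ClassLoader classLoader) {
            this.className = className;
            this.classLoader = classLoader;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ReaderKey)) {
                return false;
            }
            ReaderKey other = (ReaderKey) o;
            return className.equals(other.className) && classLoader == other.classLoader;
        }

        @Override
        public int hashCode() {
            return 31 * className.hashCode() + System.identityHashCode(classLoader);
        }
    }

    private final Map<ReaderKey, Object> readers = new ConcurrentHashMap<>();
    // Writers are keyed by class name only.
    private final Map<String, Object> writers = new ConcurrentHashMap<>();

    Object reader(String className, ClassLoader cl, Supplier<Object> creator) {
        return readers.computeIfAbsent(new ReaderKey(className, cl), k -> creator.get());
    }

    Object writer(String className, Supplier<Object> creator) {
        return writers.computeIfAbsent(className, k -> creator.get());
    }

    public static void main(String[] args) {
        EventProcessorCache cache = new EventProcessorCache();
        ClassLoader deploymentA = EventProcessorCache.class.getClassLoader();
        ClassLoader deploymentB = new URLClassLoader(new URL[0], deploymentA);

        Object r1 = cache.reader("com.example.MyReader", deploymentA, Object::new);
        Object r2 = cache.reader("com.example.MyReader", deploymentA, Object::new);
        Object r3 = cache.reader("com.example.MyReader", deploymentB, Object::new);
        Object w1 = cache.writer("com.example.MyWriter", Object::new);
        Object w2 = cache.writer("com.example.MyWriter", Object::new);

        System.out.println(r1 == r2); // true: same class name and classloader
        System.out.println(r1 == r3); // false: different classloader
        System.out.println(w1 == w2); // true: same class name
    }
}
```

      Sharing instances this way is only safe when the reader or writer keeps no per-topic state, which is why a different factory may be needed for other kinds of marshaller.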

      This caching behaviour is optimal for JSON-based marshallers. If the user requires different behaviour, a custom factory that returns KafkaEventReader and KafkaEventWriter implementations as needed can be developed. To use the custom factory, set the system property org.kie.server.jbpm-kafka.ext.eventProcessorFactoryClass. See an example of a factory implementation that uses the Java serialization mechanism.

       

            ftirados Francisco Javier Tirado Sarti