Red Hat Process Automation Manager: RHPAM-3371

Kafka WIH leaves kie-server hanging if the Kafka Broker is offline.


    • Type: Enhancement
    • Resolution: Done
    • Priority: Major
    • 7.11.0.GA
    • 7.9.0.GA
    • jBPM Core
    • Documentation (Ref Guide, User Guide, etc.), Release Notes
    • CR1
    • 2021 Week 04-06 (from Jan 25)

      I just tested the Kafka Producer WIH and noticed that if the Kafka broker goes offline, the kie-server hangs and cannot process any new requests. It remains completely inoperable until it manages to reconnect to the Kafka broker (i.e., the broker comes back online).

       

      This is the error I see in server.log:

      This WARN keeps showing up until the producer reaches its default timeout (60s):

      10:13:57,315 WARN  [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | pam) [Producer clientId=pam] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
      

      After the Kafka producer timeout, this exception is thrown:

      10:13:57,396 ERROR [org.jbpm.process.workitem.kafka.KafkaWorkItemHandler] (default task-58) Handler error: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for PAM_Events-0:120000 ms has passed since batch creation
      	at deployment.kie-server.war//org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
      	at deployment.kie-server.war//org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
      	at deployment.kie-server.war//org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
      	at deployment.kie-server.war//org.jbpm.process.workitem.kafka.KafkaWorkItemHandler.executeWorkItem(KafkaWorkItemHandler.java:124)
      ...
      

      It seems the Kafka WIH sends the event in synchronous mode:

      producer.send(new ProducerRecord<>(topic, key, value)).get();
      

      That is why the process hangs until the producer client reaches its default timeout. We should provide a way to fine-tune (configure) the Kafka producer through parameters, or offer an option to use this WIH in an asynchronous or fire-and-forget way.
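The difference between the two modes can be sketched with a small stand-in (this is not the handler's actual code; a `CompletableFuture` here models the `Future` returned by `producer.send()`, and the delay models a slow or unreachable broker):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Models why a blocking send hangs the kie-server request thread,
// and how an async completion avoids it.
public class SendModes {

    // Simulates a broker that only acknowledges after a delay.
    static CompletableFuture<String> slowSend() {
        CompletableFuture<String> ack = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS)
                .execute(() -> ack.complete("ack"));
        return ack;
    }

    public static void main(String[] args) throws Exception {
        // Synchronous mode: .get() blocks the caller until the broker
        // replies or the producer times out -- this is the reported hang.
        long start = System.nanoTime();
        slowSend().get();
        long syncMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Asynchronous mode: register a callback and return immediately;
        // the handler would complete the work item later from the callback.
        slowSend().whenComplete((ack, err) -> {
            // here the handler would complete or abort the work item
        });

        System.out.println("sync send blocked the caller for ~" + syncMs + " ms");
    }
}
```

In the synchronous case the request thread is held for the full broker round trip (or timeout); in the asynchronous case it returns immediately and the outcome is handled in the callback.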

      Solution

      In the deployment descriptor, the runtimeManager should be passed to the handler constructor so that the Kafka send can complete the work item asynchronously:

      new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler(env['kafka_server'], "jbpm_id", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer", classLoader, runtimeManager);
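For context, a registration like the one above would typically live in the project's kie-deployment-descriptor.xml. The fragment below is illustrative (the handler name and placeholder values are assumptions, not taken from this issue):

```xml
<work-item-handlers>
    <work-item-handler>
        <resolver>mvel</resolver>
        <identifier>new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler(env['kafka_server'], "jbpm_id", "org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer", classLoader, runtimeManager)</identifier>
        <parameters/>
        <name>KafkaPublishMessages</name>
    </work-item-handler>
</work-item-handlers>
```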
      

      A global set of system properties was also added; these apply to all producers created:

      org.jbpm.process.workitem.kafka.reconnect.backoff.max.ms "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms."

      org.jbpm.process.workitem.kafka.reconnect.backoff.ms "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker."

      org.jbpm.process.workitem.kafka.request.timeout.ms "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted."

      org.jbpm.process.workitem.kafka.retries "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request."

      org.jbpm.process.workitem.kafka.retry.backoff.ms "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios."

      org.jbpm.process.workitem.kafka.enable.idempotence "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires `MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION` to be less than or equal to 5, `RETRIES_CONFIG` to be greater than 0 and `ACKS_CONFIG` must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a `ConfigException` will be thrown."
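A minimal sketch of using these properties (the property names come from this issue; the values are illustrative, chosen to fail faster than the default timeouts when the broker is down). In practice they would usually be passed as -D JVM options on kie-server startup rather than set in code:

```java
// Sets the global Kafka producer tuning properties before any producer
// is created; all values below are example choices, not recommendations.
public class KafkaWihTuning {

    public static void configure() {
        // Fail a request after 5s instead of waiting for the default timeout.
        System.setProperty("org.jbpm.process.workitem.kafka.request.timeout.ms", "5000");
        // Retry transient failures a few times, with a short backoff.
        System.setProperty("org.jbpm.process.workitem.kafka.retries", "3");
        System.setProperty("org.jbpm.process.workitem.kafka.retry.backoff.ms", "500");
        // Reconnect backoff: start at 1s, grow exponentially up to 10s.
        System.setProperty("org.jbpm.process.workitem.kafka.reconnect.backoff.ms", "1000");
        System.setProperty("org.jbpm.process.workitem.kafka.reconnect.backoff.max.ms", "10000");
        // Avoid duplicate records when retries kick in.
        System.setProperty("org.jbpm.process.workitem.kafka.enable.idempotence", "true");
    }

    public static void main(String[] args) {
        configure();
        System.out.println("request.timeout.ms = "
                + System.getProperty("org.jbpm.process.workitem.kafka.request.timeout.ms"));
    }
}
```

The equivalent server startup flags would be `-Dorg.jbpm.process.workitem.kafka.request.timeout.ms=5000` and so on, one per property.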

              elguardian@gmail.com Enrique González Martínez (Inactive)
              rafael.soares Rafael Soares
              Gonzalo Muñoz Fernández
              Votes: 1
              Watchers: 6
