Uploaded image for project: 'Kogito'
  1. Kogito
  2. KOGITO-3911

Error on Kafka consumer not acking received messages

    XMLWordPrintable

Details

    • Bug
    • Resolution: Done
    • Major
    • 1.0.0.Final
    • 1.0.0.Final
    • Core Engine
    • None
    • False
    • False
    • Undefined
      • Run process-kafka-quickstart-quarkus example
      • Wait for more than 1 minute after sending the message to start the process
    • ---
    • ---
    • 2020 Week 46-48 (from Nov 9)

    Description

      • issue running the process-kafka-quickstart-quarkus: starting the process sending a kafka message is ok, the process run and publishes the message as expected but the problem is all the messages being received are not acknowledged, in this way after 1 minute an exception is thrown killing the consumer, that basically stop receiving messages, impossible to start new processes.
        ...
         
        2020-11-24 13:33:43,363 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18231: The amount of received messages without acking is too high for topic partition 'TopicPartition{topic=travellers, partition=0}', amount 2.
        2020-11-24 13:33:43,365 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18228: A failure has been reported for Kafka topics '[travellers]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: Too Many Messages without acknowledgement
        at io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit.lambda$flushAndCheckHealth$8(KafkaThrottledLatestProcessedCommit.java:299)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit.flushAndCheckHealth(KafkaThrottledLatestProcessedCommit.java:299)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:923)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:887)
        at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
        at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
        at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:913)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

      2020-11-24 13:33:43,370 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-kogito_incoming_stream, groupId=5b93d8ee-ff5a-44ee-9ff8-2a668816eba3] Revoke previously assigned partitions travellers-0
      2020-11-24 13:33:43,373 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-kogito_incoming_stream, groupId=5b93d8ee-ff5a-44ee-9ff8-2a668816eba3] Member kafka-consumer-kogito_incoming_stream-a686a23e-f340-4c27-b943-1cc28e78eaad sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
       

      Attachments

        Issue Links

          Activity

            People

              tdolphin-1 Tiago Dolphine
              tdolphin-1 Tiago Dolphine
              Tristan Radisson Tristan Radisson
              Tristan Radisson Tristan Radisson
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: