-
Bug
-
Resolution: Done
-
Major
-
1.0.0.Final
-
None
-
False
-
False
-
Undefined
-
- Run the process-kafka-quickstart-quarkus example
- Send a Kafka message to start the process, then wait for more than 1 minute
2020 Week 46-48 (from Nov 9)
- Issue running process-kafka-quickstart-quarkus: starting the process by sending a Kafka message works, and the process runs and publishes its message as expected. However, none of the received messages are acknowledged. As a result, after 1 minute an exception is thrown that kills the consumer. The consumer then stops receiving messages, so it is impossible to start new processes.
...
2020-11-24 13:33:43,363 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18231: The amount of received messages without acking is too high for topic partition 'TopicPartition{topic=travellers, partition=0}', amount 2.
2020-11-24 13:33:43,365 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18228: A failure has been reported for Kafka topics '[travellers]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: Too Many Messages without acknowledgement
at io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit.lambda$flushAndCheckHealth$8(KafkaThrottledLatestProcessedCommit.java:299)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit.flushAndCheckHealth(KafkaThrottledLatestProcessedCommit.java:299)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:923)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:887)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:913)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-11-24 13:33:43,370 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-kogito_incoming_stream, groupId=5b93d8ee-ff5a-44ee-9ff8-2a668816eba3] Revoke previously assigned partitions travellers-0
2020-11-24 13:33:43,373 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-kogito_incoming_stream, groupId=5b93d8ee-ff5a-44ee-9ff8-2a668816eba3] Member kafka-consumer-kogito_incoming_stream-a686a23e-f340-4c27-b943-1cc28e78eaad sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
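The failure above can be illustrated with a small stand-alone model of a health check that tolerates unacknowledged messages only for a limited time. This is a simplified sketch, not the SmallRye implementation: the class `ThrottledAckSketch`, its methods, and the 60-second window (taken from the "after 1 minute" observation in this ticket) are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a commit health check.
// It only mirrors the behaviour described in this ticket; it is not SmallRye code.
class ThrottledAckSketch {
    // Arrival timestamps (ms) of messages received but not yet acknowledged, oldest first.
    private final Deque<Long> unacked = new ArrayDeque<>();
    // Maximum time a message may stay unacknowledged (assumed value, see lead-in).
    private final long maxAgeMs;

    ThrottledAckSketch(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    // Record a message delivered to the application at time nowMs.
    void received(long nowMs) {
        unacked.addLast(nowMs);
    }

    // Acknowledge the oldest outstanding message, if any.
    void ackOldest() {
        unacked.pollFirst();
    }

    // Number of messages still waiting for an acknowledgment.
    int pending() {
        return unacked.size();
    }

    // Periodic check: healthy unless the oldest unacknowledged message is older than maxAgeMs.
    boolean isHealthy(long nowMs) {
        Long oldest = unacked.peekFirst();
        return oldest == null || nowMs - oldest <= maxAgeMs;
    }

    public static void main(String[] args) {
        // Consumer that never acknowledges, as in the quickstart described here.
        ThrottledAckSketch neverAcks = new ThrottledAckSketch(60_000);
        neverAcks.received(0);
        neverAcks.received(1_000);
        System.out.println("pending=" + neverAcks.pending());
        System.out.println("healthy at 30s: " + neverAcks.isHealthy(30_000));
        System.out.println("healthy at 61s: " + neverAcks.isHealthy(61_000));

        // Consumer that acknowledges each message after processing it.
        ThrottledAckSketch acks = new ThrottledAckSketch(60_000);
        acks.received(0);
        acks.ackOldest();
        System.out.println("healthy at 61s with ack: " + acks.isHealthy(61_000));
    }
}
```

In this model the consumer that never acknowledges turns unhealthy once its oldest message passes the window, which matches the sequence in the log: a warning about unacknowledged messages, a reported failure, and then the consumer leaving the group. Acknowledging each message once it has been processed keeps the check healthy.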
- relates to: KOGITO-3933 Provide proper acknowledgment for Messages consumed from Kafka (Resolved)