AMQ Streams / ENTMQST-3886

The state store, topic-store, may have migrated to another instance


    • Type: Bug
    • Resolution: Obsolete
    • Priority: Major
    • Fix Version/s: 2.7.0.GA
    • Affects Version/s: 1.7.0.GA, 1.8.4.GA, 2.0.1.GA, 2.2.1.GA
    • Component/s: topic-operator

      This problem is related to the topic-based metadata store, so the simplest solution would be to revert to the ZK-based metadata store, which is still supported.

      The following example shows how to revert a cluster named "my-cluster" with a plain listener on port 9092; adjust the commands to match the actual cluster setup.

      # edit the Kafka CR adding the following Entity Operator (EO) env var
      kubectl edit k my-cluster   # "k" is the short name of the Kafka custom resource
      spec:
        entityOperator:
          template:
            topicOperatorContainer:
              env:
                - name: STRIMZI_USE_ZOOKEEPER_TOPIC_STORE
                  value: "true"
      
      # wait for the entity operator pod to restart and confirm that you can successfully create a new test topic (see the example after this block)
      
      # if it works, you can safely delete the topic-based metadata store
      kubectl run client-"$(date +%s)" -itq --rm --restart="Never" --image="quay.io/strimzi/kafka:latest-kafka-3.4.0" -- \
        sh -c "bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --delete --topic __strimzi-topic-operator-kstreams-topic-store-changelog \
          && bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --delete --topic __strimzi_store_topic"
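
      As a sanity check for the step above, one way to confirm that the revert worked is to create a throwaway KafkaTopic and wait for it to become Ready before deleting the store topics. This is only a sketch: the topic name is illustrative, and the namespace, cluster name and API version must match the actual setup.

      # create a throwaway topic to verify that the Topic Operator is reconciling again
      cat <<EOF | kubectl apply -f -
      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaTopic
      metadata:
        name: revert-test
        labels:
          strimzi.io/cluster: my-cluster
      spec:
        partitions: 1
        replicas: 1
      EOF

      # wait for the Topic Operator to mark the topic Ready, then clean up
      kubectl wait kafkatopic/revert-test --for=condition=Ready --timeout=120s
      kubectl delete kafkatopic revert-test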
      

      The following error was probably caused by a general slowness of the OCP cluster (I see lots of Kube API timeouts before this in the logs), which triggered a Kafka Streams rebalance in the Topic Operator (TO).

      2021-10-08 08:52:37,71514 ERROR [vert.x-eventloop-thread-0] K8sTopicWatcher:63 - 114999|kube +"$TOPIC_NAME"---e9286f743ac6e8151037c483fddc86f287552000|747451905: Failure processing KafkaTopic watch event ADDED on resource "$TOPIC_NAME"---e9286f743ac6e8151037c483fddc86f287552000 with labels {strimzi.io/cluster="$CLUSTER_NAME"}: The state store, topic-store, may have migrated to another instance.
      org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, topic-store, may have migrated to another instance.
      	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:65) ~[org.apache.kafka.kafka-streams-2.7.0.redhat-00003.jar:2.7.0.redhat-00003]
      	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52) ~[org.apache.kafka.kafka-streams-2.7.0.redhat-00003.jar:2.7.0.redhat-00003]
      	at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[?:?]
      	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
      	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
      	at io.strimzi.operator.topic.LocalStoreAndServiceFactory$LazyInvocationHandler.invoke(LocalStoreAndServiceFactory.java:60) ~[io.strimzi.topic-operator-0.22.1.redhat-00004.jar:0.22.1.redhat-00004]
      	at com.sun.proxy.$Proxy23.get(Unknown Source) ~[?:?]
      	at io.strimzi.operator.topic.KafkaStreamsTopicStore.read(KafkaStreamsTopicStore.java:74) ~[io.strimzi.topic-operator-0.22.1.redhat-00004.jar:0.22.1.redhat-00004]
      	at io.strimzi.operator.topic.TopicOperator.getFromTopicStore(TopicOperator.java:1438) ~[io.strimzi.topic-operator-0.22.1.redhat-00004.jar:0.22.1.redhat-00004]
      	at io.strimzi.operator.topic.TopicOperator.reconcileOnResourceChange(TopicOperator.java:1073) ~[io.strimzi.topic-operator-0.22.1.redhat-00004.jar:0.22.1.redhat-00004]
      	at io.strimzi.operator.topic.TopicOperator$5.lambda$execute$0(TopicOperator.java:1064) ~[io.strimzi.topic-operator-0.22.1.redhat-00004.jar:0.22.1.redhat-00004]
      	at io.vertx.core.Future.lambda$compose$3(Future.java:368) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.FutureImpl.dispatch(FutureImpl.java:105) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.FutureImpl.tryComplete(FutureImpl.java:150) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.FutureImpl.complete(FutureImpl.java:111) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.FutureImpl.handle(FutureImpl.java:176) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.FutureImpl.handle(FutureImpl.java:21) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:327) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38) ~[io.vertx.vertx-core-3.9.4.redhat-00001.jar:3.9.4.redhat-00001]
      	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty.netty-common-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty.netty-common-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [io.netty.netty-transport-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty.netty-common-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty.netty-common-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty.netty-common-4.1.60.Final-redhat-00001.jar:4.1.60.Final-redhat-00001]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
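
      For reference, the same error can be found in the Topic Operator container logs, assuming the default Strimzi resource names for a cluster called "my-cluster":

      # inspect the Topic Operator logs in the entity operator pod
      kubectl logs deployment/my-cluster-entity-operator -c topic-operator | grep InvalidStateStoreException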
      

      On this cluster there are 115 KafkaTopics stuck in the NotReady state with this error, and they all report the same lastTransitionTime.
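
      For reference, a quick (illustrative) way to list the stuck resources, assuming kubectl and jq are available:

      # list KafkaTopics whose Ready condition is not True
      kubectl get kafkatopics -o json \
        | jq -r '.items[] | select(.status.conditions[]? | .type == "Ready" and .status != "True") | .metadata.name'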

      Looking at the TO's code, I see that we do not handle the KafkaStreams REBALANCING state in the StateListener.

      https://github.com/strimzi/strimzi-kafka-operator/blob/main/topic-operator/src/main/java/io/strimzi/operator/topic/KafkaStreamsTopicStoreService.java#L105-L112

      As reported in the Javadoc, a KafkaStreams instance can switch between the RUNNING and REBALANCING states at any time during a rebalance, and while it is in the REBALANCING state the state store is not queryable.

      https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/KafkaStreams.State.html
      https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java#L60-L68

      KafkaStreams may not be RUNNING when we try to use KafkaStreamsTopicStore, and in that case an InvalidStateStoreException is raised before the actual reconciliation logic is executed. Maybe we should pause reconciliation while KafkaStreams is in the REBALANCING state, and skip any topic status update during that time.

      These topics are not reconciled again until some change to them happens (unlikely in this case). Even then, in this specific use case they would simply transition from InvalidStateStoreException to OperatorException, because another Ready KafkaTopic with the same topicName already exists.
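
      For completeness, the kind of "change" that would trigger a new reconciliation attempt is any update to the KafkaTopic resource, for example a throwaway annotation (the topic name below is a placeholder). As noted above, in this particular case that would only replace the InvalidStateStoreException with an OperatorException.

      # touching a stuck KafkaTopic generates a new watch event for the Topic Operator
      kubectl annotate kafkatopic my-stuck-topic force-reconcile="$(date +%s)" --overwrite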

      Tracking Strimzi issue: https://github.com/strimzi/strimzi-kafka-operator/issues/6671

              Assignee: Unassigned
              Reporter: Federico Valeri (rhn-support-fvaleri)
              Votes: 1
              Watchers: 15
