AMQ Streams / ENTMQST-4116

[DOC OCP RHEL] How to configure producers/consumers in MirrorMaker2 connectors


    • Type: Story
    • Resolution: Done
    • Priority: Major
    • 2.1.0.GA

      • [Problem] How to configure producers/consumers in MirrorMaker2 connectors?
        • In particular, the producer and consumer configurations in MirrorSourceConnector are important for tuning MirrorMaker2. However, our documentation currently does not describe them clearly:
          • What configuration methods are provided for producers/consumers?
          • Which Kafka CR YAML properties affect which producer/consumer configurations?
        • In fact, even the official upstream Kafka documentation on MirrorMaker2 is quite unclear on how to configure producers/consumers in MirrorMaker2.
        • For example, our documentation (1) uses “my-target-cluster.config: producer.buffer.memory”, but it never explains that producer.* and consumer.* properties can be used in the Kafka CR YAML.
          • Note that "my-target-cluster.config: producer.buffer.memory" affects not only the MirrorSourceConnector producer but also the MirrorCheckpointConnector and MirrorHeartbeatConnector producers, so I don't think it is a good way to configure it.
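A minimal sketch of where this cluster-level setting sits in a KafkaMirrorMaker2 resource. The aliases, bootstrap addresses, and the value are illustrative assumptions; per the debugger findings in this issue, this single property is picked up by the Kafka Connect framework producers of all three connectors.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  connectCluster: "my-target-cluster"
  clusters:
    - alias: "my-source-cluster"
      bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    - alias: "my-target-cluster"
      bootstrapServers: my-target-cluster-kafka-bootstrap:9092
      config:
        # Cluster-level producer setting (illustrative value).
        # Picked up by the Kafka Connect framework producers of
        # MirrorSourceConnector, MirrorHeartbeatConnector and
        # MirrorCheckpointConnector alike.
        producer.buffer.memory: 67108864
  mirrors:
    - sourceCluster: "my-source-cluster"
      targetCluster: "my-target-cluster"
      sourceConnector: {}
      heartbeatConnector: {}
      checkpointConnector: {}
      topicsPattern: ".*"
```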
             
      • [RFE 1] Document how to configure the producers/consumers of the MirrorMaker2 connectors
        • List the producers/consumers within each connector
        • Explain how to change the configuration of each producer/consumer
        • For example, the following list of producers/consumers and their configuration properties (when these properties are used, each one configures only the corresponding producer/consumer and does not affect the others):
           
          - MirrorSourceConnector
              - producer (mirrors.sourceConnector.config: producer.override.*) #created in Kafka Connect framework
              - consumer (mirrors.sourceConnector.config: consumer.* or mirrors.sourceConnector.config: source.consumer.* ) #created in MirrorMaker2
              - producer for offset syncs (mirrors.sourceConnector.config: producer.*) #created in MirrorMaker2
          - MirrorHeartbeatConnector
              - producer (mirrors.heartbeatConnector.config: producer.override.*) #created in Kafka Connect framework
          - MirrorCheckpointConnector
              - producer (mirrors.checkpointConnector.config: producer.override.*) #created in Kafka Connect framework
              - consumer (mirrors.checkpointConnector.config: consumer.*) #created in MirrorMaker2
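The list above could be documented with a CR fragment like the following sketch. The property values are illustrative, and it assumes the Connect worker allows client overrides (connector.client.config.override.policy, whose default is All since Kafka 3.0), which the producer.override.* properties rely on.

```yaml
spec:
  mirrors:
    - sourceCluster: "my-source-cluster"
      targetCluster: "my-target-cluster"
      sourceConnector:
        config:
          # Kafka Connect framework producer: sends mirrored records to the target cluster
          producer.override.batch.size: 327680
          # MirrorMaker2 consumer (MirrorSourceTask): reads records from the source cluster
          consumer.fetch.max.bytes: 52428800
          # MirrorMaker2 producer (MirrorSourceTask): writes offset syncs
          producer.buffer.memory: 33554432
      heartbeatConnector:
        config:
          # Kafka Connect framework producer: emits heartbeats
          producer.override.buffer.memory: 33554432
      checkpointConnector:
        config:
          # Kafka Connect framework producer: emits consumer offset checkpoints
          producer.override.buffer.memory: 33554432
          # MirrorMaker2 consumer (OffsetSyncStore): reads the offset sync topic
          consumer.fetch.max.bytes: 52428800
```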
          

           

      • [RFE 2] Add configuration items for the producers/consumers in each connector definition in the Kafka CR YAML
        • As the list in RFE 1 shows, it is not clear at first glance which producer/consumer a given property affects.
          • Some properties use the "override" prefix and others do not, depending on whether the producer/consumer is created by the Kafka Connect framework or by MirrorMaker2.
        • Therefore, adding explicit producer and consumer configuration items could help users. For example:
          apiVersion: kafka.strimzi.io/v1beta2
          kind: KafkaMirrorMaker2
          metadata:
            name: my-mirror-maker-2
          spec:
          ...
            mirrors:
          ...
              sourceConnector:
                config:
                  ...
                sourceProducer:
                  config:
                    ...
                sourceConsumer:
                  config:
                    ...
                offsetProducer:
                  config:
                    ...
          ...
          
        • This option requires changes to the Kafka CRD, so adding documentation alone is not enough.
        • Alternatively, a dedicated prefix for each producer/consumer could be supported in mirrors.xxxConnector.config without changing the Kafka CRD, as below:
          apiVersion: kafka.strimzi.io/v1beta2
          kind: KafkaMirrorMaker2
          metadata:
            name: my-mirror-maker-2
          spec:
          ...
            mirrors:
          ...
              sourceConnector:
                config:
                  sourceProducer.xxx.xxx
                  sourceConsumer.xxx.xxx
                  offsetProducer.xxx.xxx
          
        • I have raised this as an RFE for the development team to consider, but I am neutral on changing the Kafka CR YAML. There are also some concerns:
          • The producers/consumers of each connector are not documented in the official community Kafka documentation.
          • Producers/consumers may be added or removed in the future, so it is unclear whether we could keep these items maintained.
          • A fundamental solution may require the upstream Kafka community to list the producers/consumers in the MirrorMaker2 connectors and provide explicit ways to configure them.
            • In fact, even the upstream Kafka community documentation on MirrorMaker2 is quite unclear on how to configure producers/consumers.
            • It also gives no way to tell which producers/consumers a given property affects.
            • I can create a KIP/JIRA/PR for the upstream Kafka community if needed.
      • [Findings from my investigation]
        • MirrorMaker2 consists of multiple Kafka Connect connectors, each with multiple producers/consumers; some belong to the Kafka Connect framework and others are specific to MirrorMaker2.
        • The configuration method differs between MirrorMaker2-specific producers/consumers and Kafka Connect framework producers/consumers.
          • Producers/consumers created by the Kafka Connect framework use the following configurations:
            • mirrors.xxxConnector.config: {producer_or_consumer}.override.*
            • {source_or_target}.config: {producer_or_consumer}.* (overridden by the property above)
          • Producers/consumers created by MirrorMaker2 use the following configurations:
            • mirrors.xxxConnector.config: {source_or_target}.{producer_or_consumer}.*
            • mirrors.xxxConnector.config: {producer_or_consumer}.* (overridden by the property above)
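The precedence rules above can be sketched as a CR fragment in which two clients are each configured at both levels. Per the debugger findings in this issue, the property commented "takes effect" wins; aliases and values are illustrative assumptions.

```yaml
spec:
  clusters:
    - alias: "my-source-cluster"
      bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    - alias: "my-target-cluster"
      bootstrapServers: my-target-cluster-kafka-bootstrap:9092
      config:
        # Overridden for MirrorSourceConnector's Kafka Connect framework producer
        producer.buffer.memory: 33554432
  mirrors:
    - sourceCluster: "my-source-cluster"
      targetCluster: "my-target-cluster"
      sourceConnector:
        config:
          # Takes effect for this connector's Kafka Connect framework producer only
          producer.override.buffer.memory: 67108864
          # Overridden by source.consumer.fetch.max.bytes below
          consumer.fetch.max.bytes: 52428800
          # Takes effect for the MirrorMaker2 consumer in MirrorSourceTask
          source.consumer.fetch.max.bytes: 104857600
```

The cluster-level producer.buffer.memory would still reach the heartbeat and checkpoint producers if those connectors are enabled without their own producer.override.buffer.memory.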
        • list of producers/consumers and configurations
          • Based on actual testing (attached file: ENTMQST-4116_kafka-mirror-maker-2.yaml) and confirmation in the debugger, each producer/consumer used the following configurations.
                - MirrorSourceConnector
                    - producer in WorkerSourceTask #created in Kafka Connect framework
                        - sourceConnector.config: producer.override.buffer.memory(=7770303)
                            - my-target-cluster.config: producer.buffer.memory(=7770202) is overridden
                    - consumer in MirrorSourceTask:93 #created in MirrorMaker2
                        - (sourceConnector.config: source.consumer.fetch.max.bytes(=55500306))
                            - The prefix is always “source”. However, it can be omitted, as consumer.* is already specific enough here.
                        - sourceConnector.config: consumer.fetch.max.bytes(=55500302)
                    - offsetProducer in MirrorSourceTask:94 #created in MirrorMaker2
                        - (sourceConnector.config: source.producer.buffer.memory(=7770306))
                            - The “source” prefix depends on which cluster hosts the offset sync topic (source or target, set by offset-syncs.topic.location). However, the prefix can be omitted, as producer.* is already specific enough here.
                        - sourceConnector.config: producer.buffer.memory(=7770302)
                - MirrorHeartbeatConnector
                    - producer in WorkerSourceTask  #created in Kafka Connect framework
                        - heartbeatConnector.config: producer.override.buffer.memory(=7770403)
                            - my-target-cluster.config: producer.buffer.memory(=7770202) is overridden
                - MirrorCheckpointConnector
                    - producer in WorkerSourceTask #created in Kafka Connect framework
                        - checkpointConnector.config: producer.override.buffer.memory(=7770503)
                            - my-target-cluster.config: producer.buffer.memory(=7770202) is overridden
                    - consumer in OffsetSyncStore in MirrorCheckpointTask:94  #created in MirrorMaker2 
                        - (checkpointConnector.config: source.consumer.fetch.max.bytes(=55500506))
                            - The “source” prefix depends on which cluster hosts the offset sync topic (source or target, set by offset-syncs.topic.location). However, the prefix can be omitted, as consumer.* is already specific enough here.
                        - checkpointConnector.config: consumer.fetch.max.bytes(=55500502)
                - Kafka Connect
                    - KafkaBasedLog for each backing stores (KafkaOffsetBackingStore, KafkaStatusBackingStore and KafkaConfigBackingStore)
                        - producer: my-target-cluster.config: buffer.memory(=7770201)
                        - consumer: my-target-cluster.config: fetch.max.bytes(=55500201)
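Per the Kafka Connect findings above, the connect cluster's config feeds two different sets of clients, depending on whether a property is prefixed. A sketch with illustrative aliases and values:

```yaml
spec:
  connectCluster: "my-target-cluster"
  clusters:
    - alias: "my-target-cluster"
      bootstrapServers: my-target-cluster-kafka-bootstrap:9092
      config:
        # Unprefixed client settings: used by the KafkaBasedLog producer/consumer
        # of the offset, status and config backing stores
        buffer.memory: 33554432
        fetch.max.bytes: 52428800
        # producer.* settings: used by the Kafka Connect framework producers of the
        # connectors, unless a connector sets producer.override.*
        producer.buffer.memory: 33554432
```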
            
      • Investigating AMQ Streams on RHEL
        • I used the attached mm2deployment_rhel.yaml for testing.
        • The following results were obtained with only the my-source-cluster->my-target-cluster flow enabled and "/opt/kafka/bin/connect-mirror-maker.sh --clusters my-target-cluster".
        • If both my-source-cluster->my-target-cluster and my-target-cluster->my-source-cluster are enabled, the same properties with source and target swapped also apply to the producers/consumers of the reverse flow.
          - MirrorSourceConnector
              - consumer that retrieves topic messages from the source cluster
                  - connect-mirror-maker.properties: my-source-cluster.consumer.fetch.max.bytes(=55500110) (MirrorSourceTask:93)
              - producer that sends topic messages to the target cluster
                  - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory(=7770111) (buildWorkerTask:601)
              - producer that writes to the offset sync topic, which maps the source and target offsets of replicated topic partitions
                  - connect-mirror-maker.properties: my-source-cluster.producer.buffer.memory(=7770110) (MirrorSourceTask:94)
          - MirrorHeartbeatConnector
              - producer that emits heartbeats
                  - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory(=7770111) (buildWorkerTask:601)
          - MirrorCheckpointConnector
              - producer that emits consumer offset checkpoints
                  - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory(=7770111) (buildWorkerTask:601)
              - consumer that reads the offset sync topic
                  - connect-mirror-maker.properties: my-source-cluster.consumer.fetch.max.bytes(=55500110) (MirrorCheckpointTask:94)
          

          (1) "producer.buffer.memory" is used in our documentation 
              https://access.redhat.com/documentation/en-us/red_hat_amq_streams/2.1/html-single/configuring_amq_streams_on_openshift/index#con-mirrormaker-high-volume-messages-str

        Attachments:
        1. ENTMQST-4116_kafka-mirror-maker-2.yaml (7 kB, Tomonari Yamashita)
        2. mm2deployment_rhel.yaml (3 kB, Tomonari Yamashita)

        People: Paul Mellor (pmellor@redhat.com), Tomonari Yamashita (rhn-support-tyamashi), Maros Orsak
        Votes: 0
        Watchers: 5