Story
Resolution: Done
Major
2.1.0.GA
- [Problem] How to configure producers/consumers in MirrorMaker2 connectors?
- In particular, the producer and consumer configurations in MirrorSourceConnector are important for tuning MirrorMaker2. However, our documentation currently does not clearly describe:
- What configuration methods are provided for producers/consumers?
- Which Kafka CR yaml configurations affect which producer/consumer configurations?
- In fact, even the official upstream Kafka documentation on MirrorMaker2 is quite unclear on how to configure the producers/consumers in MirrorMaker2.
- For example, “my-target-cluster.config: producer.buffer.memory” is used in our documentation (1); however, nothing explains that producer.* and consumer.* keys are available in the Kafka CR YAML.
- Note that “my-target-cluster.config: producer.buffer.memory” affects not only the MirrorSourceConnector producer but also the MirrorCheckpointConnector and MirrorHeartbeatConnector producers. (I don't think this is a good way to configure it.)
- [RFE 1] The documentation should include how to configure the producers/consumers of the MirrorMaker2 connectors:
- A list of the producers/consumers within each connector
- How to change the configuration of each producer/consumer
- For example, the producers/consumers and configurations listed below (each of these configurations applies only to the corresponding producer/consumer and does not affect the others):
- MirrorSourceConnector
  - producer (mirrors.sourceConnector.config: producer.override.*) # created by the Kafka Connect framework
  - consumer (mirrors.sourceConnector.config: consumer.* or mirrors.sourceConnector.config: source.consumer.*) # created by MirrorMaker2
  - producer for offset syncs (mirrors.sourceConnector.config: producer.*) # created by MirrorMaker2
- MirrorHeartbeatConnector
  - producer (mirrors.heartbeatConnector.config: producer.override.*) # created by the Kafka Connect framework
- MirrorCheckpointConnector
  - producer (mirrors.checkpointConnector.config: producer.override.*) # created by the Kafka Connect framework
  - consumer (mirrors.checkpointConnector.config: consumer.*) # created by MirrorMaker2
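The list above can be written as a lookup table. This is a minimal Python sketch only: `CLIENT_PREFIXES` and `config_key` are hypothetical names, not part of Strimzi or Kafka, and the entries encode nothing beyond the observations in this issue (the `source.consumer.*` alternative for consumers is left out for brevity).

```python
# Hypothetical lookup table encoding the RFE 1 list: for each
# (connector, client) pair, the KafkaMirrorMaker2 connector section and the
# key prefix that reaches only that client. Not a Strimzi or Kafka API.
CLIENT_PREFIXES: dict[tuple[str, str], tuple[str, str]] = {
    # Created by the Kafka Connect framework (WorkerSourceTask producers)
    ("MirrorSourceConnector", "producer"): ("sourceConnector", "producer.override."),
    ("MirrorHeartbeatConnector", "producer"): ("heartbeatConnector", "producer.override."),
    ("MirrorCheckpointConnector", "producer"): ("checkpointConnector", "producer.override."),
    # Created by MirrorMaker2 itself
    ("MirrorSourceConnector", "consumer"): ("sourceConnector", "consumer."),
    ("MirrorSourceConnector", "offsetProducer"): ("sourceConnector", "producer."),
    ("MirrorCheckpointConnector", "consumer"): ("checkpointConnector", "consumer."),
}


def config_key(connector: str, client: str, setting: str) -> str:
    """Return where to put `setting` so that it affects only the given client."""
    section, prefix = CLIENT_PREFIXES[(connector, client)]
    return f"mirrors.{section}.config: {prefix}{setting}"


if __name__ == "__main__":
    print(config_key("MirrorSourceConnector", "producer", "buffer.memory"))
    print(config_key("MirrorCheckpointConnector", "consumer", "fetch.max.bytes"))
```

A table like this is essentially what the requested documentation would need to publish and keep in sync with upstream.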
- [RFE 2] Add configuration items for producers/consumers in each connector definition in Kafka CR yaml
- As the list in [RFE 1] shows, it is not clear at first glance which producer/consumer a given configuration affects.
- Some keys contain "override" and others do not, depending on whether the producer/consumer is created by the Kafka Connect framework or by MirrorMaker2.
- Therefore, it could help users to add explicit producer and consumer configuration items. For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  ...
  mirrors:
  - ...
    sourceConnector:
      config:
        ...
      sourceProducer:
        config:
          ...
      sourceConsumer:
        config:
          ...
      offsetProducer:
        config:
          ...
  ...
-
- In this case, adding documentation alone is not enough, because it requires changes to the Kafka CRD.
- Another option, which does not require changing the Kafka CRD, is to provide a dedicated prefix for each specific producer/consumer in mirrors.xxxConnector.config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  ...
  mirrors:
  - ...
    sourceConnector:
      config:
        sourceProducer.xxx.xxx
        sourceConsumer.xxx.xxx
        offsetProducer.xxx.xxx
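To make the prefix option concrete, here is a hedged Python sketch of how an operator could rewrite such keys into the keys the source connector already understands. The prefixes `sourceProducer.`, `sourceConsumer.` and `offsetProducer.` are only the proposal above, not existing Strimzi or Kafka settings, and `translate` is a hypothetical helper; the sample values are the marker values from the investigation in this issue.

```python
# Hypothetical rewrite for the prefix proposal. These prefixes do not exist
# in Strimzi or Kafka; the mapping shows how they *could* be translated into
# keys that MirrorSourceConnector understands today.
PROPOSED_TO_EXISTING = {
    "sourceProducer.": "producer.override.",  # WorkerSourceTask producer (Kafka Connect framework)
    "sourceConsumer.": "consumer.",           # MirrorSourceTask consumer (MirrorMaker2)
    "offsetProducer.": "producer.",           # MirrorSourceTask offset sync producer (MirrorMaker2)
}


def translate(connector_config: dict[str, str]) -> dict[str, str]:
    """Rewrite proposed client-specific keys; pass every other key through unchanged."""
    result: dict[str, str] = {}
    for key, value in connector_config.items():
        for proposed, existing in PROPOSED_TO_EXISTING.items():
            if key.startswith(proposed):
                result[existing + key[len(proposed):]] = value
                break
        else:  # no proposed prefix matched
            result[key] = value
    return result


if __name__ == "__main__":
    print(translate({
        "sourceProducer.buffer.memory": "7770303",
        "sourceConsumer.fetch.max.bytes": "55500302",
        "offsetProducer.buffer.memory": "7770302",
        "tasks.max": "1",
    }))
```

Keeping the rewrite in a single table avoids a CRD change, but the table itself is exactly what would have to follow upstream whenever producers/consumers are added or removed.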
-
- I have raised this as an RFE for the development team to consider; however, I am neutral on changing the Kafka CR YAML. There are also some concerns:
- The producers/consumers of each connector are not documented in the official community Kafka documentation.
- Producers/consumers may be added or removed in the future, so it is uncertain whether we could keep such items maintained.
- For a fundamental solution, the upstream Kafka community may need to list the producers/consumers in the MirrorMaker2 connectors and provide explicit ways to configure them.
- In fact, even the upstream Kafka community documentation on MirrorMaker2 is quite unclear on how to configure producers/consumers.
- It also gives no way to tell which producers/consumers a given configuration affects.
- I can create a KIP/JIRA/PR for the upstream Kafka community if needed.
- [As far as I have researched]
- MirrorMaker2 has multiple Kafka connectors, each with multiple producers/consumers; some belong to the Kafka Connect framework and others are specific to MirrorMaker2.
- The configuration method differs between MirrorMaker2-specific producers/consumers and Kafka Connect framework producers/consumers.
- Producers/consumers created by the Kafka Connect framework use the following configurations:
- mirrors.xxxConnector.config: {producer or consumer}.override.*
- {source_or_target}.config: {producer_or_consumer}.* is overridden by the key above
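The precedence for framework-created clients can be sketched in a few lines of Python. This assumes, as in Strimzi, that `my-target-cluster.config` acts as the Kafka Connect worker configuration; `with_prefix` and `framework_client_config` are hypothetical helpers (Kafka Connect builds the real client configs internally), and the sketch ignores `connector.client.config.override.policy`, which can reject overrides in a real worker.

```python
def with_prefix(config: dict[str, str], prefix: str) -> dict[str, str]:
    """Entries whose key starts with `prefix`, with the prefix stripped."""
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


def framework_client_config(worker_config: dict[str, str],
                            connector_config: dict[str, str],
                            client: str) -> dict[str, str]:
    """Effective settings for a client created by Kafka Connect ('producer' or 'consumer').

    Worker-level '<client>.*' keys are applied first; connector-level
    '<client>.override.*' keys are applied afterwards and therefore win.
    """
    effective = with_prefix(worker_config, f"{client}.")
    effective.update(with_prefix(connector_config, f"{client}.override."))
    return effective


worker = {"producer.buffer.memory": "7770202"}                     # my-target-cluster.config
source_connector = {"producer.override.buffer.memory": "7770303"}  # sourceConnector.config
print(framework_client_config(worker, source_connector, "producer"))  # {'buffer.memory': '7770303'}
print(framework_client_config(worker, {}, "producer"))                # {'buffer.memory': '7770202'}
```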
- Producers/consumers created by MirrorMaker2 use the following configurations:
- mirrors.xxxConnector.config: {source_or_target}.{producer_or_consumer}.*
- mirrors.xxxConnector.config: {producer_or_consumer}.* is overridden
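For MirrorMaker2-created clients, both keys live in the same connector config and the more specific one wins. A minimal Python sketch, assuming exactly the two-layer precedence stated above; `mm2_client_config` is a hypothetical helper, and the sample values are the marker values from the investigation in this issue.

```python
def with_prefix(config: dict[str, str], prefix: str) -> dict[str, str]:
    """Entries whose key starts with `prefix`, with the prefix stripped."""
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


def mm2_client_config(connector_config: dict[str, str], side: str, client: str) -> dict[str, str]:
    """Effective settings for a client created by MirrorMaker2 itself.

    side is 'source' or 'target'; client is 'producer' or 'consumer'.
    '<client>.*' keys are applied first and '<side>.<client>.*' keys win.
    """
    effective = with_prefix(connector_config, f"{client}.")
    effective.update(with_prefix(connector_config, f"{side}.{client}."))
    return effective


source_connector = {  # sourceConnector.config
    "consumer.fetch.max.bytes": "55500302",
    "source.consumer.fetch.max.bytes": "55500306",
}
print(mm2_client_config(source_connector, "source", "consumer"))  # {'fetch.max.bytes': '55500306'}
```

Note that `consumer.` does not match `source.consumer.fetch.max.bytes`, because the prefix check starts at the beginning of the key.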
- List of producers/consumers and their configurations
- Based on what I actually tried (attached file: kafka-mirror-maker-2.yaml) and confirmed using a debugger, each producer/consumer used the following configurations.
- MirrorSourceConnector
  - producer in WorkerSourceTask # created by the Kafka Connect framework
    - sourceConnector.config: producer.override.buffer.memory (=7770303)
    - my-target-cluster.config: producer.buffer.memory (=7770202) is overridden
  - consumer in MirrorSourceTask:93 # created by MirrorMaker2
    - (sourceConnector.config: source.consumer.fetch.max.bytes (=55500306))
      - The prefix is always “source”. However, it can be omitted, as the key is already specific enough.
    - sourceConnector.config: consumer.fetch.max.bytes (=55500302)
  - offsetProducer in MirrorSourceTask:94 # created by MirrorMaker2
    - (sourceConnector.config: source.producer.buffer.memory (=7770306))
      - The “source” prefix depends on whether the OffsetSyncStore is created on the source or target cluster. However, it can be omitted, as the key is already specific enough.
    - sourceConnector.config: producer.buffer.memory (=7770302)
- MirrorHeartbeatConnector
  - producer in WorkerSourceTask # created by the Kafka Connect framework
    - heartbeatConnector.config: producer.override.buffer.memory (=7770403)
    - my-target-cluster.config: producer.buffer.memory (=7770202) is overridden
- MirrorCheckpointConnector
  - producer in WorkerSourceTask # created by the Kafka Connect framework
    - checkpointConnector.config: producer.override.buffer.memory (=7770503)
    - my-target-cluster.config: producer.buffer.memory (=7770202) is overridden
  - consumer in OffsetSyncStore in MirrorCheckpointTask:94 # created by MirrorMaker2
    - (checkpointConnector.config: source.consumer.fetch.max.bytes (=55500506))
      - The “source” prefix depends on whether the OffsetSyncStore is created on the source or target cluster. However, it can be omitted, as the key is already specific enough.
    - checkpointConnector.config: consumer.fetch.max.bytes (=55500502)
- Kafka Connect
  - KafkaBasedLog for each backing store (KafkaOffsetBackingStore, KafkaStatusBackingStore and KafkaConfigBackingStore)
    - producer # value = my-target-cluster.config: buffer.memory (=7770201)
    - consumer # value = my-target-cluster.config: fetch.max.bytes (=55500201)
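The investigation above relies on giving every candidate key a unique marker value and checking which value each client ends up with. A hedged Python sketch of that tracing idea, assuming the precedence rules described earlier in this issue; the helpers and client labels are hypothetical, and only `buffer.memory` for the producers is modelled.

```python
def with_prefix(config: dict[str, str], prefix: str) -> dict[str, str]:
    """Entries whose key starts with `prefix`, with the prefix stripped."""
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


def merge(*layers: dict[str, str]) -> dict[str, str]:
    """Merge config layers in order; later layers win."""
    out: dict[str, str] = {}
    for layer in layers:
        out.update(layer)
    return out


# Unique marker values per location, taken from the observations above.
cluster = {"producer.buffer.memory": "7770202"}  # my-target-cluster.config
connectors = {
    "sourceConnector": {"producer.override.buffer.memory": "7770303",
                        "producer.buffer.memory": "7770302"},
    "heartbeatConnector": {"producer.override.buffer.memory": "7770403"},
    "checkpointConnector": {"producer.override.buffer.memory": "7770503"},
}

# Reverse index: marker value -> the location it was set in.
origin = {v: f"my-target-cluster.config: {k}" for k, v in cluster.items()}
for name, cfg in connectors.items():
    origin.update({v: f"{name}.config: {k}" for k, v in cfg.items()})

# Framework task producers: worker 'producer.*' overridden by 'producer.override.*'.
producers = {
    f"{name} / WorkerSourceTask producer": merge(
        with_prefix(cluster, "producer."), with_prefix(cfg, "producer.override."))
    for name, cfg in connectors.items()
}
# MirrorMaker2 offset sync producer: 'producer.*' overridden by 'source.producer.*'.
# (In this sketch the 'producer.override.*' key also lands here as an unused 'override.*' entry.)
producers["sourceConnector / offset sync producer"] = merge(
    with_prefix(connectors["sourceConnector"], "producer."),
    with_prefix(connectors["sourceConnector"], "source.producer."))

for client, cfg in producers.items():
    value = cfg["buffer.memory"]
    print(f"{client}: buffer.memory={value} <- {origin[value]}")
```

Removing the `heartbeatConnector` override before running makes its producer trace back to `my-target-cluster.config: producer.buffer.memory`, the shared scope described in the problem statement.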
-
- Investigating AMQ Streams on RHEL
- I used the attached mm2deployment_rhel.yaml for testing.
- The following results were obtained with only the my-source-cluster->my-target-cluster flow enabled, running "/opt/kafka/bin/connect-mirror-maker.sh --clusters my-target-cluster".
- If both my-source-cluster->my-target-cluster and my-target-cluster->my-source-cluster are enabled, the same keys with the source and target aliases swapped are also used as valid producer/consumer configurations.
- MirrorSourceConnector
  - consumer to retrieve topic messages from the source cluster
    - connect-mirror-maker.properties: my-source-cluster.consumer.fetch.max.bytes 55500110 (MirrorSourceTask:93)
  - producer to send topic messages to the target cluster
    - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory 7770111 (buildWorkerTask:601)
  - producer to write to the offset sync topic, which maps the source and target offsets for replicated topic partitions
    - connect-mirror-maker.properties: my-source-cluster.producer.buffer.memory 7770110 (MirrorSourceTask:94)
- MirrorHeartbeatConnector
  - producer to emit heartbeats
    - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory 7770111 (buildWorkerTask:601)
- MirrorCheckpointConnector
  - producer to emit consumer offset checkpoints
    - connect-mirror-maker.properties: my-target-cluster.producer.buffer.memory 7770111 (buildWorkerTask:601)
  - consumer to load the offset sync topic
    - connect-mirror-maker.properties: my-source-cluster.consumer.fetch.max.bytes 55500110 (MirrorCheckpointTask:94)
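On RHEL, each client reads a `<cluster alias>.<producer|consumer>.*` prefix, so the role of the cluster (source or target) decides which keys apply. A minimal Python sketch of that lookup, assuming the roles observed above (including the offset sync producer on the source cluster); `parse_properties`, `FLOW_CLIENTS` and `resolve_flow` are hypothetical, and the properties reader is deliberately simplified.

```python
import re


def parse_properties(text: str) -> dict[str, str]:
    """Minimal .properties reader: 'key=value', 'key: value' or 'key value'.

    Simplified: skips '#'/'!' comments but ignores escapes and line continuations.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        m = re.match(r"([^=:\s]+)(?:\s*[=:\s]\s*(.*))?$", line)
        if m:
            props[m.group(1)] = m.group(2) or ""
    return props


# Clients observed for one flow, with the cluster role whose
# '<alias>.<producer|consumer>.*' keys each one picked up.
FLOW_CLIENTS = [
    ("MirrorSourceConnector", "record consumer", "source", "consumer"),
    ("MirrorSourceConnector", "record producer", "target", "producer"),
    ("MirrorSourceConnector", "offset sync producer", "source", "producer"),
    ("MirrorHeartbeatConnector", "heartbeat producer", "target", "producer"),
    ("MirrorCheckpointConnector", "checkpoint producer", "target", "producer"),
    ("MirrorCheckpointConnector", "offset sync consumer", "source", "consumer"),
]


def resolve_flow(props: dict[str, str], source: str, target: str) -> dict[tuple[str, str], dict[str, str]]:
    """Map (connector, client) to the settings read from '<alias>.<client type>.*'."""
    alias = {"source": source, "target": target}
    result = {}
    for connector, client, role, kind in FLOW_CLIENTS:
        prefix = f"{alias[role]}.{kind}."
        result[(connector, client)] = {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
    return result


props = parse_properties("""
# connect-mirror-maker.properties (marker values from the test above)
my-source-cluster.consumer.fetch.max.bytes 55500110
my-source-cluster.producer.buffer.memory 7770110
my-target-cluster.producer.buffer.memory 7770111
""")
for (connector, client), cfg in resolve_flow(props, "my-source-cluster", "my-target-cluster").items():
    print(connector, client, cfg)
```

Calling `resolve_flow(props, "my-target-cluster", "my-source-cluster")` models the reverse flow, where the same keys apply with the roles swapped.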
(1) "producer.buffer.memory" is used in our documentation
https://access.redhat.com/documentation/en-us/red_hat_amq_streams/2.1/html-single/configuring_amq_streams_on_openshift/index#con-mirrormaker-high-volume-messages-str