Enhancement
Resolution: Done
Major
Which use case/requirement will be addressed by the proposed feature?
The KafkaRecordEmitter class contains a synchronized block as follows:
    synchronized (lock) {
        ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
        Future<RecordMetadata> future = producer.send(producerRecord);
        LOGGER.trace("Sent to topic {}: {}", producerRecord.topic(), record);
        futures.put(record, future);
        maybeFlushAndMarkOffset();
    }
Because every record is sent and tracked while holding a single lock, this implementation serializes emission and slows down event consumption. The explicit flush of the Kafka producer is also unnecessary, since the producer already manages the flushing of its own buffer internally.
Implementation ideas (optional)
To eliminate the synchronized block, I propose using the Kafka producer callback mechanism. The producer invokes the callback once a record has been acknowledged, and at that point we can mark the offset of the event in the offset writer.
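The callback idea could look roughly like the sketch below. It is not the prototype's code. To keep it runnable without the Kafka client on the classpath, `AsyncSender` and `SendCallback` are hypothetical stand-ins; with the real client their role is played by `Producer.send(ProducerRecord, Callback)` and `Callback.onCompletion(RecordMetadata, Exception)`. `CallbackEmitter` and the `offsetMarker` hook are also assumed names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for the producer's asynchronous send.
interface AsyncSender<R> {
    void send(R record, SendCallback callback);
}

// Hypothetical stand-in mirroring the shape of Kafka's Callback
// (the record metadata argument is omitted for brevity).
interface SendCallback {
    void onCompletion(Exception exception);
}

final class CallbackEmitter<R> {
    private final AsyncSender<R> sender;
    private final Consumer<R> offsetMarker; // assumed hook into the offset writer

    CallbackEmitter(AsyncSender<R> sender, Consumer<R> offsetMarker) {
        this.sender = sender;
        this.offsetMarker = offsetMarker;
    }

    // No lock is taken here: the offset is marked from the callback,
    // and only after the send has been acknowledged.
    CompletableFuture<Void> emit(R record) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        sender.send(record, exception -> {
            if (exception != null) {
                // Failed send: leave the offset unmarked and surface the error.
                done.completeExceptionally(exception);
                return;
            }
            offsetMarker.accept(record);
            done.complete(null);
        });
        return done;
    }
}
```

Since the callback runs on whichever thread the sender uses to deliver acknowledgments, the `offsetMarker` hook must itself be safe to call from that thread.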
To avoid synchronized blocks altogether, the file offset writer must be thread-safe internally. My suggestion is to turn the file offset writer into an asynchronous component whose operations all run on a single-threaded executor.
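A minimal sketch of such an asynchronous writer follows, assuming offsets are simple string pairs held in memory. The class name `AsyncOffsetWriter` and its methods are hypothetical, and persisting the map to a file is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical asynchronous offset writer: every operation is submitted to one
// executor thread, so the internal state needs no locks.
final class AsyncOffsetWriter implements AutoCloseable {
    // Only ever touched from the executor thread, so a plain HashMap is enough.
    private final Map<String, String> offsets = new HashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Queues the update and returns immediately; the future completes once
    // the update has been applied.
    CompletableFuture<Void> markOffset(String source, String position) {
        return CompletableFuture.runAsync(() -> offsets.put(source, position), executor);
    }

    // Reads also go through the executor, so they observe all earlier updates.
    CompletableFuture<Map<String, String>> snapshot() {
        return CompletableFuture.supplyAsync(
                () -> new HashMap<String, String>(offsets), executor);
    }

    @Override
    public void close() {
        // Lets already queued tasks finish, then stops the thread.
        executor.shutdown();
    }
}
```

Callers on any thread can invoke `markOffset` without coordination. The single-threaded executor runs tasks in submission order, so updates submitted from one thread are applied in that order.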
I have developed a prototype based on this concept and will submit a pull request for further discussion.