Type: Bug
Resolution: Done
Priority: Major
Bug report
What Debezium connector do you use and what version?
components: debezium-server-kafka, debezium-connector-mysql
version: 2.5.1.Final
What behaviour do you see?
When a Kafka send fails, the Debezium server blocks forever. This is because the CountDownLatch is only counted down in the success case: when the send fails, an exception is raised in the producer callback but the latch is not counted down. The latch therefore never reaches zero and handleBatch waits on latch.await() indefinitely.
io.debezium.server.kafka.KafkaChangeConsumer#handleBatch
@Override
public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer)
        throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(records.size());
    for (ChangeEvent<Object, Object> record : records) {
        try {
            LOGGER.trace("Received event '{}'", record);
            Headers headers = convertKafkaHeaders(record);
            String topicName = streamNameMapper.map(record.destination());
            producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), (metadata, exception) -> {
                if (exception != null) {
                    LOGGER.error("Failed to send record to {}:", topicName, exception);
                    throw new DebeziumException(exception);
                }
                else {
                    LOGGER.trace("Sent message with offset: {}", metadata.offset());
                    latch.countDown();
                }
            });
            committer.markProcessed(record);
        }
        catch (Exception e) {
            throw new DebeziumException(e);
        }
    }
    latch.await();
    committer.markBatchFinished();
}
Feature request or enhancement
Implementation ideas (optional)
Add a timeout to the latch, or move latch.countDown() into a finally block in the producer callback so it also runs when the send fails, as sketched below.
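A minimal sketch combining both ideas, reusing the fields from the snippet above (producer, streamNameMapper, LOGGER, convertKafkaHeaders). The sendFailure reference, the 5-minute timeout value, and the extra imports (java.util.concurrent.TimeUnit, java.util.concurrent.atomic.AtomicReference) are illustrative assumptions, not the fix that was actually shipped:

@Override
public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer)
        throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(records.size());
    // Hypothetical: remember the first send failure so the batch can be failed after awaiting.
    final AtomicReference<Throwable> sendFailure = new AtomicReference<>();
    for (ChangeEvent<Object, Object> record : records) {
        try {
            Headers headers = convertKafkaHeaders(record);
            String topicName = streamNameMapper.map(record.destination());
            producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), (metadata, exception) -> {
                try {
                    if (exception != null) {
                        LOGGER.error("Failed to send record to {}:", topicName, exception);
                        sendFailure.compareAndSet(null, exception);
                    }
                    else {
                        LOGGER.trace("Sent message with offset: {}", metadata.offset());
                    }
                }
                finally {
                    // Count down on success and on failure so the latch can always reach zero.
                    latch.countDown();
                }
            });
            committer.markProcessed(record);
        }
        catch (Exception e) {
            throw new DebeziumException(e);
        }
    }
    // Safety net: bound the wait instead of blocking forever (timeout value is illustrative).
    if (!latch.await(5, TimeUnit.MINUTES)) {
        throw new DebeziumException("Timed out waiting for Kafka acknowledgements");
    }
    if (sendFailure.get() != null) {
        throw new DebeziumException(sendFailure.get());
    }
    committer.markBatchFinished();
}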
links to: RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release