Debezium / DBZ-7575

Debezium Server Kafka blocks forever when a Kafka send fails

    Description

      Bug report

       

      What Debezium connector do you use and what version?

      components: debezium-server-kafka, debezium-connector-mysql
      version: 2.5.1.Final

      What behaviour do you see?

      When a Kafka send fails, Debezium Server blocks forever. The CountDownLatch in handleBatch is counted down only when a send succeeds. When a send fails, the producer callback throws an exception and never calls latch.countDown(). The callback normally runs on the producer's I/O thread, so the exception it throws does not propagate to handleBatch. The latch therefore never reaches zero, latch.await() never returns, and the server stays blocked.

       

      io.debezium.server.kafka.KafkaChangeConsumer#handleBatch

       

      @Override
      public void handleBatch(final List<ChangeEvent<Object, Object>> records,
                              final RecordCommitter<ChangeEvent<Object, Object>> committer)
              throws InterruptedException {
          final CountDownLatch latch = new CountDownLatch(records.size());
          for (ChangeEvent<Object, Object> record : records) {
              try {
                  LOGGER.trace("Received event '{}'", record);
                  Headers headers = convertKafkaHeaders(record);
      
                  String topicName = streamNameMapper.map(record.destination());
                  producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), (metadata, exception) -> {
                      if (exception != null) {
                          LOGGER.error("Failed to send record to {}:", topicName, exception);
                          throw new DebeziumException(exception);
                      }
                      else {
                          LOGGER.trace("Sent message with offset: {}", metadata.offset());
                          latch.countDown();
                      }
                  });
                  committer.markProcessed(record);
              }
              catch (Exception e) {
                  throw new DebeziumException(e);
              }
          }
      
          latch.await();
          committer.markBatchFinished();
      } 

       

       

      Feature request or enhancement

       

      Implementation ideas (optional)

      Add a timeout to latch.await(), or move latch.countDown() into a finally block so that it also runs when a send fails.
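
      A minimal sketch of both ideas combined, using only the JDK so it runs without a broker. This is not the actual Debezium fix. AsyncSender, LatchFixSketch and the timeout parameter are hypothetical stand-ins for the Kafka producer and its configuration. The callback records the first failure and always counts down, and the waiting thread rethrows the failure after the latch is released.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class LatchFixSketch {

    /** Hypothetical stand-in for an async producer: the callback gets (offset, exception). */
    interface AsyncSender {
        void send(String record, BiConsumer<Long, Exception> callback);
    }

    static void handleBatch(List<String> records, AsyncSender sender, Duration timeout)
            throws InterruptedException, TimeoutException {
        final CountDownLatch latch = new CountDownLatch(records.size());
        final AtomicReference<Exception> firstFailure = new AtomicReference<>();

        for (String record : records) {
            sender.send(record, (offset, exception) -> {
                try {
                    if (exception != null) {
                        // Remember the failure instead of throwing on the callback thread.
                        firstFailure.compareAndSet(null, exception);
                    }
                }
                finally {
                    // Count down on success and on failure, so await() can return.
                    latch.countDown();
                }
            });
        }

        // Bounded wait: a callback that never fires can no longer block forever.
        if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting for " + latch.getCount() + " send callback(s)");
        }
        // Surface the failure on the thread that called handleBatch.
        Exception failure = firstFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Failed to send batch", failure);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        // Simulated sender: records starting with "bad" fail, all others succeed.
        AsyncSender sender = (record, callback) -> ioThread.execute(() -> {
            if (record.startsWith("bad")) {
                callback.accept(null, new RuntimeException("simulated send failure"));
            }
            else {
                callback.accept(42L, null);
            }
        });
        try {
            handleBatch(List.of("ok-1", "bad-2", "ok-3"), sender, Duration.ofSeconds(5));
        }
        catch (IllegalStateException e) {
            System.out.println("Batch failed instead of hanging: " + e.getCause().getMessage());
        }
        finally {
            ioThread.shutdown();
        }
    }
}
```

      In the real consumer the failure would presumably be wrapped in a DebeziumException and committer.markBatchFinished() skipped, so the batch is not committed after a failed send.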

          People

            vjuranek@redhat.com Vojtech Juranek
            teo_0 Shen Yi