Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7575

Debezium Server Kafka BLOCKED forever when Kafka send failed

XMLWordPrintable

    • False
    • None
    • False

      Bug report

       

      What Debezium connector do you use and what version?

      components:debezium-server-kafka, debezium-connector-mysql
      version:2.5.1.Final

      What behaviour do you see?

      When the Kafka send failed, the debezium server will be blocked forever. That is beacuse the CountDownLatch only countdown on send success case, when Kafka send failed, an Exception will be raised but the latch will not countdown. So the latch will never count down to zero and the server will be blocked forever.

       

      io.debezium.server.kafka.KafkaChangeConsumer#handleBatch

       

      @Override
      public void handleBatch(final List<ChangeEvent<Object, Object>> records,
                              final RecordCommitter<ChangeEvent<Object, Object>> committer)
              throws InterruptedException {
          final CountDownLatch latch = new CountDownLatch(records.size());
          for (ChangeEvent<Object, Object> record : records) {
              try {
                  LOGGER.trace("Received event '{}'", record);
                  Headers headers = convertKafkaHeaders(record);
      
                  String topicName = streamNameMapper.map(record.destination());
                  producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), (metadata, exception) -> {
                      if (exception != null) {
                          LOGGER.error("Failed to send record to {}:", topicName, exception);
                          throw new DebeziumException(exception);
                      }
                      else {
                          LOGGER.trace("Sent message with offset: {}", metadata.offset());
                          latch.countDown();
                      }
                  });
                  committer.markProcessed(record);
              }
              catch (Exception e) {
                  throw new DebeziumException(e);
              }
          }
      
          latch.await();
          committer.markBatchFinished();
      } 

       

       

      Feature request or enhancement

       

      Implementation ideas (optional)

      Add a Timeout for the latch or move latch.countDown() to a finally block.

              vjuranek@redhat.com Vojtech Juranek
              teo_0 Shen Yi (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

                Created:
                Updated:
                Resolved: