Debezium / DBZ-3887

Throughput Bottleneck and Inefficient Batching in ChangeEventQueue


    • Type: Enhancement
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 1.7.0.CR2
    • Affects Version/s: None
    • Component/s: core-library
    • Labels: None

      The batch polling loop in ChangeEventQueue#poll() currently looks like this:

      while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
          throwProducerExceptionIfPresent();
      
          LOGGER.debug("no records available yet, sleeping a bit...");
          // no records yet, so wait a bit
          metronome.pause();
          LOGGER.debug("checking for more records...");
      }

      1) Pausing the loop for up to `pollInterval` without any mechanism for conditional resumption causes backpressure in `doEnqueue` once the condition `queue.size() >= maxQueueSize || currentQueueSizeInBytes.get() >= maxQueueSizeInBytes` is met.

      • If maxQueueSize is reached, doEnqueue has to wait (indirectly) for up to pollInterval, because in the worst case poll() itself waits up to pollInterval before draining the queue
      • If maxQueueSizeInBytes is reached, doEnqueue has to wait up to pollInterval (its own Thread.sleep) plus up to pollInterval (metronome.pause in poll()) in the worst case, as sketched below
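
      For reference, a simplified reconstruction of the enqueue side (not verbatim source; details elided) shows how the two waits combine:

      protected void doEnqueue(T record) throws InterruptedException {
          // Blind sleep when the byte limit is hit: waits a full pollInterval
          // even if poll() frees space right after this check.
          while (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes.get() > maxQueueSizeInBytes) {
              Thread.sleep(pollInterval.toMillis());
          }
          // Blocks while queue.size() == maxQueueSize; meanwhile the consumer may
          // itself be pausing up to pollInterval in poll() before draining.
          queue.put(record);
      }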

      2) Terminating the loop and handing over the drained records as soon as drainTo() returns more than 0 leads to inefficient batching: there is a high probability that records.size() < maxBatchSize, and we never wait up to pollInterval to fill the batch to maxBatchSize.

      • Suppose the consumer processes the drained records and then calls poll() again. There is a high probability that at least one record is already available in the queue (depending on how long the consumer took to process the previous batch and how quickly the producer enqueues records). If the queue now contains a single record, the consumer immediately receives a batch of one record instead of waiting up to pollInterval to fill the batch to maxBatchSize. This produces many more, smaller batches and higher load.

      The same scenarios apply to maxQueueSizeInBytes-based batching as well. One way to remove the blind sleeps on both sides is sketched below.
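
      A lock with condition variables would let each side resume as soon as the other makes progress, instead of sleeping blindly. A minimal, hypothetical sketch (illustrative only, not the actual patch; class and field names are made up):

      import java.util.ArrayDeque;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Queue;
      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.ReentrantLock;

      public class SignallingQueue<T> {
          private final ReentrantLock lock = new ReentrantLock();
          private final Condition notFull = lock.newCondition();
          private final Condition notEmpty = lock.newCondition();
          private final Queue<T> queue = new ArrayDeque<>();
          private final int maxQueueSize;
          private final int maxBatchSize;

          public SignallingQueue(int maxQueueSize, int maxBatchSize) {
              this.maxQueueSize = maxQueueSize;
              this.maxBatchSize = maxBatchSize;
          }

          public void enqueue(T record) throws InterruptedException {
              lock.lock();
              try {
                  // Resume as soon as poll() drains, not after a fixed pollInterval.
                  while (queue.size() >= maxQueueSize) {
                      notFull.await();
                  }
                  queue.add(record);
                  notEmpty.signal(); // wake a consumer blocked in poll()
              } finally {
                  lock.unlock();
              }
          }

          public List<T> poll() throws InterruptedException {
              lock.lock();
              try {
                  // No metronome.pause() busy-wait: the producer signals on enqueue.
                  while (queue.isEmpty()) {
                      notEmpty.await();
                  }
                  List<T> batch = new ArrayList<>(Math.min(queue.size(), maxBatchSize));
                  while (!queue.isEmpty() && batch.size() < maxBatchSize) {
                      batch.add(queue.poll());
                  }
                  notFull.signalAll(); // space freed; resume a blocked producer
                  return batch;
              } finally {
                  lock.unlock();
              }
          }
      }

      To address (2) as well, poll() could additionally wait (e.g. via notEmpty.awaitNanos(...)) for up to pollInterval while queue.size() < maxBatchSize before draining, trading a bounded delay for fuller batches.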

      Snapshot Benchmark

      package io.debezium.connector.postgresql;
      
      import io.debezium.config.CommonConnectorConfig;
      import io.debezium.config.Configuration;
      import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
      import io.debezium.embedded.Connect;
      import io.debezium.embedded.EmbeddedEngine;
      import io.debezium.engine.ChangeEvent;
      import io.debezium.engine.DebeziumEngine;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.apache.kafka.connect.storage.FileOffsetBackingStore;
      import org.openjdk.jmh.annotations.*;
      
      import java.util.List;
      import java.util.concurrent.TimeUnit;
      
      @Fork(1)
      @State(Scope.Thread)
      @Warmup(iterations = 2, time = 10)
      @Measurement(iterations = 2, time = 10)
      @OutputTimeUnit(TimeUnit.SECONDS)
      @BenchmarkMode({Mode.AverageTime})
      public class CDCPerf {
      
          @Param({"10", "50", "500"})
          long pollIntervalMillis;
          Configuration configuration;
          DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
      
          @Setup(Level.Trial)
          public void setupTrial() {
              TestHelper.execute("DROP TABLE IF EXISTS properties");
              TestHelper.execute("CREATE TABLE properties (prop_name TEXT PRIMARY KEY, prop_value TEXT, timestamp TEXT, autoid INT)");
              TestHelper.execute("INSERT INTO properties SELECT i::TEXT, i::TEXT, i::TEXT, i FROM generate_series(1, 1000000) AS t(i)");
              configuration = TestHelper.defaultConfig()
                      .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                      .with(EmbeddedEngine.ENGINE_NAME, "postgres_cdc_perf")
                      .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
                      .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "f_offset.dat")
                      .with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT)
                      .with(CommonConnectorConfig.POLL_INTERVAL_MS, pollIntervalMillis)
                      .build();
          }
      
          @Setup(Level.Invocation)
          public void setupInvocation() {
              engine = DebeziumEngine.create(Connect.class)
                      .using(configuration.asProperties())
                      .notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
                          private int rows = 0;
      
                          @Override
                          public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
                              rows += records.size();
                              committer.markBatchFinished();
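                              // Interrupting the polling thread makes engine.run() return once all 1M snapshot rows have been consumed.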
                              if (rows >= 1_000_000) {
                                  Thread.currentThread().interrupt();
                              }
                          }
                      })
                      .build();
          }
      
          @Benchmark
          public void benchmark_1_snapshot_existing() {
              engine.run();
          }
      
          @TearDown(Level.Invocation)
          public void teardownInvocation() {
              TestHelper.dropDefaultReplicationSlot();
          }
      
      }
      
      Benchmark                              (pollIntervalMillis)  Mode  Cnt   Score   Error  Units
      CDCPerf.benchmark_1_snapshot_existing                    10  avgt    2   5.844           s/op
      CDCPerf.benchmark_1_snapshot_existing                    50  avgt    2  10.214           s/op
      CDCPerf.benchmark_1_snapshot_existing                   500  avgt    2  61.290           s/op
      

      Incremental CDC Benchmark

      package io.debezium.connector.postgresql;
      
      import io.debezium.config.CommonConnectorConfig;
      import io.debezium.config.Configuration;
      import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
      import io.debezium.connector.postgresql.connection.PostgresConnection;
      import io.debezium.connector.postgresql.connection.ReplicationConnection;
      import io.debezium.embedded.Connect;
      import io.debezium.embedded.EmbeddedEngine;
      import io.debezium.engine.ChangeEvent;
      import io.debezium.engine.DebeziumEngine;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.apache.kafka.connect.storage.FileOffsetBackingStore;
      import org.awaitility.Awaitility;
      import org.openjdk.jmh.annotations.*;
      
      import java.sql.ResultSet;
      import java.util.List;
      import java.util.concurrent.TimeUnit;
      
      @Fork(1)
      @State(Scope.Thread)
      @Warmup(iterations = 2, time = 10)
      @Measurement(iterations = 2, time = 10)
      @OutputTimeUnit(TimeUnit.SECONDS)
      @BenchmarkMode({Mode.AverageTime})
      public class IncrementalCDCPerf {
      
          @Param({"10", "50", "500"})
          long pollIntervalMillis;
          Configuration configuration;
          Thread thread;
      
          @Setup(Level.Trial)
          public void setupTrial() {
              configuration = TestHelper.defaultConfig()
                      .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                      .with(EmbeddedEngine.ENGINE_NAME, "postgres_cdc_perf")
                      .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
                      .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "f_offset.dat")
                      .with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT)
                      .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                      .with(CommonConnectorConfig.POLL_INTERVAL_MS, pollIntervalMillis)
                      .build();
          }
      
          @Setup(Level.Invocation)
          public void setupInvocation() throws InterruptedException {
              TestHelper.execute("DROP TABLE IF EXISTS properties");
              TestHelper.execute("CREATE TABLE properties (prop_name TEXT PRIMARY KEY, prop_value TEXT, timestamp TEXT, autoid INT)");
              DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine = DebeziumEngine.create(Connect.class)
                      .using(configuration.asProperties())
                      .notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
                          private int rows = 0;
      
                          @Override
                          public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
                              rows += records.size();
                              committer.markBatchFinished();
                              if (rows >= 1_000_000) {
                                  Thread.currentThread().interrupt();
                              }
                          }
                      })
                      .build();
              thread = new Thread(engine);
              thread.start();
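              // Make sure the connector is actively streaming before generating the change load below.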
              waitForDefaultReplicationSlotBeActive(LogicalDecoder.parse(configuration.getString(PostgresConnectorConfig.PLUGIN_NAME)));
              TestHelper.execute("INSERT INTO properties SELECT i::TEXT, i::TEXT, i::TEXT, i FROM generate_series(1, 1000000) AS t(i)");
          }
      
          @Benchmark
          public void benchmark_1_incremental_existing() throws InterruptedException {
              thread.join();
          }
      
          @TearDown(Level.Invocation)
          public void teardownInvocation() {
              thread.interrupt();
              TestHelper.dropDefaultReplicationSlot();
          }
      
          private void waitForDefaultReplicationSlotBeActive(LogicalDecoder logicalDecoder) {
              try (PostgresConnection connection = TestHelper.create()) {
                  Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection.prepareQueryAndMap(
                          "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ? and active = true", statement -> {
                              statement.setString(1, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
                              statement.setString(2, "postgres");
                              statement.setString(3, logicalDecoder.getPostgresPluginName());
                          },
                          ResultSet::next));
              }
          }
      
      }
      
      Benchmark                                             (pollIntervalMillis)  Mode  Cnt   Score   Error  Units
      IncrementalCDCPerf.benchmark_1_incremental_existing                     10  avgt    2   8.685           s/op
      IncrementalCDCPerf.benchmark_1_incremental_existing                     50  avgt    2   9.271           s/op
      IncrementalCDCPerf.benchmark_1_incremental_existing                    500  avgt    2  61.186           s/op

      ChangeEventQueue Benchmark

      package io.debezium.connector.base;
      
      import io.debezium.util.LoggingContext;
      import org.openjdk.jmh.annotations.*;
      
      import java.time.Duration;
      import java.util.concurrent.TimeUnit;
      
      import static io.debezium.config.CommonConnectorConfig.*;
      
      @Fork(1)
      @State(Scope.Thread)
      @Warmup(iterations = 2, time = 5)
      @Measurement(iterations = 2, time = 5)
      @OutputTimeUnit(TimeUnit.MILLISECONDS)
      @BenchmarkMode({Mode.AverageTime})
      public class ChangeEventQueuePerf {
      
          @Param({"10", "50", "500"})
          String pollIntervalMillis;
      
          ChangeEventQueue<String> changeEventQueue;
      
          @Setup
          public void setup() {
              changeEventQueue = new ChangeEventQueue.Builder<String>()
                      .pollInterval(Duration.ofMillis(Long.parseLong(pollIntervalMillis)))
                      .maxQueueSize(DEFAULT_MAX_QUEUE_SIZE).maxBatchSize(DEFAULT_MAX_BATCH_SIZE)
                      .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                      .maxQueueSizeInBytes(DEFAULT_MAX_QUEUE_SIZE_IN_BYTES).build();
          }
      
          @Benchmark
          public void benchmark_1_existing_per_10m() throws InterruptedException {
              int totalRecords = 10_000_000;
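              // Consumer thread: polls batches until all 10M records have been drained from the queue.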
              Thread t = new Thread(new Runnable() {
                  private long noOfRecords = 0;
      
                  @Override
                  public void run() {
                      try {
                          while (noOfRecords < totalRecords) {
                              noOfRecords += changeEventQueue.poll().size();
                          }
                      } catch (InterruptedException e) {
                          throw new RuntimeException(e);
                      }
                  }
              });
              t.start();
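              // Producer: enqueues all records on the benchmark thread; throughput is gated by doEnqueue's backpressure.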
              String data = "Change Data Capture Event via Debezium - ";
              for (int i = 1; i <= totalRecords; i++) {
                  changeEventQueue.doEnqueue(data + i);
              }
              t.join();
          }
      
      }
      
      Benchmark                                           (pollIntervalMillis)  Mode  Cnt       Score   Error  Units
      ChangeEventQueuePerf.benchmark_1_existing_per_10m                     10  avgt    2   12210.889          ms/op
      ChangeEventQueuePerf.benchmark_1_existing_per_10m                     50  avgt    2   61025.820          ms/op
      ChangeEventQueuePerf.benchmark_1_existing_per_10m                    500  avgt    2  608096.485          ms/op
      

      How it was identified

      Performing incremental CDC (not a snapshot) on a 1M-row PostgreSQL table with 4 columns, using maxBatchSize = 10000, maxQueueSize = 20000, and pollInterval = 500 ms, took 26 s with a no-op DebeziumEngine.notifying consumer.

      Analysing the run with the JVisualVM sampler revealed that more than 25 s were spent in poll()'s metronome.pause() alone.

      doEnqueue() spent 12.2 s waiting for the queue to be drained (queue.put()).
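
      A back-of-envelope check is consistent with this: 1M rows at maxBatchSize = 10000 means at least 100 poll() calls, and every poll() that finds the queue empty pauses for a full 500 ms, so ~50 empty polls alone account for the 25 s observed in metronome.pause().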

      (JVisualVM Sampler metrics for incremental processing of 1M rows are attached)

      https://issues.redhat.com/browse/DBZ-3477 may be related to this issue.

        Assignee: Unassigned
        Reporter: Naveen Kumar (krnaveen14)