Type: Enhancement
Resolution: Done
Priority: Major
Major Throughput Bottleneck and Inefficient Batching in ChangeEventQueue
while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
    throwProducerExceptionIfPresent();

    LOGGER.debug("no records available yet, sleeping a bit...");
    // no records yet, so wait a bit
    metronome.pause();
    LOGGER.debug("checking for more records...");
}
1) Pausing the loop for up to `pollInterval` with no mechanism for conditional resumption causes backpressure in `doEnqueue` once the `queue.size() >= maxQueueSize || currentQueueSizeInBytes.get() >= maxQueueSizeInBytes` condition is met.
- Suppose maxQueueSize is reached: doEnqueue then (indirectly) waits up to pollInterval in the worst case, because poll() waits up to pollInterval before draining the queue.
- Suppose maxQueueSizeInBytes is reached: in the worst case doEnqueue waits up to pollInterval (doEnqueue's Thread.sleep) plus another pollInterval (poll()'s metronome.pause).
2) Terminating the loop and processing the drained records as soon as drainTo() returns > 0 leads to inefficient batching: there is a high probability that records.size() < maxBatchSize, because poll() never waits up to pollInterval to fill the batch to maxBatchSize.
- Suppose the consumer processes a drained batch and then calls poll() again. There is a high probability that at least one record is already in the queue (depending on how long the consumer took to process the previous batch and how quickly the producer enqueues records). If the queue now holds a single record, the consumer immediately processes that one record instead of waiting up to pollInterval for a batch of maxBatchSize. This produces far more, far smaller batches and correspondingly higher load.
The same scenarios apply to maxQueueSizeInBytes-based batching; a sketch of a conditional-signalling alternative follows below.
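One possible direction is to replace the fixed metronome.pause()/Thread.sleep waits with a lock and two condition variables, so that poll() is woken as soon as the producer enqueues and doEnqueue() is woken as soon as the consumer drains, while poll() still waits out the remainder of pollInterval to accumulate a full batch. The following is a minimal sketch under those assumptions, not Debezium's actual implementation; SignallingQueue and all of its members are illustrative names.

// Sketch only: conditional signalling between producer and consumer.
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignallingQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition isNotEmpty = lock.newCondition(); // signalled by doEnqueue()
    private final Condition isNotFull = lock.newCondition();  // signalled by poll()
    private final List<T> queue = new ArrayList<>();
    private final int maxQueueSize;
    private final int maxBatchSize;
    private final Duration pollInterval;

    public SignallingQueue(int maxQueueSize, int maxBatchSize, Duration pollInterval) {
        this.maxQueueSize = maxQueueSize;
        this.maxBatchSize = maxBatchSize;
        this.pollInterval = pollInterval;
    }

    public void doEnqueue(T record) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= maxQueueSize) {
                isNotFull.await(); // resumed as soon as poll() drains, not after a fixed sleep
            }
            queue.add(record);
            isNotEmpty.signal(); // wake a consumer blocked in poll()
        }
        finally {
            lock.unlock();
        }
    }

    public List<T> poll() throws InterruptedException {
        lock.lock();
        try {
            long remaining = pollInterval.toNanos();
            // Wait up to pollInterval for a *full* batch instead of returning
            // after the first record, reducing the number of tiny batches.
            while (queue.size() < maxBatchSize && remaining > 0) {
                remaining = isNotEmpty.awaitNanos(remaining);
            }
            List<T> batch = new ArrayList<>(queue.subList(0, Math.min(queue.size(), maxBatchSize)));
            queue.subList(0, batch.size()).clear();
            isNotFull.signalAll(); // wake producers blocked in doEnqueue()
            return batch;
        }
        finally {
            lock.unlock();
        }
    }
}

With this shape, the worst-case producer stall in scenario 1 disappears (the producer is woken immediately on drain), and the premature small batches of scenario 2 are avoided because poll() keeps waiting for a full batch until pollInterval expires.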
Snapshot Benchmark
package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.openjdk.jmh.annotations.*;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Fork(1)
@State(Scope.Thread)
@Warmup(iterations = 2, time = 10)
@Measurement(iterations = 2, time = 10)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode({ Mode.AverageTime })
public class CDCPerf {

    @Param({ "10", "50", "500" })
    long pollIntervalMillis;

    Configuration configuration;
    DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;

    @Setup(Level.Trial)
    public void setupTrial() {
        TestHelper.execute("DROP TABLE IF EXISTS properties");
        TestHelper.execute("CREATE TABLE properties (prop_name TEXT PRIMARY KEY, prop_value TEXT, timestamp TEXT, autoid INT)");
        TestHelper.execute("INSERT INTO properties SELECT i::TEXT, i::TEXT, i::TEXT, i FROM generate_series(1, 1000000) AS t(i)");
        configuration = TestHelper.defaultConfig()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "postgres_cdc_perf")
                .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
                .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "f_offset.dat")
                .with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT)
                .with(CommonConnectorConfig.POLL_INTERVAL_MS, pollIntervalMillis)
                .build();
    }

    @Setup(Level.Invocation)
    public void setupInvocation() {
        engine = DebeziumEngine.create(Connect.class)
                .using(configuration.asProperties())
                .notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
                    private int rows = 0;

                    @Override
                    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records,
                                            DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
                            throws InterruptedException {
                        rows += records.size();
                        committer.markBatchFinished();
                        if (rows >= 1_000_000) {
                            Thread.currentThread().interrupt();
                        }
                    }
                })
                .build();
    }

    @Benchmark
    public void benchmark_1_snapshot_existing() {
        engine.run();
    }

    @TearDown(Level.Invocation)
    public void teardownInvocation() {
        TestHelper.dropDefaultReplicationSlot();
    }
}
Benchmark                              (pollIntervalMillis)  Mode  Cnt   Score  Error  Units
CDCPerf.benchmark_1_snapshot_existing                    10  avgt    2   5.844         s/op
CDCPerf.benchmark_1_snapshot_existing                    50  avgt    2  10.214         s/op
CDCPerf.benchmark_1_snapshot_existing                   500  avgt    2  61.290         s/op
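For reference, a benchmark class like the above can be launched programmatically via the standard JMH runner API. This is a minimal illustrative launcher (CDCPerfRunner is a hypothetical name; it assumes JMH annotation processing is configured in the build):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class CDCPerfRunner {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include("CDCPerf") // regex matched against benchmark names
                .build();
        new Runner(options).run();
    }
}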
Incremental CDC Benchmark
package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.awaitility.Awaitility;
import org.openjdk.jmh.annotations.*;

import java.sql.ResultSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Fork(1)
@State(Scope.Thread)
@Warmup(iterations = 2, time = 10)
@Measurement(iterations = 2, time = 10)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode({ Mode.AverageTime })
public class IncrementalCDCPerf {

    @Param({ "10", "50", "500" })
    long pollIntervalMillis;

    Configuration configuration;
    Thread thread;

    @Setup(Level.Trial)
    public void setupTrial() {
        configuration = TestHelper.defaultConfig()
                .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
                .with(EmbeddedEngine.ENGINE_NAME, "postgres_cdc_perf")
                .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class)
                .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "f_offset.dat")
                .with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                .with(CommonConnectorConfig.POLL_INTERVAL_MS, pollIntervalMillis)
                .build();
    }

    @Setup(Level.Invocation)
    public void setupInvocation() throws InterruptedException {
        TestHelper.execute("DROP TABLE IF EXISTS properties");
        TestHelper.execute("CREATE TABLE properties (prop_name TEXT PRIMARY KEY, prop_value TEXT, timestamp TEXT, autoid INT)");
        DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine = DebeziumEngine.create(Connect.class)
                .using(configuration.asProperties())
                .notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
                    private int rows = 0;

                    @Override
                    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records,
                                            DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
                            throws InterruptedException {
                        rows += records.size();
                        committer.markBatchFinished();
                        if (rows >= 1_000_000) {
                            Thread.currentThread().interrupt();
                        }
                    }
                })
                .build();
        thread = new Thread(engine);
        thread.start();
        waitForDefaultReplicationSlotBeActive(LogicalDecoder.parse(configuration.getString(PostgresConnectorConfig.PLUGIN_NAME)));
        TestHelper.execute("INSERT INTO properties SELECT i::TEXT, i::TEXT, i::TEXT, i FROM generate_series(1, 1000000) AS t(i)");
    }

    @Benchmark
    public void benchmark_1_incremental_existing() throws InterruptedException {
        thread.join();
    }

    @TearDown(Level.Invocation)
    public void teardownInvocation() {
        thread.interrupt();
        TestHelper.dropDefaultReplicationSlot();
    }

    private void waitForDefaultReplicationSlotBeActive(LogicalDecoder logicalDecoder) {
        try (PostgresConnection connection = TestHelper.create()) {
            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection.prepareQueryAndMap(
                    "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ? and active = true",
                    statement -> {
                        statement.setString(1, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
                        statement.setString(2, "postgres");
                        statement.setString(3, logicalDecoder.getPostgresPluginName());
                    },
                    ResultSet::next));
        }
    }
}
Benchmark                                            (pollIntervalMillis)  Mode  Cnt   Score  Error  Units
IncrementalCDCPerf.benchmark_1_incremental_existing                    10  avgt    2   8.685         s/op
IncrementalCDCPerf.benchmark_1_incremental_existing                    50  avgt    2   9.271         s/op
IncrementalCDCPerf.benchmark_1_incremental_existing                   500  avgt    2  61.186         s/op
ChangeEventQueue Benchmark
package io.debezium.connector.base;

import io.debezium.util.LoggingContext;

import org.openjdk.jmh.annotations.*;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.debezium.config.CommonConnectorConfig.*;

@Fork(1)
@State(Scope.Thread)
@Warmup(iterations = 2, time = 5)
@Measurement(iterations = 2, time = 5)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode({ Mode.AverageTime })
public class ChangeEventQueuePerf {

    @Param({ "10", "50", "500" })
    String pollIntervalMillis;

    ChangeEventQueue<String> changeEventQueue;

    @Setup
    public void setup() {
        changeEventQueue = new ChangeEventQueue.Builder<String>()
                .pollInterval(Duration.ofMillis(Long.parseLong(pollIntervalMillis)))
                .maxQueueSize(DEFAULT_MAX_QUEUE_SIZE)
                .maxBatchSize(DEFAULT_MAX_BATCH_SIZE)
                .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                .maxQueueSizeInBytes(DEFAULT_MAX_QUEUE_SIZE_IN_BYTES)
                .build();
    }

    @Benchmark
    public void benchmark_1_existing_per_10m() throws InterruptedException {
        int totalRecords = 10_000_000;
        Thread t = new Thread(new Runnable() {
            private long noOfRecords = 0;

            @Override
            public void run() {
                try {
                    while (noOfRecords < totalRecords) {
                        noOfRecords += changeEventQueue.poll().size();
                    }
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();
        String data = "Change Data Capture Event via Debezium - ";
        for (int i = 1; i <= totalRecords; i++) {
            changeEventQueue.doEnqueue(data + i);
        }
        t.join();
    }
}
Benchmark                                          (pollIntervalMillis)  Mode  Cnt       Score  Error  Units
ChangeEventQueuePerf.benchmark_1_existing_per_10m                    10  avgt    2   12210.889         ms/op
ChangeEventQueuePerf.benchmark_1_existing_per_10m                    50  avgt    2   61025.820         ms/op
ChangeEventQueuePerf.benchmark_1_existing_per_10m                   500  avgt    2  608096.485         ms/op
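The near-perfectly linear scaling with pollInterval is worth spelling out: 12.2 s, 61.0 s and 608.1 s each correspond to roughly 1220 pauses of 10 ms, 50 ms and 500 ms respectively, and 10,000,000 records / DEFAULT_MAX_QUEUE_SIZE (8192, assuming the current default) ≈ 1221. That is consistent with the scenarios above: each time the producer fills the queue and the consumer drains it empty, the consumer's next poll() pays one full pollInterval pause, so the run costs approximately (totalRecords / maxQueueSize) × pollInterval regardless of how fast the producer and consumer actually are.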
How it was identified
Performing incremental CDC (not a snapshot) on a 1M-row, 4-column PostgreSQL table with maxBatchSize = 10000, maxQueueSize = 20000 and pollInterval = 500 ms took 26 s with a no-op DebeziumEngine.notifying consumer.
Analysing the run with the JVisualVM sampler revealed that 25+ s were spent in poll()'s metronome.pause alone.
doEnqueue() spent 12.2 s waiting for the queue to be drained (queue.put).
(JVisualVM Sampler metrics for incremental processing of 1M rows are attached)
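A back-of-the-envelope check agrees with the sampler numbers: 1,000,000 rows / maxQueueSize of 20,000 ≈ 50 fill-and-drain cycles, and 50 × 500 ms pollInterval ≈ 25 s, which is almost exactly the time attributed to metronome.pause.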
https://issues.redhat.com/browse/DBZ-3477 could be related to this
is related to: DBZ-4050 Provide JMH benchmark for ChangeEventQueue (Closed)