diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/Buffer.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/Buffer.java index 94a8a074d..6e8158ec6 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/Buffer.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/Buffer.java @@ -22,6 +22,9 @@ public interface Buffer { */ List add(SinkRecordDescriptor recordDescriptor); + default void remove(SinkRecordDescriptor recordDescriptor) { + }; + /** * to clear and flush the internal buffer * @return {@link SinkRecordDescriptor} the flushed buffer records. diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index 74c39921a..8f620af21 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -124,8 +124,12 @@ public void execute(Collection records) { if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) { // When an delete arrives, update buffer must be flushed to avoid losing an // delete for the same record after its update. - - flushBuffer(tableId, updateBufferByTable.get(tableId).flush()); + if (config.isUseReductionBuffer()) { + updateBufferByTable.get(tableId).remove(sinkRecordDescriptor); + } + else { + flushBuffer(tableId, updateBufferByTable.get(tableId).flush()); + } } Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId, sinkRecordDescriptor); @@ -139,8 +143,12 @@ public void execute(Collection records) { if (deleteBufferByTable.get(tableId) != null && !deleteBufferByTable.get(tableId).isEmpty()) { // When an insert arrives, delete buffer must be flushed to avoid losing an insert for the same record after its deletion. // this because at the end we will always flush inserts before deletes. - - flushBuffer(tableId, deleteBufferByTable.get(tableId).flush()); + if (config.isUseReductionBuffer()) { + deleteBufferByTable.get(tableId).remove(sinkRecordDescriptor); + } + else { + flushBuffer(tableId, deleteBufferByTable.get(tableId).flush()); + } } Stopwatch updateBufferStopwatch = Stopwatch.reusable(); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java index 2bc4b4016..2bd784c75 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java @@ -33,6 +33,27 @@ public ReducedRecordBuffer(JdbcSinkConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } + @Override + public void remove(SinkRecordDescriptor recordDescriptor) { + if (records.isEmpty()) { + keySchema = recordDescriptor.getKeySchema(); + valueSchema = recordDescriptor.getValueSchema(); + } + + if (!Objects.equals(keySchema, recordDescriptor.getKeySchema()) || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) { + keySchema = recordDescriptor.getKeySchema(); + valueSchema = recordDescriptor.getValueSchema(); + } + + Struct keyStruct = recordDescriptor.getKeyStruct(connectorConfig.getPrimaryKeyMode(), connectorConfig.getPrimaryKeyFields()); + if (keyStruct != null) { + records.remove(keyStruct); + } + else { + throw new ConnectException("No struct-based primary key defined for record key/value, reduction buffer require struct based primary key"); + } + } + @Override public List add(SinkRecordDescriptor recordDescriptor) { List flushed = new ArrayList<>();