- Enhancement
- Resolution: Done
- Minor
- 1.3.1.Final
- None
- False
- False
- Undefined
The Debezium EmbeddedEngine API prevents the creation and modification of a RecordChangeEvent in a ChangeConsumer. This means that a custom ChangeConsumer cannot commit a change event different from the one it received. One use case for this is implementing exactly-once semantics with performant batch processing (i.e., without having to set the flush interval to 0). This ticket is meant to determine how the API can be designed so that implementations of DebeziumEngine.ChangeConsumer can create or modify events prior to committing them.
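The exactly-once idea behind this use case can be sketched in isolation: embed the latest Kafka publish offsets into the Debezium source offset map, so that on recovery already-published messages can be skipped. This is a minimal illustration only; the class and method names (`OffsetMergeSketch`, `mergeKafkaOffsets`, the `"topicOffsetMap"` key) are hypothetical, not part of the Debezium API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: merge Kafka publish offsets into a copy of the
// Debezium source offset map. None of these names come from Debezium.
public class OffsetMergeSketch {

    // Copies the original source offset and adds a serialized map of
    // topic-partition -> last Kafka offset published by the consumer.
    static Map<String, String> mergeKafkaOffsets(Map<String, String> sourceOffset,
                                                 Map<String, Long> kafkaOffsets) {
        Map<String, String> merged = new HashMap<>(sourceOffset);
        // Hand-rolled JSON to keep the sketch dependency-free
        StringBuilder sb = new StringBuilder("{");
        kafkaOffsets.forEach((tp, off) -> {
            if (sb.length() > 1) {
                sb.append(",");
            }
            sb.append("\"").append(tp).append("\":").append(off);
        });
        sb.append("}");
        merged.put("topicOffsetMap", sb.toString());
        return merged;
    }
}
```

On restart, a consumer could deserialize `topicOffsetMap` from the recovered source offset and drop any record whose Kafka offset is already covered.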
To reproduce this, modify the record contained by an event in DebeziumEngine.ChangeConsumer's handleBatch method, then attempt to commit the event with the new record.
class MyEngine {
    private DebeziumEngine engine;

    // [...] denotes omitted irrelevant portions

    public void run(final DebeziumMysqlSourceConnectorConfiguration configuration, final Environment environment) {
        engine = DebeziumEngine
                .create(ChangeEventFormat.of(Connect.class))
                .using(new Properties())
                .notifying(new MyChangeConsumer())
                .build();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
        @Override
        public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                                DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
                throws InterruptedException {
            SourceRecord lastRecord = null;
            for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
                SourceRecord record = recordChangeEvent.record();
                try {
                    // [...] process the record, but don't commit it
                }
                catch (StopConnectorException | StopEngineException ex) {
                    // [...] handle exception
                }
                catch (Exception e) {
                    // [...] handle exception
                }
                lastRecord = record;
            }

            // [...] block until all messages are published to kafka

            // Create new record offset info
            Map<String, String> newSourceOffset = (Map<String, String>) lastRecord.sourceOffset();
            Map<String, Long> topicToKafkaOffsetMap = new HashMap<>();
            // Get the latest offset published to kafka
            topicToKafkaOffsetMap.put("topic_partition1", 1L);
            newSourceOffset.put("topicOffsetMap", new JSONObject(topicToKafkaOffsetMap).toString());

            // Create a new source record that contains the offsets for each partition.
            // This is used to skip messages upon recovery to preserve exactly-once semantics.
            SourceRecord mergedRecord = new SourceRecord(lastRecord.sourcePartition(), newSourceOffset,
                    lastRecord.topic(), lastRecord.kafkaPartition(), lastRecord.keySchema(), lastRecord.key(),
                    lastRecord.valueSchema(), lastRecord.value(), lastRecord.timestamp(), lastRecord.headers());

            // Create a new record change event to contain the merged record with offset data
            RecordChangeEvent<SourceRecord> mergedRecordChangeEvent = new RecordChangeEvent<SourceRecord>() {
                @Override
                public SourceRecord record() {
                    return mergedRecord;
                }
            };

            // The next line throws a cast error at runtime because we cannot
            // cast from a RecordChangeEvent to an EmbeddedEngineChangeEvent
            committer.markProcessed(mergedRecordChangeEvent);
            committer.markBatchFinished();
        }
    }
}
The exact exception is:
ERROR [2021-01-05 19:42:10,635] io.debezium.embedded.EmbeddedEngine: Stopping connector after error in the application's handler method: class com.yelp.MysqlConnector.DebeziumMysqlSourceConnectorApplication$MyChangeConsumer$1 cannot be cast to class io.debezium.embedded.EmbeddedEngineChangeEvent […] ! at io.debezium.embedded.ConvertingEngineBuilder$1.markProcessed(ConvertingEngineBuilder.java:89)
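The failure boils down to a plain downcast problem: the committer in ConvertingEngineBuilder assumes every event it receives is the engine's own EmbeddedEngineChangeEvent, so any user-constructed RecordChangeEvent implementation fails the cast. The stripped-down sketch below illustrates this; all class and method names here are stand-ins, not Debezium types, and the interface-based alternative at the end is purely illustrative, not the actual Debezium design.

```java
public class CastFailureSketch {
    // Stand-in for DebeziumEngine.RecordChangeEvent
    interface ChangeEvent {
        String record();
    }

    // Stand-in for the engine-internal EmbeddedEngineChangeEvent
    static class InternalChangeEvent implements ChangeEvent {
        @Override
        public String record() {
            return "internal";
        }
    }

    // Mirrors the reported failure: the committer unconditionally downcasts
    // to the internal type, so any foreign implementation throws here.
    static String markProcessed(ChangeEvent event) {
        InternalChangeEvent internal = (InternalChangeEvent) event; // ClassCastException for foreign impls
        return internal.record();
    }

    // One illustrative direction for the API: depend only on the public
    // interface, so user-created events are accepted as well.
    static String markProcessedViaInterface(ChangeEvent event) {
        return event.record();
    }
}
```

Calling `markProcessed` with an engine-created event succeeds, while any event built outside the engine (here, a lambda implementing the interface) throws ClassCastException, matching the stack trace above.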