Debezium / DBZ-2897

DebeziumEngine RecordChangeEvents cannot be modified


    • Type: Enhancement
    • Resolution: Done
    • Priority: Minor
    • Fix Version/s: 1.5.0.Beta1
    • Affects Version/s: 1.3.1.Final
    • Component/s: embedded-engine
    • Labels: None

      The Debezium EmbeddedEngine API prevents a DebeziumEngine.ChangeConsumer from creating or modifying a RecordChangeEvent. As a result, a custom ChangeConsumer cannot commit a change event other than the one it received. One use case for this is exactly-once semantics combined with efficient batch processing (i.e., without having to set the offset flush interval, offset.flush.interval.ms, to 0). This ticket is meant to determine how the API could be designed so that implementations of DebeziumEngine.ChangeConsumer can create or modify events before committing them.
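One possible shape for such an API is modeled here as a dependency-free Java sketch. Everything in it is hypothetical: EngineEvent, Committer, and the two-argument markProcessed(event, extraOffsets) are illustrative stand-ins, not Debezium classes, and the per-partition Kafka offsets are serialized with Map.toString() instead of the report's JSONObject only to avoid external dependencies. The idea is that the consumer keeps committing the event the engine handed it, but supplies extra offset entries that the committer merges into a copy of that event's source offset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one possible API design; none of these types are Debezium classes.
public class OffsetOverrideSketch {

    // Stand-in for an event that only the engine creates; its source offset is read-only.
    static final class EngineEvent {
        private final Map<String, Object> sourceOffset;

        EngineEvent(Map<String, Object> sourceOffset) {
            this.sourceOffset = Collections.unmodifiableMap(new HashMap<>(sourceOffset));
        }

        Map<String, Object> sourceOffset() {
            return sourceOffset;
        }
    }

    // Stand-in for the committer; it records the offsets it would flush.
    static final class Committer {
        final List<Map<String, Object>> committed = new ArrayList<>();

        // Current style: commit the event's own source offset.
        void markProcessed(EngineEvent event) {
            committed.add(event.sourceOffset());
        }

        // Hypothetical extension: commit the same engine-created event,
        // with caller-supplied entries merged into a copy of its source offset.
        void markProcessed(EngineEvent event, Map<String, ?> extraOffsets) {
            Map<String, Object> merged = new HashMap<>(event.sourceOffset());
            merged.putAll(extraOffsets);
            committed.add(Collections.unmodifiableMap(merged));
        }
    }

    static Committer commitBatch() {
        // Placeholder source offset for the last event of a batch.
        EngineEvent lastEvent = new EngineEvent(Map.of("file", "mysql-bin.000003", "pos", 154L));

        // Latest Kafka offset per partition (placeholder value, as in the report).
        Map<String, Long> kafkaOffsets = Map.of("topic_partition1", 1L);

        Committer committer = new Committer();
        committer.markProcessed(lastEvent, Map.of("topicOffsetMap", kafkaOffsets.toString()));
        return committer;
    }

    public static void main(String[] args) {
        Map<String, Object> flushed = commitBatch().committed.get(0);
        System.out.println(flushed.get("topicOffsetMap"));
    }
}
```

Because the committer in this model only ever receives events the engine itself created, it can keep relying on its internal event type, while the consumer still controls what ends up in the committed offset.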

      Discussed in a Gitter thread.

      To reproduce this, modify the record contained in an event inside the handleBatch method of a DebeziumEngine.ChangeConsumer, then commit the event carrying the new record:

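      import io.debezium.embedded.Connect;
      import io.debezium.embedded.StopConnectorException;
      import io.debezium.engine.DebeziumEngine;
      import io.debezium.engine.RecordChangeEvent;
      import io.debezium.engine.StopEngineException;
      import io.debezium.engine.format.ChangeEventFormat;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.json.JSONObject;
      
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      class MyEngine {
      
          private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine;
      
          // [...] denotes omitted irrelevant portions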
      
          public void run(final DebeziumMysqlSourceConnectorConfiguration configuration,
                          final Environment environment) {
      
              engine = DebeziumEngine
                      .create(ChangeEventFormat.of(Connect.class))
                      .using(new Properties()) // [...] connector properties built from configuration omitted
                      .notifying(new MyChangeConsumer())
                      .build();
      
              ExecutorService executor = Executors.newSingleThreadExecutor();
              executor.execute(engine);
          }
      
          class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
              @Override
              public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                                      DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
                      throws InterruptedException {
                  SourceRecord lastRecord = null;
                  for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
                      SourceRecord record = recordChangeEvent.record();
                      try {
                          // [...] process the record, but don't commit it
                      } catch (StopConnectorException | StopEngineException ex) {
                          // [...] handle exception
                      } catch (Exception e) {
                          // [...] handle exception
                      }
                      lastRecord = record;
                  }
      
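                  // [...] block until all messages are published to Kafka
      
                  if (lastRecord == null) {
                      // empty batch: there is no offset to extend
                      committer.markBatchFinished();
                      return;
                  }
      
                  // Build the new offset info from a copy of the last record's source offset.
                  // sourceOffset() returns Map<String, ?> whose values are not all Strings,
                  // so copy it instead of casting and mutating the original record's map.
                  Map<String, Object> newSourceOffset = new HashMap<>(lastRecord.sourceOffset());
                  Map<String, Long> topicToKafkaOffsetMap = new HashMap<>();
                  // latest offset published to Kafka for each partition (placeholder value)
                  topicToKafkaOffsetMap.put("topic_partition1", 1L);
                  newSourceOffset.put("topicOffsetMap", new JSONObject(topicToKafkaOffsetMap).toString());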
      
      
                  // Create a new source record that contains the offsets for each partition
                  // This is used to skip messages upon recovery to preserve exactly once semantics
                  SourceRecord mergedRecord = new SourceRecord(lastRecord.sourcePartition(), newSourceOffset, lastRecord.topic(),
                          lastRecord.kafkaPartition(), lastRecord.keySchema(), lastRecord.key(), lastRecord.valueSchema(), lastRecord.value(),
                          lastRecord.timestamp(), lastRecord.headers());
      
                  // Wrap the merged record, which carries the extra offset data,
                  // in a new RecordChangeEvent
                  RecordChangeEvent<SourceRecord> mergedRecordChangeEvent = new RecordChangeEvent<SourceRecord>() {
                      @Override
                      public SourceRecord record() {
                          return mergedRecord;
                      }
                  };
                  // The next line fails at runtime with a ClassCastException: the committer
                  // casts the event to io.debezium.embedded.EmbeddedEngineChangeEvent,
                  // which this anonymous RecordChangeEvent is not
                  committer.markProcessed(mergedRecordChangeEvent);
                  committer.markBatchFinished();
              }
          }
      }
      

       The exact exception is:

      ERROR [2021-01-05 19:42:10,635] io.debezium.embedded.EmbeddedEngine: Stopping connector after error in the application's handler method: class com.yelp.MysqlConnector.DebeziumMysqlSourceConnectorApplication$MyChangeConsumer$1 cannot be cast to class io.debezium.embedded.EmbeddedEngineChangeEvent
      […]
      ! at io.debezium.embedded.ConvertingEngineBuilder$1.markProcessed(ConvertingEngineBuilder.java:89)
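The failure can be modeled without Debezium. In this hypothetical sketch, ChangeEvent, InternalEvent, and Committer are illustrative names, not Debezium types; they only mimic the pattern the stack trace points to, where the committer created in ConvertingEngineBuilder casts the event back to the engine's own EmbeddedEngineChangeEvent class. An event built by user code implements the public interface but is not an instance of the internal class, so the cast throws ClassCastException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the failure: NOT Debezium code, only the same pattern.
public class CastFailureSketch {

    // Public event interface handed to user code (stand-in for RecordChangeEvent).
    interface ChangeEvent {
        String record();
    }

    // Engine-internal implementation (stand-in for EmbeddedEngineChangeEvent).
    static final class InternalEvent implements ChangeEvent {
        private final String record;

        InternalEvent(String record) {
            this.record = record;
        }

        @Override
        public String record() {
            return record;
        }
    }

    // Committer that assumes every event it receives is an InternalEvent.
    static final class Committer {
        final List<String> processed = new ArrayList<>();

        void markProcessed(ChangeEvent event) {
            InternalEvent internal = (InternalEvent) event; // throws ClassCastException for user-built events
            processed.add(internal.record());
        }
    }

    // Returns true if the committer accepted the event, false if the cast failed.
    static boolean tryCommit(ChangeEvent event) {
        try {
            new Committer().markProcessed(event);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ChangeEvent engineEvent = new InternalEvent("original-record");
        ChangeEvent userEvent = () -> "merged-record"; // implements the interface, but is not an InternalEvent
        System.out.println("engine-created event accepted: " + tryCommit(engineEvent));
        System.out.println("user-created event accepted: " + tryCommit(userEvent));
    }
}
```

Running main prints true for the engine-created event and false for the user-built one, which mirrors the behavior reported above.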

              Assignee: Unassigned
              Reporter: Thomas Thornton (thomast-2) (Inactive)
              Votes: 0
              Watchers: 5