-
Bug
-
Resolution: Done
-
Blocker
-
0.1
-
None
The MySQL connector is incorrectly handling row delete events:
public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord> recorder) { DeleteRowsEventData deleted = event.getData(); long tableNumber = deleted.getTableId(); BitSet includedColumns = deleted.getIncludedColumns(); Converter converter = convertersByTableId.get(tableNumber); if (tableFilter.test(converter.tableId())) { logger.debug("Received delete row event: {}", event); String topic = converter.topic(); Integer partition = converter.partition(); List<Serializable[]> rows = deleted.getRows(); for (int row = 0; row != rows.size(); ++row) { Serializable[] values = rows.get(row); Schema keySchema = converter.keySchema(); Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.inserted(values, includedColumns); SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, keySchema, key, valueSchema, value); recorder.accept(record); }
The value should be obtained by calling converter.deleted(...) rather than converter.inserted(...).