-
Bug
-
Resolution: Done
-
Major
-
0.8.3.Final
-
None
When streaming data from a Postgres database whose table already contains data, the last record of the snapshot has the wrong value for the "last_snapshot_record" field: it is false instead of true.
I believe the issue is in io/debezium/connector/postgresql/RecordsSnapshotProducer.java. I have a small patch that fixes it, but it is not very nice:
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
@@ -206,11 +206,21 @@ public class RecordsSnapshotProducer extends RecordsProducer {
     // process and send the last record after marking it as such
     logger.info("Step 5: sending the last snapshot record");
     sourceInfo.markLastSnapshotRecord();
+    final Struct oldValue = (Struct)currentRecord.value();
+    final Struct newValue = new Struct(oldValue.schema());
+    newValue.put(Envelope.FieldName.OPERATION, oldValue.get( Envelope.FieldName.OPERATION ));
+    newValue.put(Envelope.FieldName.AFTER, oldValue.get( Envelope.FieldName.AFTER ));
+    newValue.put(Envelope.FieldName.SOURCE, sourceInfo.source());
+    newValue.put(Envelope.FieldName.TIMESTAMP, oldValue.get(Envelope.FieldName.TIMESTAMP));
+
     this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(), currentRecord.topic(), currentRecord.kafkaPartition(), currentRecord.keySchema(), currentRecord.key(),
-    currentRecord.valueSchema(), currentRecord.value()));
-
+    currentRecord.valueSchema(), newValue));
     sendCurrentRecord(consumer);
 }
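The patch suggests the cause: the record's value, including its embedded "source" block, appears to be built before markLastSnapshotRecord() is called, so reusing the old value carries a stale flag even though the offset is taken fresh from sourceInfo. The following is only a simplified model of that behaviour using plain Java maps; SourceInfoModel, buildValue and withRefreshedSource are hypothetical names for illustration and are not Debezium or Kafka Connect classes.

```java
import java.util.HashMap;
import java.util.Map;

public class LastSnapshotFlagSketch {

    /** Hypothetical stand-in for the connector's mutable source info. */
    public static final class SourceInfoModel {
        private boolean lastSnapshotRecord = false;

        public void markLastSnapshotRecord() {
            lastSnapshotRecord = true;
        }

        /** Returns a fresh copy of the "source" block for the current state. */
        public Map<String, Object> source() {
            Map<String, Object> source = new HashMap<>();
            source.put("last_snapshot_record", lastSnapshotRecord);
            return source;
        }
    }

    /** Builds a record value; the source block is copied at this moment. */
    public static Map<String, Object> buildValue(Map<String, Object> after, SourceInfoModel info) {
        Map<String, Object> value = new HashMap<>();
        value.put("op", "r"); // illustrative operation code
        value.put("after", after);
        value.put("source", info.source());
        return value;
    }

    /** Mirrors the idea of the patch: copy the old value, replace only "source". */
    public static Map<String, Object> withRefreshedSource(Map<String, Object> oldValue, SourceInfoModel info) {
        Map<String, Object> newValue = new HashMap<>(oldValue);
        newValue.put("source", info.source());
        return newValue;
    }

    /** Reads the last_snapshot_record flag out of a record value. */
    public static boolean lastFlag(Map<String, Object> value) {
        Map<?, ?> source = (Map<?, ?>) value.get("source");
        return Boolean.TRUE.equals(source.get("last_snapshot_record"));
    }

    public static void main(String[] args) {
        SourceInfoModel info = new SourceInfoModel();
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);

        // The value is created while the flag is still false.
        Map<String, Object> value = buildValue(after, info);
        info.markLastSnapshotRecord();

        // Reusing the old value keeps the stale flag; rebuilding picks up the new one.
        Map<String, Object> rebuilt = withRefreshedSource(value, info);
        System.out.println("reused value:  " + lastFlag(value));   // false
        System.out.println("rebuilt value: " + lastFlag(rebuilt)); // true
    }
}
```

In this model the fix does not need to copy each field by hand, but the real Struct in the patch is schema-bound, which is presumably why the patch copies the fields individually.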
relates to: DBZ-797 "source" block for Postgres events shouldn't contain "last_snapshot_record" field (Open)