-
Bug
-
Resolution: Done
-
Major
-
0.8.3.Final
-
None
-
Linux RHEL7, postgres 10.3
When streaming data from a Postgres DB, and a table that already has data in, the last record from the snapshot does not have the right value for the "last_snapshot_record" field: it is false instead of true.
I believe the issue to be in io/debezium/connector/postgresql/RecordsSnapshotProducer.java, I have a small patch solving it but it is not very nice.
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
@@ -206,11 +206,21 @@ public class RecordsSnapshotProducer extends RecordsProducer {
// process and send the last record after marking it as such
logger.info("Step 5: sending the last snapshot record");
sourceInfo.markLastSnapshotRecord();
+ final Struct oldValue = (Struct)currentRecord.value();
+ final Struct newValue = new Struct(oldValue.schema());
+ newValue.put(Envelope.FieldName.OPERATION, oldValue.get( Envelope.FieldName.OPERATION ));
+ newValue.put(Envelope.FieldName.AFTER, oldValue.get( Envelope.FieldName.AFTER ));
+ newValue.put(Envelope.FieldName.SOURCE, sourceInfo.source());
+ newValue.put(Envelope.FieldName.TIMESTAMP, oldValue.get(Envelope.FieldName.TIMESTAMP));
+
this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(),
currentRecord.topic(), currentRecord.kafkaPartition(),
currentRecord.keySchema(), currentRecord.key(),
- currentRecord.valueSchema(), currentRecord.value()));
-
+ currentRecord.valueSchema(), newValue));
sendCurrentRecord(consumer);
}
- relates to
-
DBZ-797 "source" block for Postgres events shouldn't contain "last_snapshot_record" field
-
- Open
-