Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1024

Postgres plugin does not signal the end of snapshot properly

XMLWordPrintable

    • Hide
      • Have a postgres table with a primary key and two rows inside
      • Use the embedded engine (as in https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/)
      • In the code, change the Kinesis sender to instead simply log the content of the event
      • start the embedded engine and collect the log results. You will get something similar to:
        {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}}
        {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}}
        

      While the correct output should be (difference is on the second record, field value.source.last_snapshot_record):

      {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}}
      {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":true},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}}
      
      Show
      Have a postgres table with a primary key and two rows inside Use the embedded engine (as in https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/ ) In the code, change the Kinesis sender to instead simply log the content of the event start the embedded engine and collect the log results. You will get something similar to: {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}} {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}} While the correct output should be (difference is on the second record, field value.source.last_snapshot_record): {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}} {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":true},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}}

      When streaming data from a Postgres DB, and a table that already has data in, the last record from the snapshot does not have the right value for the "last_snapshot_record" field: it is false instead of true.

      I believe the issue to be in io/debezium/connector/postgresql/RecordsSnapshotProducer.java, I have a small patch solving it but it is not very nice.

      --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
      +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
      @@ -206,11 +206,21 @@ public class RecordsSnapshotProducer extends RecordsProducer {
                       // process and send the last record after marking it as such
                       logger.info("Step 5: sending the last snapshot record");
                       sourceInfo.markLastSnapshotRecord();
      
      +                final Struct oldValue = (Struct)currentRecord.value();
      +                final Struct newValue = new Struct(oldValue.schema());
      +                newValue.put(Envelope.FieldName.OPERATION, oldValue.get( Envelope.FieldName.OPERATION ));
      +                newValue.put(Envelope.FieldName.AFTER, oldValue.get( Envelope.FieldName.AFTER ));
      +                newValue.put(Envelope.FieldName.SOURCE, sourceInfo.source());
      +                newValue.put(Envelope.FieldName.TIMESTAMP, oldValue.get(Envelope.FieldName.TIMESTAMP));
      +
                       this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(),
                                                               currentRecord.topic(), currentRecord.kafkaPartition(),
                                                               currentRecord.keySchema(), currentRecord.key(),
      -                                                        currentRecord.valueSchema(), currentRecord.value()));
      -
      +                                                        currentRecord.valueSchema(), newValue));
                       sendCurrentRecord(consumer);
                   }
      

              jpechane Jiri Pechanec
              daniel.fredouille Daniel Fredouille (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: