Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1697

Last Snapshot Record flag not set if last table to be monitored has no snapshot query defined

    XMLWordPrintable

Details

    • Bug
    • Resolution: Unresolved
    • Trivial
    • None
    • 0.10.0.Beta4, 0.10.0.CR1, 0.10.0.CR2, 0.10.0.Final, 1.0.0.Beta1, 1.0.0.Beta2, 1.0.0.Beta3, 1.0.0.CR1, 1.0.0.Final
    • core-library
    • None
    • Hide

      1. Create a database with two tables in PostgreSQL (the connector and snapshotter below are PostgreSQL-specific).

      CREATE DATABASE dbz1174;
      -- PostgreSQL has no USE statement; switch to the new database with the psql meta-command.
      \c dbz1174
      CREATE TABLE table1 (id INT PRIMARY KEY);
      INSERT INTO table1 (id) VALUES (1), (2), (3);
      CREATE TABLE table2 (id INT PRIMARY KEY);
      INSERT INTO table2 (id) VALUES (1), (2), (3);
      

      2. Create a Debezium connector with the following configuration.

      {
        "name": "inventory-connector",  
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
          "plugin.name": "wal2json",
          "database.hostname": "172.17.0.1", 
          "database.port": "5432", 
          "database.user": "user", 
          "database.password": "password", 
          "database.dbname" : "dbz1174", 
          "database.server.name": "fullfillment", 
          "table.whitelist": "public.table1,public.table2",
          "snapshot.mode": "custom",
          "snapshot.custom.class": "io.debezium.test.dbz1174",
          "snapshot.select.statement.overrides": "public.table1,public.table2",
          "snapshot.select.statement.overrides.public.table1": "SELECT * FROM public.table1 WHERE id > 0"
        }
      }
      

      3. Provide a custom snapshotter class such as the following:

      package io.debezium.test;
      
      import java.util.Optional;
      
      import io.debezium.connector.postgresql.PostgresConnectorConfig;
      import io.debezium.connector.postgresql.spi.OffsetState;
      import io.debezium.connector.postgresql.spi.SlotCreationResult;
      import io.debezium.connector.postgresql.spi.SlotState;
      import io.debezium.connector.postgresql.spi.Snapshotter;
      import io.debezium.relational.TableId;
      
      /**
       * Custom {@link Snapshotter} used to reproduce DBZ-1697.
       *
       * <p>It always snapshots and streams, and it exports the snapshot. It deliberately provides no
       * snapshot query for {@code table2}, so that table is skipped during the snapshot. With the
       * configuration above, {@code table2} is expected to be the last table snapshotted
       * (NOTE(review): confirm the table ordering used by the snapshot).
       */
      public class dbz1174 implements Snapshotter {
          @Override
          public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
              // Intentionally empty: none of the decisions below depend on config, offsets or slot state.
          }

          @Override
          public boolean shouldSnapshot() {
              return true;
          }

          @Override
          public boolean shouldStream() {
              return true;
          }

          @Override
          public boolean exportSnapshot() {
              return true;
          }

          /**
           * Builds the snapshot query for a table.
           *
           * @param tableId the table being snapshotted
           * @return empty for {@code table2}, so that table is skipped (the condition that triggers the
           *     bug); otherwise a full {@code select *} over the double-quoted table name
           */
          @Override
          public Optional<String> buildSnapshotQuery(TableId tableId) {
              // Literal-first comparison is null-safe if the table name is ever absent.
              if ("table2".equals(tableId.table())) {
                  return Optional.empty();
              }
              return Optional.of("select * from " + tableId.toDoubleQuotedString());
          }

          /**
           * Returns the statement(s) that configure the snapshot transaction.
           *
           * @param newSlotInfo slot-creation result; may be {@code null}
           * @return when {@code newSlotInfo} is non-null, a REPEATABLE READ isolation statement followed by
           *     {@code SET TRANSACTION SNAPSHOT} for its snapshot name; otherwise a SERIALIZABLE, READ ONLY,
           *     DEFERRABLE isolation statement
           */
          @Override
          public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
              if (newSlotInfo != null) {
                  String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
                  return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
              }
              return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
          }
      }
      

      4. Deploy.

      Result: the connector logs "For table 'public.table2' the select statement was not provided, skipping table", and in the code `sourceInfo.snapshot` remains `SnapshotRecord.TRUE` instead of being set to `SnapshotRecord.LAST`.

      Show
      1. Create two tables in any RDBMS. CREATE DATABASE dbz1174; USE DATABASE dbz1174; CREATE TABLE table1 (id INT PRIMARY KEY ); INSERT INTO table1 (id) VALUES (1), (2), (3); CREATE TABLE table2 (id INT PRIMARY KEY ); INSERT INTO table2 (id) VALUES (1), (2), (3); 2. Create a debezium connector with the following configuration. { "name" : "inventory-connector" , "config" : { "connector.class" : "io.debezium.connector.postgresql.PostgresConnector" , "plugin.name" : "wal2json" , "database.hostname" : "172.17.0.1" , "database.port" : "5432" , "database.user" : "user" , "database.password" : "password" , "database.dbname" : "dbz1174" , "database.server.name" : "fullfillment" , "table.whitelist" : " public .table1, public .table2" , "snapshot.mode" : "custom" , "snapshot.custom.class" : "io.debezium.test.dbz1174" , "snapshot.select.statement.overrides" : " public .table1, public .table2" , "snapshot.select.statement.overrides. public .table1" : "SELECT * FROM public .table1 WHERE id > 0" } } 3. 
Provide custom snapshotter class as below: package io.debezium.test; import java.util.Optional; import io.debezium.connector.postgresql.spi.OffsetState; import io.debezium.connector.postgresql.spi.SlotCreationResult; import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.relational.TableId; public class dbz1174 implements Snapshotter { @Override public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { } @Override public boolean shouldSnapshot() { return true ; } @Override public boolean shouldStream() { return true ; } @Override public boolean exportSnapshot() { return true ; } @Override public Optional< String > buildSnapshotQuery(TableId tableId) { // on an empty state, don't read from s2 schema, but afterwards, do if (!tableId.table().equals( "table2" )) { return Optional.empty(); } else { return Optional.of( "select * from " + tableId.toDoubleQuotedString()); } } @Override public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { if (newSlotInfo != null ) { String snapSet = String .format( "SET TRANSACTION SNAPSHOT '%s' ;" , newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; } else { return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;" ; } } } 4. Deploy. The result is that the log For table 'public.table2' the select statement was not provided, skipping table and IN CODE THE sourceInfo.snapshot = SnapshotRecord.TRUE (instead of LAST).

    Description

      The core class that handles snapshots of relational sources (io/debezium/relational/RelationalSnapshotChangeEventSource.java) is responsible for marking the last snapshot record.

      It does so by checking a flag that indicates whether the table being snapshotted is the last table. If it is, the last record published for that table is marked as the last snapshot record.

      However, before this happens, there is an early return when a custom snapshot query is enabled but none was provided for the current table.

      This means that when both conditions occur together, i.e. the last table to be snapshotted has no snapshot query, NO record is marked as the last snapshot record.

      Since this condition is hard to trigger (custom snapshot mode + select statement overrides + a missing snapshot query for the last table), I don't think this is a priority. I'm also not sure what the impact of SnapshotRecord.LAST not being set is. I cannot find it used anywhere except in the isSnapshot() method, and that method still works because it returns true for both SnapshotRecord.TRUE and SnapshotRecord.LAST.

      Attachments

        Activity

          People

            Unassigned Unassigned
            hashhar Ashhar Hasan (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: