Debezium / DBZ-1118

Data from Postgres partitioned table written to wrong topic during snapshot


      DB Schema

      create table public.dbz_repo
      (
        user_id       bigint           not null,
        sequence_id   bigint           not null,
        longitude     double precision not null,
        latitude      double precision not null,
        created_by_id bigint           not null,
        client_ts     timestamp        not null,
        server_ts     timestamp        not null,
        type          varchar(50)      not null
      )
        partition by RANGE (user_id);

      create table public.dbz_repo_1_500
        partition of public.dbz_repo
        (
          constraint dbz_repo_1_500_pkey
            primary key (user_id, sequence_id)
        )
        FOR VALUES FROM ('1') TO ('501');

      create table public.dbz_repo_501_1000
        partition of public.dbz_repo
        (
          constraint dbz_repo_501_1000_pkey
            primary key (user_id, sequence_id)
        )
        FOR VALUES FROM ('501') TO ('1001');

      create table public.dbz_repo_1001_1500
        partition of public.dbz_repo
        (
          constraint dbz_repo_1001_1500_pkey
            primary key (user_id, sequence_id)
        )
        FOR VALUES FROM ('1001') TO ('1501');

      create table public.dbz_repo_1501_2000
        partition of public.dbz_repo
        (
          constraint dbz_repo_1501_2000_pkey
            primary key (user_id, sequence_id)
        )
        FOR VALUES FROM ('1501') TO ('2001');
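
      PostgreSQL range partition bounds are inclusive at FROM and exclusive at TO, so `dbz_repo_1_500` stores `user_id` 1 through 500 and `user_id` 501 goes to `dbz_repo_501_1000`. The following is a minimal Python sketch of that routing for the four partitions above; `PARTITIONS` and `partition_for` are hypothetical names for illustration, not part of PostgreSQL or Debezium.

```python
# Hypothetical helper: mimics PostgreSQL range-partition routing for the schema
# above (lower bound inclusive, upper bound exclusive).
PARTITIONS = [
    ("dbz_repo_1_500", 1, 501),
    ("dbz_repo_501_1000", 501, 1001),
    ("dbz_repo_1001_1500", 1001, 1501),
    ("dbz_repo_1501_2000", 1501, 2001),
]


def partition_for(user_id: int) -> str:
    """Return the partition that would store a row with this user_id."""
    for name, lower, upper in PARTITIONS:
        if lower <= user_id < upper:
            return name
    # PostgreSQL rejects such a row because no partition accepts it
    raise ValueError(f"no partition for user_id={user_id}")


for uid in (1, 500, 501, 2000):
    print(uid, "->", partition_for(uid))
```

      Every row of the parent `dbz_repo` therefore lives in exactly one partition, which is why reading the parent returns the union of all four partitions.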

      DB Inserts

      insert into dbz_repo
      (user_id, sequence_id, longitude, latitude, created_by_id, client_ts, server_ts, type)
      select random() * 1999 + 1, random() * 1540277658018 + 1, random() * 180, random() * 180, random() * 1999 + 1, now(), now(), 'abcdefg'
      from generate_series(1, 2000000);
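
      The insert draws `user_id` as `random() * 1999 + 1`, a `double precision` value in [1, 2000). Assuming PostgreSQL rounds it to the nearest integer when assigning it to the `bigint` column (its float-to-bigint cast rounds rather than truncates), `user_id` ranges over 1..2000 and the two edge values receive only half a unit of the range each. This Python sketch computes the expected number of rows per partition under that assumption; all names are illustrative.

```python
# Expected rows per partition for the INSERT above, assuming user_id is
# random() * 1999 + 1 rounded to the nearest integer, random() uniform in [0, 1).
N_ROWS = 2_000_000
LOW, HIGH = 1.0, 2000.0  # range of random() * 1999 + 1

# (partition, smallest user_id, largest user_id) taken from the schema
ID_RANGES = [
    ("dbz_repo_1_500", 1, 500),
    ("dbz_repo_501_1000", 501, 1000),
    ("dbz_repo_1001_1500", 1001, 1500),
    ("dbz_repo_1501_2000", 1501, 2000),
]


def expected_rows(first: int, last: int) -> float:
    """Rows whose raw value rounds into [first, last], i.e. lies in [first - 0.5, last + 0.5)."""
    lo = max(LOW, first - 0.5)
    hi = min(HIGH, last + 0.5)
    return N_ROWS * (hi - lo) / (HIGH - LOW)


expected = {name: expected_rows(first, last) for name, first, last in ID_RANGES}
for name, value in expected.items():
    print(f"{name}: ~{value:,.0f}")
```

      This gives about 499,750 rows for each edge partition and about 500,250 for each middle one, summing to the 2,000,000 inserted rows; the per-partition counts in the comparison output are close to these values.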

      Debezium config

      {
        "name": "debezium_repo",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "plugin.name": "wal2json_rds_streaming",
          "database.hostname": "<HOST>.rds.amazonaws.com",
          "database.port": "5432",
          "database.user": "debezium",
          "database.password": "<PASSWORD>",
          "database.dbname": "nextgeo",
          "database.server.name": "debezium",
          "decimal.handling.mode": "string",
          "hstore.handling.mode": "json",
          "tombstones.on.delete": false,
          "table.whitelist": ".*dbz.*",
          "slot.name": "debezium_repo"
        }
      }

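      Debezium writes each captured table to a topic named `<database.server.name>.<schema>.<table>`, and `table.whitelist` holds regular expressions matched against fully-qualified `<schema>.<table>` identifiers. The Python sketch below assumes that naming convention and whole-identifier matching, and lists which tables the `.*dbz.*` whitelist captures together with the topic each one should go to. `captured_topics` is a hypothetical helper, not a Debezium API.

```python
import re

# Settings copied from the connector config above
SERVER_NAME = "debezium"     # database.server.name
TABLE_WHITELIST = ".*dbz.*"  # table.whitelist

# Fully-qualified identifiers of the parent table and its partitions
TABLES = [
    "public.dbz_repo",
    "public.dbz_repo_1_500",
    "public.dbz_repo_501_1000",
    "public.dbz_repo_1001_1500",
    "public.dbz_repo_1501_2000",
]


def captured_topics(tables, whitelist, server_name):
    """Map each whitelisted table to its topic, assuming the whole identifier
    must match one of the comma-separated patterns and topics are named
    <server>.<schema>.<table>."""
    patterns = [re.compile(p.strip()) for p in whitelist.split(",")]
    return {
        table: f"{server_name}.{table}"
        for table in tables
        if any(p.fullmatch(table) for p in patterns)
    }


for table, topic in captured_topics(TABLES, TABLE_WHITELIST, SERVER_NAME).items():
    print(f"{table} -> {topic}")
```

      Under these assumptions the parent `dbz_repo` is captured as well, so its snapshot rows should land in `debezium.public.dbz_repo`, the topic the comparison script reports as empty.
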
      Python comparison script

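      import psycopg2
      from psycopg2 import sql
      from kafka import KafkaConsumer

      psql_host = "<HOST>.rds.amazonaws.com"
      psql_user = "debezium"
      psql_pass = "<PASSWORD>"
      psql_db = "debezium"
      # Placeholder: the Kafka bootstrap address was not included in the report
      kafka_bootstrap = "<KAFKA_HOST>:9092"

      db = psycopg2.connect(host=psql_host, database=psql_db, user=psql_user, password=psql_pass)
      cur = db.cursor()

      # Collect the parent table and all partitions whose names contain "dbz"
      cur.execute("""SELECT table_name
      FROM information_schema.tables
      WHERE table_type = 'BASE TABLE'
      AND table_schema = 'public'
      AND table_name LIKE '%dbz%'
      ORDER BY table_name;""")
      tables = [row[0] for row in cur.fetchall()]

      print("table | rows_in_db | records_in_topic | diff")
      for table in tables:
          # Row count in the database; for the parent this includes every partition
          cur.execute(sql.SQL("SELECT count(*) FROM public.{}").format(sql.Identifier(table)))
          rows = cur.fetchone()[0]

          # Read the topic from the beginning; iteration stops after 10 s without new records
          consumer = KafkaConsumer("debezium.public." + table,
                                   bootstrap_servers=kafka_bootstrap,
                                   auto_offset_reset="earliest",
                                   enable_auto_commit=False,
                                   consumer_timeout_ms=10000)
          count = sum(1 for _ in consumer)
          consumer.close()

          diff = rows - count
          print(f"{table} | {rows} | {count} | {diff}")

      cur.close()
      db.close()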

      During the snapshot phase, when Debezium reads from a partitioned PostgreSQL table, the rows read through the parent (partitioned) table are written to the topic of one of its partitions instead of to the parent's own topic.

      To reproduce (the full schema, inserts, connector config, and comparison script are given in the reproduction notes), create a partitioned parent table named `dbz_repo` with two partitions named `dbz_repo_1_500` and `dbz_repo_501_1000`. Next, insert random records into the partitioned table and start a Debezium snapshot.

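      After the snapshot completes, the `dbz_repo` topic contains 0 records, while the `dbz_repo_1_500` topic contains the N records of the `dbz_repo_1_500` table PLUS the M records read through the `dbz_repo` parent table (i.e. an additional copy of `dbz_repo_1_500` and everything from `dbz_repo_501_1000`). The `dbz_repo_501_1000` topic is the only correct one, containing exactly one copy of each row of its source table. The four-partition reproduction shows the same pattern: in the comparison output, the `dbz_repo_1_500` topic holds all 2,000,000 parent rows on top of its own, while the other partition topics match their tables.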


      During the snapshot, the connector logs errors related to flush failures but otherwise looks clean:

      [2019-01-31 18:40:59,085] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to flush, timed out while waiting for producer to flush outstanding 29175 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2019-01-31 18:40:59,091] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)

      Comparison output sample

      The Python comparison script counts the rows in each database table and the records in the corresponding topic. Sample output from one run:

      table              | rows_in_db | records_in_topic | diff
      dbz_repo           | 2000000    | 0                | 2000000
      dbz_repo_1001_1500 | 500077     | 500077           | 0
      dbz_repo_1_500     | 499596     | 2499596          | -2000000
      dbz_repo_1501_2000 | 499529     | 499529           | 0
      dbz_repo_501_1000  | 500798     | 500798           | 0
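
      The numbers in the sample output are consistent with the whole parent table being written to the `dbz_repo_1_500` topic: the parent's 2,000,000 rows equal the sum of the four partitions, and the `dbz_repo_1_500` topic holds exactly its own 499,596 rows plus those 2,000,000. A small Python check over the sample values (copied from the output above; nothing else is assumed):

```python
# Values from the sample comparison output: table -> (rows_in_db, records_in_topic)
SAMPLE = {
    "dbz_repo": (2_000_000, 0),
    "dbz_repo_1001_1500": (500_077, 500_077),
    "dbz_repo_1_500": (499_596, 2_499_596),
    "dbz_repo_1501_2000": (499_529, 499_529),
    "dbz_repo_501_1000": (500_798, 500_798),
}

parent_rows, parent_topic = SAMPLE["dbz_repo"]
partition_rows = sum(rows for name, (rows, _) in SAMPLE.items() if name != "dbz_repo")

# Reading the parent table returns every row of every partition
print("parent rows == sum of partition rows:", parent_rows == partition_rows)

# The misrouted topic holds its own rows plus all rows read through the parent
own_rows, topic_records = SAMPLE["dbz_repo_1_500"]
print("dbz_repo_1_500 topic == own rows + parent rows:", topic_records == own_rows + parent_rows)
```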

