Debezium / DBZ-1118

Data from Postgres partitioned table written to wrong topic during snapshot



      DB Schema

      create table public.dbz_repo
      (
        user_id       bigint           not null,
        sequence_id   bigint           not null,
        longitude     double precision not null,
        latitude      double precision not null,
        created_by_id bigint           not null,
        client_ts     timestamp        not null,
        server_ts     timestamp        not null,
        type          varchar(50)      not null
      )
        partition by RANGE (user_id);
      
      create table public.dbz_repo_1_500
        partition of public.dbz_repo
          (
          constraint dbz_repo_1_500_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1') TO ('501');
      
      create table public.dbz_repo_501_1000
        partition of public.dbz_repo
          (
          constraint dbz_repo_501_1000_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('501') TO ('1001');
      
      create table public.dbz_repo_1001_1500
        partition of public.dbz_repo
          (
          constraint dbz_repo_1001_1500_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1001') TO ('1501');
      
      
      create table public.dbz_repo_1501_2000
        partition of public.dbz_repo
          (
          constraint dbz_repo_1501_2000_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1501') TO ('2001');
      

      DB Inserts

      insert into dbz_repo
      (user_id, sequence_id, longitude, latitude, created_by_id, client_ts, server_ts, type)
      select random() * 1999 + 1, random() * 1540277658018 + 1, random() * 180, random() * 180, random() * 1999 + 1, now(), now(), 'abcdefg'
      from generate_series(1, 2000000);
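
      As a sanity check, all 2,000,000 rows land in the leaf partitions; the parent table stores no rows of its own. A minimal verification sketch (connection parameters are placeholders, matching the comparison script below):

      import psycopg2
      
      db = psycopg2.connect(host="<HOST>.rds.amazonaws.com", database="nextgeo",
                            user="debezium", password="<PASSWORD>")
      cur = db.cursor()
      
      # tableoid::regclass shows which leaf partition each row physically lives in.
      cur.execute("SELECT tableoid::regclass, count(*) FROM dbz_repo GROUP BY 1 ORDER BY 1")
      for partition, n in cur.fetchall():
          print(partition, n)
      
      # FROM ONLY excludes child partitions, so this prints 0: the parent
      # relation itself holds no rows.
      cur.execute("SELECT count(*) FROM ONLY dbz_repo")
      print("rows stored directly in dbz_repo:", cur.fetchone()[0])
      
      db.close()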
      
      

      Debezium config

      {
          "name": "debezium_repo",
          "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "wal2json_rds_streaming",
            "database.hostname": "<HOST>.rds.amazonaws.com",
            "database.port": "5432",
            "database.user": "debezium",
            "database.password": "<PASSWORD>",
            "database.dbname" : "nextgeo",
            "database.server.name": "debezium,
            "decimal.handling.mode": "string",
            "hstore.handling.mode": "json",
            "tombstones.on.delete": false,
            "table.whitelist": ".*dbz.*",
            "slot.name": "debezium_repo"
          }
      }
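
      For completeness, the connector above can be registered through the Kafka Connect REST API. A sketch, assuming the JSON above is saved as debezium_repo.json and the Connect worker listens on localhost:8083 (both assumptions):

      import json
      
      import requests  # third-party HTTP client, assumed to be installed
      
      # Hypothetical endpoint; substitute your Connect worker's host and port.
      CONNECT_URL = "http://localhost:8083/connectors"
      
      with open("debezium_repo.json") as f:
          connector = json.load(f)
      
      # POST /connectors creates the connector; the snapshot starts immediately.
      resp = requests.post(CONNECT_URL, json=connector)
      resp.raise_for_status()
      print(resp.json())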
      
      

      Python comparison script

      import psycopg2
      from kafka import KafkaConsumer
      
      psql_host = "<HOST>.rds.amazonaws.com"
      psql_user = "debezium"
      psql_pass = "<PASSWORD>"
      psql_db = "nextgeo"
      
      db = psycopg2.connect(host=psql_host, database=psql_db, user=psql_user, password=psql_pass)
      
      cur = db.cursor()
      
      tables = []
      
      cur.execute("""SELECT table_name
      FROM information_schema.tables
      WHERE table_type='BASE TABLE'
      AND table_schema='public'
      AND table_name LIKE '%dbz%'
      ORDER BY table_name;""")
      
      for row in cur.fetchall():
          tables.append(row[0])
      
      print("table | rows_in_db | records_in_topic | diff")
      
      for table in tables:
      
          # Row count straight from Postgres (for the parent table this
          # includes all partitions).
          cur.execute("select count(*) from " + table)
          rows = cur.fetchone()[0]
      
          # Read the matching topic from the beginning; consumer_timeout_ms
          # ends the loop once no message arrives for 10 seconds.
          consumer = KafkaConsumer("debezium.public." + table,
                                   bootstrap_servers=["kafka:9092"],
                                   enable_auto_commit=False,
                                   consumer_timeout_ms=10000,
                                   auto_offset_reset="earliest",
                                   group_id=None)
      
          count = 0
          for _ in consumer:
              count += 1
      
          diff = rows - count
      
          print(table + " | " + str(rows) + " | " + str(count) + " | " + str(diff))
      
      db.close()
      
      

      Overview

      During the snapshot phase, when reading from a partitioned Postgres table, rows scanned via the parent table are written to the topic of one of its child partitions.

      As described in the reproduction notes above, we create a partitioned parent table `dbz_repo` with four range partitions (`dbz_repo_1_500`, `dbz_repo_501_1000`, `dbz_repo_1001_1500`, and `dbz_repo_1501_2000`), insert 2,000,000 random records, and kick off a Debezium snapshot.

      After the snapshot completes, the `dbz_repo` topic contains 0 records, while the `dbz_repo_1_500` topic contains the 499,596 records from the `dbz_repo_1_500` table PLUS all 2,000,000 records scanned through the `dbz_repo` parent (i.e. a second copy of `dbz_repo_1_500` plus everything from the other three partitions: 499,596 + 2,000,000 = 2,499,596). The remaining partition topics are correct, each containing exactly one copy of the rows from its source table.
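
      The misrouting can also be confirmed from the messages themselves: each Debezium envelope carries a source block naming the table the record was read from, so if the extra records come from the parent scan they should show up under dbz_repo. A sketch that tallies source.table for the dbz_repo_1_500 topic (assumes the default JSON converter with schemas enabled, so each value is wrapped in a payload field):

      import json
      from collections import Counter
      
      from kafka import KafkaConsumer
      
      consumer = KafkaConsumer("debezium.public.dbz_repo_1_500",
                               bootstrap_servers=["kafka:9092"],
                               consumer_timeout_ms=10000,
                               auto_offset_reset="earliest",
                               group_id=None)
      
      # Count how many records in the topic claim each source table.
      sources = Counter()
      for msg in consumer:
          if msg.value is None:
              continue
          payload = json.loads(msg.value)["payload"]
          sources[payload["source"]["table"]] += 1
      
      print(sources)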

      Logs

      During snapshotting there are errors related to offset flush timeouts, but the logs otherwise look clean:

      [2019-01-31 18:40:59,085] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to flush, timed out while waiting for producer to flush outstanding 29175 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2019-01-31 18:40:59,091] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)

      Comparison output sample

      The Python script above compares the number of records in each database table to the number of records in the corresponding topic. Here's sample output for a run:

      table | rows_in_db | records_in_topic | diff
      dbz_repo | 2000000 | 0 | 2000000
      dbz_repo_1001_1500 | 500077 | 500077 | 0
      dbz_repo_1_500 | 499596 | 2499596 | -2000000
      dbz_repo_1501_2000 | 499529 | 499529 | 0
      dbz_repo_501_1000 | 500798 | 500798 | 0
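
      One way to confirm the parent scan is the source of the duplicates (and to work around it) is to whitelist only the leaf partitions, keeping dbz_repo itself out of the snapshot. An untested sketch via the Connect REST API (regex and endpoint are assumptions):

      import json
      
      import requests  # hypothetical Connect host below
      
      with open("debezium_repo.json") as f:
          config = json.load(f)["config"]
      
      # Match only the leaf partitions (public.dbz_repo_1_500 etc.); the
      # parent public.dbz_repo no longer matches the pattern.
      config["table.whitelist"] = "public\\.dbz_repo_.*"
      
      # PUT /connectors/{name}/config creates or updates the connector config.
      resp = requests.put("http://localhost:8083/connectors/debezium_repo/config",
                          json=config)
      resp.raise_for_status()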

      Assignee: Unassigned
      Reporter: Kevin Pullin