Debezium / DBZ-9006

Events inserted during snapshot are being duplicated

      Zulip chat discussion: https://debezium.zulipchat.com/#narrow/channel/302529-community-general/topic/Duplicate.20events.20after.20snapshot/with/515681218
      If you have trouble running the simulation, please ping me on Zulip chat; I will try to respond as soon as possible.

      Bug report

      I am snapshotting a database that contains a single table. The table is relatively big, so the snapshot takes some time to finish. While the snapshot is running, I start inserting records into the table from another service.
      When the snapshot finishes, I can see that some events are duplicated, differing only in the 'op' field: one copy has 'c' and the other has 'r'. So these records are both captured by the snapshot and read from the binlog.
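
      To make the symptom concrete, here is a minimal sketch of how the duplicated records can be identified in the output. It assumes each event has been reduced to its row key and its 'op' value; the class and method names are hypothetical and are not part of Debezium or of the attached project.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Debezium: finds row keys that were emitted
// both as a snapshot read ('r') and as a binlog create ('c').
final class SnapshotDuplicateFinder {

    // Minimal stand-in for a change event: the row key and the envelope's "op" value.
    static final class Event {
        final String key;
        final String op;

        Event(String key, String op) {
            this.key = key;
            this.op = op;
        }
    }

    static Set<String> findDuplicatedKeys(List<Event> events) {
        Set<String> snapshotKeys = new HashSet<>();
        Set<String> createKeys = new HashSet<>();
        for (Event e : events) {
            if ("r".equals(e.op)) {
                snapshotKeys.add(e.key);
            } else if ("c".equals(e.op)) {
                createKeys.add(e.key);
            }
        }
        // Keys present in both sets were delivered once by the snapshot
        // and once more from the binlog.
        snapshotKeys.retainAll(createKeys);
        return snapshotKeys;
    }
}
```

      Running this over the consumed events after the snapshot completes should return an empty set if no record was delivered twice.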

      I have created a simple simulation project in which I hit the problem every time. Read the README.md for instructions.

      What Debezium connector do you use and what version?

      3.0.6.Final, and also the latest, 3.1.1.Final

      What is the connector configuration?

      Everything is in the project 

      // props.setProperty("name", applicationName);
      props.setProperty("bootstrap.servers", bootstrapServers);
      props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
      props.setProperty("offset.storage.topic", systemTopicPrefix + ".offset_storage");
      props.setProperty("offset.storage.partitions", "1");
      props.setProperty("offset.storage.replication.factor", "1"); // 1 for the simulation; set to 2 in our environment
      props.setProperty("offset.flush.interval.ms", "2000"); // 2 seconds for the simulation (originally 20 seconds)
      props.setProperty("offset.flush.timeout.ms", "10000"); // 10 seconds
      props.setProperty("topic.prefix", topicPrefix);
      props.setProperty("database.server.id", generateServerId(applicationName));
      props.setProperty("schema.history.internal.kafka.bootstrap.servers", bootstrapServers);
      props.setProperty("schema.history.internal.kafka.topic", systemTopicPrefix + ".db_history");
      props.setProperty("signal.enabled.channels", "kafka");
      props.setProperty("signal.kafka.topic", systemTopicPrefix + ".debezium_signals");
      props.setProperty("signal.kafka.bootstrap.servers", bootstrapServers);
      
      props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
      props.setProperty("database.hostname", debeziumSourceDatabaseConfig.getHostname());
      props.setProperty("database.port", debeziumSourceDatabaseConfig.getPort());
      props.setProperty("database.user", debeziumSourceDatabaseConfig.getUsername());
      props.setProperty("database.password", debeziumSourceDatabaseConfig.getPassword());
      props.setProperty("database.server.name", applicationName.replace('.', '_'));
      props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
      props.setProperty("database.history.producer.max.request.size", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.producer.buffer.memory", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.consumer.max.partition.fetch.bytes", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.store.only.captured.tables.ddl", String.valueOf(false));
      props.setProperty("database.include.list", debeziumSourceDatabaseConfig.getSchemaWhitelist());
      props.setProperty("database.connectionTimeZone", "Europe/Sofia");
      props.setProperty("snapshot.mode", debeziumSourceDatabaseConfig.getSnapshotMode());
      props.setProperty("include.schema.changes", "true");
      props.setProperty("connect.keep.alive", "true");
      props.setProperty("connections.max.idle.ms", "540000"); 

      What is the captured database version and mode of deployment?

      On-premises MariaDB database; also tested on AWS RDS MariaDB.

      What behavior do you expect?

      I expect that records inserted during the snapshot are captured either by the snapshot process or from the binlog, not by both.
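
      As a possible consumer-side workaround while the duplication occurs, the expected "either, not both" behaviour can be approximated by dropping a 'c' event whose key was already delivered as an 'r' event. This is only a sketch under stated assumptions: snapshot events arrive before the streamed ones, every key seen during the snapshot fits in memory, and the class name is hypothetical. A row that was deleted and re-inserted with the same key during the snapshot window would be dropped incorrectly.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side filter, not part of Debezium: suppresses the first
// binlog create ('c') for a key that the snapshot already delivered as a read ('r').
final class SnapshotOverlapFilter {

    // Keys delivered by the snapshot that have not yet been matched by a create.
    private final Set<String> snapshotKeys = new HashSet<>();

    // Returns true if the event should be processed, false if it is a duplicate.
    boolean accept(String key, String op) {
        if ("r".equals(op)) {
            snapshotKeys.add(key);
            return true;
        }
        if ("c".equals(op) && snapshotKeys.remove(key)) {
            // The same row was already seen in the snapshot; drop this copy and
            // forget the key so that a later create for it passes through.
            return false;
        }
        return true;
    }
}
```

      The set can be discarded once the binlog position is safely past the point where the snapshot ended.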

      What behavior do you see?

      If records are inserted during the snapshot, some events appear twice in the final result: once as an 'r' event and once as a 'c' event.

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      Please run the attached project

      How to reproduce the issue using our tutorial deployment?

      Read the README.md to reproduce it.

      debezium-engine-simualtion2.zip

              rh-ee-gpanice Giovanni Panice (Inactive)
              amanis Dimitar Hristov (Inactive)
              Chris Cranford, Jiri Pechanec