Debezium / DBZ-5955

Multiple incremental snapshot triggers are recognized as a single one when the signal table receives a sequence of events with a small time lag


Details

    • Type: Bug
    • Resolution: Not a Bug
    • Priority: Major
    • Affects Version: 1.9.5.Final
    • Component: oracle-connector

    Description

      Bug report

      If we insert a couple of signals that start an incremental snapshot into the signal table, with a lag of 1-2 seconds between them:

       INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
      INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-2', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
      INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-3', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
      
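Signals submitted within a short window appear to be merged into a single snapshot request (consistent with the "Not a Bug" resolution), so a single signal listing every desired collection may be the intended way to snapshot several tables at once. A sketch, reusing tables from this connector's `table.include.list`:

```sql
-- One execute-snapshot signal can request several data collections at once;
-- DB.CUSTOMER and DB.DEVICE below are taken from this connector's table.include.list.
INSERT INTO db.debezium_signal (id, type, data)
VALUES ('ad-hoc-multi', 'execute-snapshot',
        '{"data-collections": ["DBINSTANCE.DB.ADDRESS", "DBINSTANCE.DB.CUSTOMER", "DBINSTANCE.DB.DEVICE"], "type": "incremental"}');
```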

       

      In the logs we see the following:

      2022-12-20 07:49:39,174 INFO   Oracle|db|streaming  Requested 'INCREMENTAL' snapshot of data collections '[DBINSTANCE.DB.ADDRESS, DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS]'   [io.debezium.pipeline.signal.ExecuteSnapshot]
      incremental_snapshot_collections=DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS.
      

       

      It looks like Debezium recognized several separate actions in the signal table as a single action.

      What Debezium connector do you use and what version?

      oracle-connector, 1.9.5.Final

      What is the connector configuration?

      {
        "name": "oracle-customers-connector",
        "config": {
          "connector.class": "io.debezium.connector.oracle.OracleConnector",
          "tasks.max": "1",
          "database.server.name": "${database.server.name}",
          "database.url": "${database.url}",
          "database.history.kafka.bootstrap.servers": "${bootstrap.servers}",
          "database.history.kafka.topic": "${history.topic}",
          "database.dbname": "${database.name}",
          "database.user": "${database.user}",
          "database.password": "${database.password}",
          "table.include.list": "DB.DEVICE,DB.CUSTOMER,DB.ADDRESS,DB.ECHANNEL,DB.PROFILE_PERSON,DB.KOM,DB.NMON,DB.INCOME,DB.LINKS,DB.DEBEZIUM_SIGNAL",
          "signal.data.collection": "DBINSTANCE.DB.DEBEZIUM_SIGNAL",
          "incremental.snapshot.chunk.size": "49152",
          "snapshot.mode": "initial",
          "snapshot.select.statement.overrides.DB.LINKS": "SELECT a.* from DB.LINKS a where a.is_active = 1",
          "snapshot.select.statement.overrides.DB.ADDRESS": "select a.* from DB.ADDRESS a where a.is_active = 1",
          "snapshot.select.statement.overrides.DB.ECHANNEL": "select a.* from DB.ECHANNEL a where a.is_active = 1",
          "snapshot.select.statement.overrides.DB.PROFILE_PERSON": "select a.* from DB.PROFILE_PERSON a",
          "snapshot.select.statement.overrides.DB.KOM": "select a.* from DB.KOM a",
          "snapshot.select.statement.overrides.DB.NMON": "select a.* from DB.NMON a",
          "snapshot.select.statement.overrides.DB.INCOME": "select a.* from DB.INCOME a where a.is_active = 1",
          "snapshot.select.statement.overrides.DB.CUSTOMER": "select a.* from DB.CUSTOMER a",
          "snapshot.select.statement.overrides.DB.DEVICE": "select a.* from DB.DEVICE a where a.is_active = 1",
          "decimal.handling.mode": "double",
          "security.protocol": "SSL",
          "ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
          "ssl.truststore.password": "${truststore.password}",
          "ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
          "ssl.keystore.password": "${keystore.password}",
          "ssl.key.password": "${ssl.key.password}",
          "database.history.producer.security.protocol": "SSL",
          "database.history.producer.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
          "database.history.producer.ssl.truststore.password": "${truststore.password}",
          "database.history.producer.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
          "database.history.producer.ssl.keystore.password": "${ssl.keystore.password}",
          "database.history.producer.ssl.key.password": "${ssl.key.password}",
          "database.history.consumer.security.protocol": "SSL",
          "database.history.consumer.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
          "database.history.consumer.ssl.truststore.password": "${ssl.truststore.password}",
          "database.history.consumer.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
          "database.history.consumer.ssl.keystore.password": "${ssl.keystore.password}",
          "database.history.consumer.ssl.key.password": "${ssl.key.password}",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "${schema.registry.url}",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "${schema.registry.url}",
          "basic.auth.credentials.source": "USER_INFO",
          "basic.auth.user.info": "${schema.registry.user}:${schema.registry.password}",
          "key.converter.schema.registry.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
          "key.converter.schema.registry.ssl.truststore.password": "${ssl.truststore.password}",
          "key.converter.schema.registry.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
          "key.converter.schema.registry.ssl.keystore.password": "${ssl.keystore.password}",
          "key.converter.schema.registry.ssl.key.password": "${ssl.key.password}",
          "value.converter.schema.registry.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
          "value.converter.schema.registry.ssl.truststore.password": "${ssl.truststore.password}",
          "value.converter.schema.registry.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
          "value.converter.schema.registry.ssl.keystore.password": "${ssl.keystore.password}",
          "value.converter.schema.registry.ssl.key.password": "${ssl.key.password}"
        }
      }

      What is the captured database version and mode of deployment?

      Oracle 19c on AWS EC2

      What behaviour do you expect?

      Each request in the signal table to create an incremental snapshot is treated separately and launched as its own snapshot.

      What behaviour do you see?

      Multiple incremental snapshot triggers are recognized as a single one when the signal table receives a sequence of events with a small time lag.

      Do you see the same behaviour using the latest released Debezium version?

      Not tested; we use only version 1.9.5.Final.

      Do you have the connector logs, ideally from start till finish?

       

      2022-12-20 07:49:39,174 INFO   Oracle|db|streaming  Requested 'INCREMENTAL' snapshot of data collections '[DBINSTANCE.DB.ADDRESS, DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS]'   [io.debezium.pipeline.signal.ExecuteSnapshot]
      incremental_snapshot_collections=DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS.
      

       

      How to reproduce the issue using our tutorial deployment?

      Create a few events in the signal table with a small time lag between them.

      People

        Assignee: Unassigned
        Reporter: dmytrii.shabotin@raiffeisen.ua Dmytrii Shabotin (Inactive)
        Votes: 0
        Watchers: 4
