Debezium / DBZ-8141

The first CDC message is always lost when using the Debezium engine to capture Oracle data



      Bug report

      What Debezium connector do you use and what version?

      We're using the Debezium engine embedded in our Java service, with versions 2.7.0.Final and 2.7.1.Final.

      What is the connector configuration?

       

      connector.class = io.debezium.connector.oracle.OracleConnector
      auto.create.topics.enable = false
      bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      schema.history.internal.producer.security.protocol = SSL
      schema.history.internal.producer.ssl.truststore.password = ********
      lob.enabled = true
      offset.storage.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.offset.storage
      interval.handling.mode = string
      errors.retry.delay.initial.ms = 300
      ssl.truststore.password = ********
      schema.history.internal.consumer.ssl.keystore.password = ********
      key.converter = org.apache.kafka.connect.json.JsonConverter
      ssl.endpoint.identification.algorithm = 
      database.dbname = ORCLCDB
      database.user = c##dbzuser
      schema.history.internal.consumer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      schema.history.internal.consumer.ssl.endpoint.identification.algorithm = 
      offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
      internal.log.mining.transaction.snapshot.boundary.mode = all
      schema.history.internal.kafka.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      schema.history.internal.producer.ssl.endpoint.identification.algorithm = 
      ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      ssl.keystore.password = ********
      errors.max.retries = -1
      database.password = ********
      schema.history.internal.producer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      name = ORCLCDB/ORCLPDB1
      schema.history.internal.producer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      max.batch.size = 2048
      skipped.operations = none
      snapshot.mode = initial
      schema.history.internal.consumer.security.protocol = SSL
      max.queue.size = 8192
      tasks.max = 1
      retry.backoff.ms = 1000
      migration.feature.gates = 
      tombstones.on.delete = false
      topic.prefix = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7
      offset.storage.file.filename = 
      schema.history.internal.kafka.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history
      schema.history.internal.consumer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      offset.storage.partitions = 1
      value.converter = org.apache.kafka.connect.json.JsonConverter
      schema.history.internal.producer.ssl.keystore.password = ********
      log.mining.batch.size.max = 300000
      database.pdb.name = ORCLPDB1
      topics = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD
      schema.history.internal.consumer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      schema.history.internal.consumer.ssl.truststore.password = ********
      ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      log.mining.view.fetch.size = 100000
      offset.flush.timeout.ms = 2000
      errors.retry.delay.max.ms = 10000
      snapshot.max.threads = 1
      database.port = 1521
      offset.flush.interval.ms = 10000
      security.protocol = SSL
      driver.connection.adapter = logminer
      schema.history.internal = io.debezium.storage.kafka.history.KafkaSchemaHistory
      database.hostname = oracle
      schema.name.adjustment.mode = avro
      log.mining.batch.size.min = 10000
      log.mining.batch.size.default = 1024
      offset.storage.replication.factor = 3
      schema.history.internal.producer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      driver.out.server.name = dbzxout
      table.include.list = DEBEZIUM.ORA_MISSING_RECORD 

       

      What is the captured database version and mode of deployment?

      Oracle 19 using LogMiner, deployed with Docker in a local environment.

      What behavior do you expect?

      We first insert 1000 records into the Oracle source table, as the pseudo-code below shows:

       

      for i := 1; i <= 1000; i++ {
          Do("insert into source_table values ($i, $randomString); commit;")
      } 
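
      For reference, here is a minimal Java/JDBC sketch equivalent to the pseudo-code above. It is only an illustration: the connection URL and credentials are placeholders, and the random-string helper is our own choice; the table and column names follow the configuration and the snapshot query shown elsewhere in this report. It requires the Oracle JDBC driver on the classpath.

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.util.UUID;

      public class SeedInitialRows {
          public static void main(String[] args) throws Exception {
              // Placeholder connection details; adjust to your environment.
              try (Connection conn = DriverManager.getConnection(
                      "jdbc:oracle:thin:@//oracle:1521/ORCLPDB1", "debezium", "********")) {
                  conn.setAutoCommit(false);
                  try (PreparedStatement ps = conn.prepareStatement(
                          "INSERT INTO DEBEZIUM.ORA_MISSING_RECORD (COL1, COL2) VALUES (?, ?)")) {
                      for (int i = 1; i <= 1000; i++) {
                          ps.setInt(1, i);
                          ps.setString(2, UUID.randomUUID().toString().substring(0, 20));
                          ps.executeUpdate();
                          conn.commit(); // one commit per row, matching the pseudo-code
                      }
                  }
              }
          }
      }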

      Then we launch the Debezium engine in our Java service, connecting to this Oracle source table. In our service code, we implement the handleBatch interface method by simply logging each message, as the code snippet shows:

       

      @Override
      public void handleBatch(
              final List<RecordChangeEvent<SourceRecord>> records,
              final RecordCommitter<RecordChangeEvent<SourceRecord>> committer)
              throws InterruptedException {
          for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
              SourceRecord record = recordChangeEvent.record();
              log.info("received record: {}", record);
              committer.markProcessed(recordChangeEvent);
          }
          committer.markBatchFinished();
      } 
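
      For context, this is roughly how such a handleBatch implementation is wired into the embedded engine. This is only a sketch: the property set is abbreviated and assumed to mirror the full connector configuration listed above, and the consumer is shown inline as a lambda doing the same logging as the snippet before it.

      import java.util.Properties;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      import io.debezium.engine.DebeziumEngine;
      import io.debezium.engine.RecordChangeEvent;
      import io.debezium.engine.format.ChangeEventFormat;
      import io.debezium.engine.format.Connect;
      import org.apache.kafka.connect.source.SourceRecord;

      public class EngineLauncher {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.setProperty("name", "ORCLCDB/ORCLPDB1");
              props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
              props.setProperty("snapshot.mode", "initial");
              // ... plus the remaining database.*, offset.storage.*, schema.history.internal.* settings listed above

              DebeziumEngine<RecordChangeEvent<SourceRecord>> engine =
                      DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                              .using(props)
                              .notifying((records, committer) -> {
                                  // same logic as the handleBatch implementation shown above
                                  for (RecordChangeEvent<SourceRecord> event : records) {
                                      System.out.println("received record: " + event.record());
                                      committer.markProcessed(event);
                                  }
                                  committer.markBatchFinished();
                              })
                              .build();

              // The engine is a Runnable that blocks until stopped, so run it on a dedicated thread.
              ExecutorService executor = Executors.newSingleThreadExecutor();
              executor.execute(engine);
          }
      }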

      After the Debezium engine is launched, we immediately begin inserting new rows into the source table, while the engine is running, until the row count reaches 20000, as the pseudo-code shows:

       

       

      for i := 1001; i <= 20000; i++ { 
          Do("insert into source_table values ($i, $randomString); commit;") 
      }  

      Altogether, the workflow happens in this order:

       

       

      // insert 1000 initial rows
      for i := 1; i <= 1000; i++ { 
          Do("insert into source_table values ($i, $randomString); commit;") 
      } 
      // launch debezium engine in java service
      launchDbzEngine();
      // immediately continue to insert more rows until it reaches 20000
      for i := 1001; i <= 20000; i++ { 
          Do("insert into source_table values ($i, $randomString); commit;") 
      } 

      What we expect to see is that the Java service logs 20000 data records, with primary keys ranging from 1 to 20000.

       

      What behavior do you see?

      From our Java code logs, we observe that we only receive 19999 records, with 1 record missing, and the missing record is always the first CDC message after the snapshot stage completes.

      In our code, we detect the last snapshot message by checking the "snapshot_completed" attribute in the Debezium messages. Once the last snapshot message is detected, we log it specially, and we also specially log the next record received from Debezium, i.e. the first CDC message. The following are the 2 log entries we get:

       

      // last snapshot message; in total there are 3036 snapshot messages
      
      last snapshot message: SourceRecord{sourcePartition={server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7}, sourceOffset={snapshot_scn=5908027, snapshot=true, scn=5908027, snapshot_completed=true}} ConnectRecord{topic='org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD', kafkaPartition=null, key=Struct{COL1=1939}, keySchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key:STRUCT}, value=Struct{after=Struct{COL1=1939,COL2=jCIGtqzZZZyKCcseiVyH},source=Struct{version=2.7.0.Final,connector=oracle,name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7,ts_ms=1723459331000,snapshot=last,db=ORCLPDB1,ts_us=1723459331000000,ts_ns=1723459331000000000,schema=DEBEZIUM,table=ORA_MISSING_RECORD,scn=5908027,ssn=0},op=r,ts_ms=1723459339273,ts_us=1723459339273460,ts_ns=1723459339273460000}, valueSchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      
      
      // next message: the first CDC message; col1 should be 3037, but it is actually 3038, with 3037 missing
      
      first cdc message SourceRecord{sourcePartition={server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7}, sourceOffset={commit_scn=5908031:1:04000400340e0000, snapshot_scn=5908027, scn=5908030}} ConnectRecord{topic='org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD', kafkaPartition=null, key=Struct{COL1=3038}, keySchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key:STRUCT}, value=Struct{after=Struct{COL1=3038,COL2=yteibTEdFGUGoIYTbYVV},source=Struct{version=2.7.0.Final,connector=oracle,name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7,ts_ms=1723459332000,db=ORCLPDB1,ts_us=1723459332000000,ts_ns=1723459332000000000,schema=DEBEZIUM,table=ORA_MISSING_RECORD,txId=04000400340e0000,scn=5908030,commit_scn=5908031,rs_id=0x000065.00058549.0010,ssn=0,redo_thread=1,user_name=DEBEZIUM,row_id=AAASLFAAdAAAAI0ACC},op=c,ts_ms=1723459357444,ts_us=1723459357444213,ts_ns=1723459357444213000}

      In the snapshot stage, we receive 3036 data rows, with the largest col1=3036. However, the first CDC message we receive has col1=3038, with 3037 missing (at the end, we also check all the messages again; 3037 simply never appears).
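
      For reference, this is roughly the check we use in the handler to detect the last snapshot message. It is only a sketch: the helper class and method names are ours, and the offset key name is taken from the sourceOffset printed in the logs above (the check tolerates both Boolean and String representations of the flag).

      import org.apache.kafka.connect.source.SourceRecord;

      final class SnapshotBoundary {
          // True when the record's source offset marks the snapshot as completed,
          // i.e. this is the last snapshot record before streaming starts.
          static boolean isLastSnapshotRecord(SourceRecord record) {
              Object completed = record.sourceOffset().get("snapshot_completed");
              return "true".equals(String.valueOf(completed));
          }
      }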

       

       

      However, if we make a minor adjustment to the workflow by waiting for 10 seconds after launching the Debezium engine and before inserting CDC rows, like this:

      // insert 1000 initial rows
      for i := 1; i <= 1000; i++ { 
          Do("insert into source_table values ($i, $randomString); commit;") 
      } 
      // launch debezium engine in java service
      launchDbzEngine();
      
      // wait for 10 seconds before inserting new rows
      time.Sleep(10s);
      
      // continue to insert more rows until it reaches 20000
      for i := 1001; i <= 20000; i++ { 
          Do("insert into source_table values ($i, $randomString); commit;") 
      }

      This time, no record is missing; we receive all 20000 records.

      Do you see the same behaviour using the latest released Debezium version?

      Yes, the newest stable version is 2.7.1.Final, which is exactly what we're using.

      Do you have the connector logs, ideally from start till finish?

      Yes, I have all the logs printed by the "io.deb.*" packages, as pasted below:

       

      2024-08-12 10:42:10,426 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Starting OracleConnectorTask with configuration:
      2024-08-12 10:42:10,437 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    connector.class = io.debezium.connector.oracle.OracleConnector
      2024-08-12 10:42:10,439 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    auto.create.topics.enable = false
      2024-08-12 10:42:10,443 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      2024-08-12 10:42:10,444 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.security.protocol = SSL
      2024-08-12 10:42:10,444 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.ssl.truststore.password = ********
      2024-08-12 10:42:10,444 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    lob.enabled = true
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.offset.storage
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    interval.handling.mode = string
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.retry.delay.initial.ms = 300
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    ssl.truststore.password = ********
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.ssl.keystore.password = ********
      2024-08-12 10:42:10,447 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter = org.apache.kafka.connect.json.JsonConverter
      2024-08-12 10:42:10,448 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    ssl.endpoint.identification.algorithm = 
      2024-08-12 10:42:10,455 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.dbname = ORCLCDB
      2024-08-12 10:42:10,455 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.user = c##dbzuser
      2024-08-12 10:42:10,455 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      2024-08-12 10:42:10,455 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.ssl.endpoint.identification.algorithm = 
      2024-08-12 10:42:10,455 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
      2024-08-12 10:42:10,456 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    internal.log.mining.transaction.snapshot.boundary.mode = all
      2024-08-12 10:42:10,456 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.kafka.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      2024-08-12 10:42:10,456 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.ssl.endpoint.identification.algorithm = 
      2024-08-12 10:42:10,457 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      2024-08-12 10:42:10,457 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    ssl.keystore.password = ********
      2024-08-12 10:42:10,457 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.max.retries = -1
      2024-08-12 10:42:10,458 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.password = ********
      2024-08-12 10:42:10,458 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      2024-08-12 10:42:10,458 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    name = ORCLCDB/ORCLPDB1
      2024-08-12 10:42:10,458 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      2024-08-12 10:42:10,458 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    max.batch.size = 2048
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    skipped.operations = none
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    snapshot.mode = initial
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.security.protocol = SSL
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    max.queue.size = 8192
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    tasks.max = 1
      2024-08-12 10:42:10,459 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    retry.backoff.ms = 1000
      2024-08-12 10:42:10,460 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    migration.feature.gates = 
      2024-08-12 10:42:10,460 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    tombstones.on.delete = false
      2024-08-12 10:42:10,460 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    topic.prefix = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7
      2024-08-12 10:42:10,461 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.file.filename = 
      2024-08-12 10:42:10,461 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.kafka.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history
      2024-08-12 10:42:10,463 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      2024-08-12 10:42:10,463 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.partitions = 1
      2024-08-12 10:42:10,463 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter = org.apache.kafka.connect.json.JsonConverter
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.ssl.keystore.password = ********
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    log.mining.batch.size.max = 300000
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.pdb.name = ORCLPDB1
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    topics = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.consumer.ssl.truststore.password = ********
      2024-08-12 10:42:10,464 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    log.mining.view.fetch.size = 100000
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.flush.timeout.ms = 2000
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.retry.delay.max.ms = 10000
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    snapshot.max.threads = 1
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.port = 1521
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.flush.interval.ms = 10000
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    security.protocol = SSL
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    driver.connection.adapter = logminer
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal = io.debezium.storage.kafka.history.KafkaSchemaHistory
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.hostname = oracle
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.name.adjustment.mode = avro
      2024-08-12 10:42:10,465 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    log.mining.batch.size.min = 10000
      2024-08-12 10:42:10,466 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    log.mining.batch.size.default = 1024
      2024-08-12 10:42:10,466 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.replication.factor = 3
      2024-08-12 10:42:10,466 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.producer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
      2024-08-12 10:42:10,466 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    driver.out.server.name = dbzxout
      2024-08-12 10:42:10,466 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    table.include.list = DEBEZIUM.ORA_MISSING_RECORD
      2024-08-12 10:42:10,476 INFO  [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom source info struct maker plugin: io.debezium.connector.oracle.OracleSourceInfoStructMaker
      2024-08-12 10:42:10,494 INFO  [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy
      2024-08-12 10:42:10,663 INFO  [io.deb.con.ora.OracleConnection] (pool-8-thread-1) Database Version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
      2024-08-12 10:42:10,705 INFO  [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) KafkaSchemaHistory Consumer config: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory, ssl.keystore.location=/etc/kafka/secrets/client.keystore.p12, bootstrap.servers=kafka:9093,kafka2:9095,kafka3:9097, security.protocol=SSL, enable.auto.commit=false, ssl.truststore.location=/etc/kafka/secrets/int.truststore.p12, ssl.keystore.password=********, fetch.min.bytes=1, ssl.truststore.password=********, session.timeout.ms=10000, auto.offset.reset=earliest, ssl.endpoint.identification.algorithm=, client.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory}
      2024-08-12 10:42:10,707 INFO  [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) KafkaSchemaHistory Producer config: {batch.size=32768, acks=1, ssl.keystore.location=/etc/kafka/secrets/client.keystore.p12, bootstrap.servers=kafka:9093,kafka2:9095,kafka3:9097, buffer.memory=1048576, key.serializer=org.apache.kafka.common.serialization.StringSerializer, security.protocol=SSL, retries=1, ssl.truststore.location=/etc/kafka/secrets/int.truststore.p12, value.serializer=org.apache.kafka.common.serialization.StringSerializer, ssl.keystore.password=********, max.block.ms=10000, ssl.truststore.password=********, ssl.endpoint.identification.algorithm=, client.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory, linger.ms=0}
      2024-08-12 10:42:10,708 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = db-history-config-check
      2024-08-12 10:42:10,804 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) No previous offsets found
      2024-08-12 10:42:10,898 INFO  [io.deb.con.ora.OracleConnection] (pool-8-thread-1) Database Version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
      2024-08-12 10:42:10,951 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Connector started for the first time.
      2024-08-12 10:42:11,416 INFO  [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) Database schema history topic '(name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history, numPartitions=1, replicationFactor=default, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created
      2024-08-12 10:42:11,445 INFO  [io.deb.con.ora.OracleConnectorTask] (pool-8-thread-1) No previous offset found
      2024-08-12 10:42:11,455 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = SignalProcessor
      2024-08-12 10:42:11,460 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = change-event-source-coordinator
      2024-08-12 10:42:11,460 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = blocking-snapshot
      2024-08-12 10:42:11,461 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Creating thread debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator
      2024-08-12 10:42:11,478 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Metrics registered
      2024-08-12 10:42:11,486 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Context created
      2024-08-12 10:42:11,487 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) According to the connector configuration both schema and data will be snapshot.
      2024-08-12 10:42:11,520 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 1 - Preparing
      2024-08-12 10:42:11,521 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 2 - Determining captured tables
      2024-08-12 10:42:12,234 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.EMPLOYEE to the list of capture schema tables
      2024-08-12 10:42:12,236 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.MLYTEST2 to the list of capture schema tables
      2024-08-12 10:42:12,238 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.C##DBZUSER.MLYTEST to the list of capture schema tables
      2024-08-12 10:42:12,238 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD to the list of capture schema tables
      2024-08-12 10:42:12,246 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Created connection pool with 1 threads
      2024-08-12 10:42:12,247 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 3 - Locking captured tables [ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD]
      2024-08-12 10:42:12,251 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 4 - Determining snapshot offset
      2024-08-12 10:42:12,507 INFO  [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Consulting V$TRANSACTION and transaction logs for resolving snapshot offset.
      2024-08-12 10:42:14,396 INFO  [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Querying transaction logs, please wait...
      2024-08-12 10:42:17,458 INFO  [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Found no in-progress transactions.
      2024-08-12 10:42:17,471 INFO  [io.deb.jdb.JdbcConnection] (pool-20-thread-1) Connection gracefully closed
      2024-08-12 10:42:17,473 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 5 - Reading structure of captured tables
      2024-08-12 10:42:17,474 INFO  [io.deb.con.ora.OracleSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) All eligible tables schema should be captured, capturing: [ORCLPDB1.C##DBZUSER.MLYTEST, ORCLPDB1.DEBEZIUM.EMPLOYEE, ORCLPDB1.DEBEZIUM.MLYTEST2, ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD]
      2024-08-12 10:42:17,563 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Registering 'ORCLPDB1.C##DBZUSER.LOG_MINING_FLUSH' attributes: object_id=74252, data_object_id=74252
      2024-08-12 10:42:17,566 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Registering 'ORCLPDB1.C##DBZUSER.MLYTEST' attributes: object_id=74266, data_object_id=74266
      2024-08-12 10:42:17,681 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Registering 'ORCLPDB1.DEBEZIUM.EMPLOYEE' attributes: object_id=74318, data_object_id=74318
      2024-08-12 10:42:17,682 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Registering 'ORCLPDB1.DEBEZIUM.MLYTEST2' attributes: object_id=74276, data_object_id=74276
      2024-08-12 10:42:17,683 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Registering 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' attributes: object_id=74437, data_object_id=74437
      2024-08-12 10:42:17,704 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 6 - Persisting schema history
      2024-08-12 10:42:17,704 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.C##DBZUSER.MLYTEST
      2024-08-12 10:42:18,495 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Already applied 1 database changes
      2024-08-12 10:42:18,517 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.EMPLOYEE
      2024-08-12 10:42:18,659 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.MLYTEST2
      2024-08-12 10:42:18,738 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Already applied 3 database changes
      2024-08-12 10:42:18,739 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD
      2024-08-12 10:42:18,961 WARN  [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Value' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Value'
      2024-08-12 10:42:18,962 WARN  [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key'
      2024-08-12 10:42:18,968 WARN  [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope'
      2024-08-12 10:42:18,981 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 7 - Snapshotting data
      2024-08-12 10:42:18,982 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Creating snapshot worker pool with 1 worker thread(s)
      2024-08-12 10:42:18,984 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) For table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' using select statement: 'SELECT "COL1", "COL2" FROM "DEBEZIUM"."ORA_MISSING_RECORD" AS OF SCN 5908027'
      2024-08-12 10:42:18,989 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-21-thread-1) Exporting data from table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' (1 of 1 tables)
      2024-08-12 10:42:18,996 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 4 records sent during previous 00:00:08.68, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {snapshot_scn=5908027, snapshot=true, scn=5908027, snapshot_completed=false}
      2024-08-12 10:42:19,273 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-21-thread-1) 	 Finished exporting 3036 records for table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' (1 of 1 tables); total duration '00:00:00.284'
      2024-08-12 10:42:19,281 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot - Final stage
      2024-08-12 10:42:19,288 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot completed
      2024-08-12 10:42:19,290 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=OracleOffsetContext [scn=5908027, commit_scn=[], lcr_position=null]]
      2024-08-12 10:42:19,298 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Connected metrics set to 'true'
      2024-08-12 10:42:19,327 INFO  [io.deb.pip.sig.SignalProcessor] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) SignalProcessor started. Scheduling it every 5000ms
      2024-08-12 10:42:19,337 INFO  [io.deb.uti.Threads] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Creating thread debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-SignalProcessor
      2024-08-12 10:42:19,355 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Starting streaming
      2024-08-12 10:42:19,547 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Redo Log Group Sizes:
      2024-08-12 10:42:19,555 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Group #4: 8589934592 bytes
      2024-08-12 10:42:19,556 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Group #5: 8589934592 bytes
      2024-08-12 10:42:19,556 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Group #6: 8589934592 bytes
      2024-08-12 10:42:19,556 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) 	Group #7: 8589934592 bytes
      2024-08-12 10:42:37,818 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 3186 records sent during previous 00:00:18.823, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {commit_scn=5909049:1:0900130065070000, snapshot_scn=5908027, scn=5909048}
      2024-08-12 10:42:45,758 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 2200 records sent during previous 00:00:07.94, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {commit_scn=5920071:1:07000100530e0000, snapshot_scn=5908027, scn=5920070} 

       

       

      How to reproduce the issue using our tutorial deployment?

      We first observed this bug in our system using the Debezium engine; I then tried the tutorial deployment, where it is also very easy to reproduce. I wrote 2 auxiliary scripts to help reproduce this issue; please follow these steps:

       

      1. Ensure a local Oracle Docker container is running. (In our case, Oracle 19.)

      2. Enter the tutorial directory and run the following commands to set up the environment:

       

      # use the newest version, but older versions also run into the same bug
      export DEBEZIUM_VERSION=3.0
      docker-compose -f docker-compose-oracle.yaml up --build 

      3. Under the tutorial directory, add these 2 scripts. The first one, insert.sh, creates the table and keeps inserting 10000 rows of records:

      #!/bin/bash
      
      # remember to replace with your own settings
      username=debezium
      password=transporter
      port=1521
      hostname=localhost
      dbname=ORCLPDB1
      
      # first create customers table
      create_table_sql="drop table customers;\n
      CREATE TABLE customers (\n
        id NUMBER(6) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,\n
        first_name VARCHAR2(255) NOT NULL,\n
        last_name VARCHAR2(255) NOT NULL,\n
        email VARCHAR2(255) NOT NULL\n
      );\n
      GRANT SELECT ON customers to c##dbzuser;\n
      ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;\n
      exit;"
      
      sql_file="/tmp/create.sql"
      echo $create_table_sql > $sql_file
      
      # create table
      sqlplus $username/$password@$hostname:$port/$dbname @ $sql_file
      
      # insert 10000 rows, with id from 1 to 10000
      get_batch_insert () {
          batch_sql=""
          for i in {1..10000}
          do
              first_name="firstname$i"
              last_name="lastname$i"
              batch_sql+="insert into customers values ($i, '$first_name', '$last_name', '$first_name@example.com');\ncommit;\n"
          done
          batch_sql+="exit;\n"
          echo $batch_sql
      }
      
      batch_sql=$( get_batch_insert )
      printf "$batch_sql" | sqlplus $username/$password@$hostname:$port/$dbname  

      The second one, launch.sh, orchestrates the whole test workflow: it first launches insert.sh to keep inserting data, then starts the Oracle connector, and finally starts a Kafka console consumer to export the Kafka messages to a text file for analysis:

      #!/bin/bash
      
      # set trace log level
      curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/io.debezium.connector.oracle -d '{"level": "TRACE"}'
      
      # create table + insert data
      sh ./insert.sh &
      
      # sleep some time to wait for some snapshot data to be inserted, adjust this parameter according to your oracle db processing speed
      
      sleep 17
      
      # launch oracle connector
      curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle-logminer.json
      
      sleep 5
      
      # consume customers table, exporting data into records.txt for analysis
      docker-compose -f docker-compose-oracle.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server kafka:9092 \
          --from-beginning \
          --property print.key=true \
          --topic server1.DEBEZIUM.CUSTOMERS > /tmp/records.txt  

      Remember to adjust the values in the register-oracle-logminer.json file based on your settings; here is what I'm using:

      {
        "name": "inventory-connector",
        "config": {
          "connector.class" : "io.debezium.connector.oracle.OracleConnector",
          "tasks.max" : "1",
          "topic.prefix" : "server1",
          "database.hostname" : "host.docker.internal",
          "database.port" : "1521",
          "database.user" : "c##dbzuser",
          "database.password" : "transporter",
          "database.dbname" : "ORCLCDB",
          "database.pdb.name" : "ORCLPDB1",
          "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
          "schema.history.internal.kafka.topic": "schema-changes.inventory",
          "internal.log.mining.transaction.snapshot.boundary.mode": "all",
          "table.include.list": "DEBEZIUM.CUSTOMERS"
        }
      }

      4. Launch the test case by running `sh launch.sh`

      5. Wait for insert.sh to finish, then open `/tmp/records.txt`. You will find 9999 lines there, with 1 line missing. Search for the keyword "snapshot":"last"; exactly one result shows up, and that line is the last snapshot message, while the next line is the first CDC message. Judging from these 2 records' IDs, you can see that the first CDC message is lost. Below are these 2 records from my test results, with ID=3565 and ID=3667; ID 3666 is missing if you search the whole file. (A small helper for finding the missing ID is sketched after the example records.)

      {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Key"},"payload":{"ID":3565}}    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int64","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"redo_sql"},{"type":"string","optional":true,"field":"row_id"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Envelope","version":2},"payload":{"before":null,"after":{"ID":3565,"FIRST_NAME":"firstname3565","LAST_NAME":"lastname3565","EMAIL":"firstname3565@example.com"},"source":{"version":"3.0.8.Final","connector":"oracle","name":"server1","ts_ms":1742270703000,"snapshot":"last","db":"ORCLPDB1","sequence":null,"ts_us":1742270703000000,"ts_ns":1742270703000000000,"schema":"DEBEZIUM","table":"CUSTOMERS","txId":null,"scn":"20732831","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null,"user_name":null,"redo_sql":null,"row_id":null},"transaction":null,"op":"r","ts_ms":1742270710874,"ts_us":1742270710874853,"ts_ns":1742270710874853721}}
      
      {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Key"},"payload":{"ID":3667}}    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int64","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"redo_sql"},{"type":"string","optional":true,"field":"row_id"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Envelope","version":2},"payload":{"before":null,"after":{"ID":3667,"FIRST_NAME":"firstname3667","LAST_NAME":"lastname3667","EMAIL":"firstname3667@example.com"},"source":{"version":"3.0.8.Final","connector":"oracle","name":"server1","ts_ms":1742270704000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"ts_us":1742270704000000,"ts_ns":1742270704000000000,"schema":"DEBEZIUM","table":"CUSTOMERS","txId":"07001600cd220000","scn":"20732834","commit_scn":"20732835","lcr_position":null,"rs_id":"0x000025.000a56d1.0010","ssn":0,"redo_thread":1,"user_name":"DEBEZIUM","redo_sql":null,"row_id":"AAAXYcAAYAAAG65ABl"},"transaction":null,"op":"c","ts_ms":1742270711705,"ts_us":1742270711705119,"ts_ns":1742270711705119345}}
        

       

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      The scenario in which we ran into this bug is a quite common use case: when executing a data migration using the Debezium engine on tables with a high insert rate, this error is likely to occur, causing data to end up missing.

      Implementation ideas (optional)

      I'm not sure what causes this bug, but it seems related to some flaw in the locking mechanism? Could it be solved by applying a stricter lock during the snapshot stage?

              Assignee: ccranfor@redhat.com Chris Cranford
              Reporter: lingyang.ma@enterprisedb.com Lingyang Ma (Inactive)