Bug
Resolution: Done
Critical
2.7.4.Final, 3.0.8.Final
Bug report
What Debezium connector do you use and what version?
We're using the Debezium engine in our Java service, with versions 2.7.0.Final and 2.7.1.Final.
What is the connector configuration?
connector.class = io.debezium.connector.oracle.OracleConnector
auto.create.topics.enable = false
bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
schema.history.internal.producer.security.protocol = SSL
schema.history.internal.producer.ssl.truststore.password = ********
lob.enabled = true
offset.storage.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.offset.storage
interval.handling.mode = string
errors.retry.delay.initial.ms = 300
ssl.truststore.password = ********
schema.history.internal.consumer.ssl.keystore.password = ********
key.converter = org.apache.kafka.connect.json.JsonConverter
ssl.endpoint.identification.algorithm =
database.dbname = ORCLCDB
database.user = c##dbzuser
schema.history.internal.consumer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
schema.history.internal.consumer.ssl.endpoint.identification.algorithm =
offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
internal.log.mining.transaction.snapshot.boundary.mode = all
schema.history.internal.kafka.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
schema.history.internal.producer.ssl.endpoint.identification.algorithm =
ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
ssl.keystore.password = ********
errors.max.retries = -1
database.password = ********
schema.history.internal.producer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
name = ORCLCDB/ORCLPDB1
schema.history.internal.producer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
max.batch.size = 2048
skipped.operations = none
snapshot.mode = initial
schema.history.internal.consumer.security.protocol = SSL
max.queue.size = 8192
tasks.max = 1
retry.backoff.ms = 1000
migration.feature.gates =
tombstones.on.delete = false
topic.prefix = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7
offset.storage.file.filename =
schema.history.internal.kafka.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history
schema.history.internal.consumer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
offset.storage.partitions = 1
value.converter = org.apache.kafka.connect.json.JsonConverter
schema.history.internal.producer.ssl.keystore.password = ********
log.mining.batch.size.max = 300000
database.pdb.name = ORCLPDB1
topics = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD
schema.history.internal.consumer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
schema.history.internal.consumer.ssl.truststore.password = ********
ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
log.mining.view.fetch.size = 100000
offset.flush.timeout.ms = 2000
errors.retry.delay.max.ms = 10000
snapshot.max.threads = 1
database.port = 1521
offset.flush.interval.ms = 10000
security.protocol = SSL
driver.connection.adapter = logminer
schema.history.internal = io.debezium.storage.kafka.history.KafkaSchemaHistory
database.hostname = oracle
schema.name.adjustment.mode = avro
log.mining.batch.size.min = 10000
log.mining.batch.size.default = 1024
offset.storage.replication.factor = 3
schema.history.internal.producer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
driver.out.server.name = dbzxout
table.include.list = DEBEZIUM.ORA_MISSING_RECORD
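Anyone reproducing this with the embedded engine can hold the data-capture settings above in a java.util.Properties object. That is the type the engine builder's using(...) method accepts. The sketch below assumes only a subset of the settings matters for the reproduction. ReportedEngineConfig is a hypothetical helper and the choice of keys is ours. The values are copied from the list above. Credentials, SSL, and Kafka offset and schema-history settings are left out on purpose, so this is not the full configuration used in the report.

```java
import java.util.Properties;

// Hypothetical helper: a SUBSET of the reported connector settings as Properties.
// Values are copied from the report; credentials, SSL and Kafka offset /
// schema-history settings are intentionally omitted.
final class ReportedEngineConfig {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("name", "ORCLCDB/ORCLPDB1");
        props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
        props.setProperty("topic.prefix",
                "org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7");
        props.setProperty("database.hostname", "oracle");
        props.setProperty("database.port", "1521");
        props.setProperty("database.user", "c##dbzuser");
        props.setProperty("database.dbname", "ORCLCDB");
        props.setProperty("database.pdb.name", "ORCLPDB1");
        props.setProperty("table.include.list", "DEBEZIUM.ORA_MISSING_RECORD");
        // "initial": snapshot the existing rows first, then switch to streaming.
        props.setProperty("snapshot.mode", "initial");
        props.setProperty("lob.enabled", "true");
        props.setProperty("max.batch.size", "2048");
        props.setProperty("max.queue.size", "8192");
        props.setProperty("offset.flush.interval.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // Print the subset, one "key = value" per line (Properties order is not guaranteed).
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Treat this as a starting point only. The connection adapter, Kafka offset storage and schema history settings from the list above still have to be added before the engine can start.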
What is the captured database version and mode of deployment?
Oracle 19c using LogMiner, deployed with Docker in a local environment.
What behavior do you expect?
We first insert 1000 records into the Oracle source table, as the pseudo code below shows:
for i := 1; i <= 1000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
Then we launch the Debezium engine in our Java service and connect it to this Oracle source table. In the service code, our implementation of the handleBatch method simply logs each record, as this snippet shows:
@Override
public void handleBatch(
        final List<RecordChangeEvent<SourceRecord>> records,
        final RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
    for (RecordChangeEvent<SourceRecord> recordChangeEvent : records) {
        SourceRecord record = recordChangeEvent.record();
        // log every record, then mark it as processed
        log.info("received record: {}", record);
        committer.markProcessed(recordChangeEvent);
    }
    committer.markBatchFinished();
}
Immediately after the Debezium engine is launched, and while it is running, we start inserting new rows into the source table until the row count reaches 20000, as this pseudo code shows:
for i := 1001; i <= 20000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
Putting it together, the workflow runs in this order:
// insert 1000 initial rows
for i := 1; i <= 1000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
// launch debezium engine in java service
launchDbzEngine();
// immediately continue to insert more rows until it reaches 20000
for i := 1001; i <= 20000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
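The writer side of this workflow can be sketched in Java with plain JDBC. This is an illustration, not our actual Go code. The table name source_table comes from the pseudo code. The two-column layout (COL1 number, COL2 string) is inferred from the records in the logs. insertRange and randomString are hypothetical helpers. Each row is committed in its own transaction, mirroring "insert ...; commit;".

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

// Hypothetical writer for the reproduction (not the reporters' code).
// Assumes a table SOURCE_TABLE(COL1 NUMBER, COL2 VARCHAR2) and an open JDBC
// Connection supplied by the caller; an Oracle JDBC driver is needed at runtime.
final class ReproWriter {
    private static final String LETTERS =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    private static final Random RANDOM = new Random();

    // Random ASCII-letter payload, similar in shape to the COL2 values in the logs.
    static String randomString(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(LETTERS.charAt(RANDOM.nextInt(LETTERS.length())));
        }
        return sb.toString();
    }

    // Inserts rows first..last (inclusive) and commits after every single row,
    // so each row is its own transaction, as with "insert ...; commit;".
    static void insertRange(Connection conn, int first, int last) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement ps =
                     conn.prepareStatement("INSERT INTO source_table VALUES (?, ?)")) {
            for (int i = first; i <= last; i++) {
                ps.setInt(1, i);
                ps.setString(2, randomString(20));
                ps.executeUpdate();
                conn.commit();
            }
        }
    }
}
```

To follow the reported order, call insertRange(conn, 1, 1000), start the engine, then immediately call insertRange(conn, 1001, 20000).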
We expect the Java service to log 20000 data records, with primary keys ranging from 1 to 20000.
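One way to check this expectation on the consumer side is to record every received COL1 value and list the gaps at the end. The sketch below is plain Java and assumes integer keys in 1..20000. MissingKeyChecker is a hypothetical helper. Extracting COL1 from the record key is left out, because its Connect type depends on how the column is defined. The main method only simulates the reported outcome (every key except 3037) to show the output format.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper: remembers which primary keys (COL1) arrived and lists gaps.
// Assumes integer keys in 1..expectedMax; duplicates are counted once.
final class MissingKeyChecker {
    private final int expectedMax;
    private final BitSet seen;

    MissingKeyChecker(int expectedMax) {
        this.expectedMax = expectedMax;
        this.seen = new BitSet(expectedMax + 1);
    }

    // Call once per received record (e.g. from handleBatch) with its COL1 value.
    void record(int key) {
        if (key >= 1 && key <= expectedMax) {
            seen.set(key);
        }
    }

    int receivedCount() {
        return seen.cardinality();
    }

    // Keys in 1..expectedMax that never arrived, in ascending order.
    List<Integer> missing() {
        List<Integer> gaps = new ArrayList<>();
        for (int k = seen.nextClearBit(1); k <= expectedMax; k = seen.nextClearBit(k + 1)) {
            gaps.add(k);
        }
        return gaps;
    }

    public static void main(String[] args) {
        MissingKeyChecker checker = new MissingKeyChecker(20000);
        // Simulated input only: every key except 3037, as in the reported run.
        for (int k = 1; k <= 20000; k++) {
            if (k != 3037) {
                checker.record(k);
            }
        }
        // prints: 19999 received, missing [3037]
        System.out.println(checker.receivedCount() + " received, missing " + checker.missing());
    }
}
```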
What behavior do you see?
From our Java service's log, we see that only 19999 records are received. One record is missing, and it is always the first CDC message after the snapshot stage completes.
In our code, we detect the last snapshot message by checking the "snapshot_completed" attribute in the source offset of each Debezium message. Once the last snapshot message is detected, we log it separately. We also log the next record received from Debezium separately, i.e. the first CDC message. These are the two log entries we get:
// last snapshot message (there are 3036 snapshot messages in total)
last snapshot message: SourceRecord{sourcePartition={server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7}, sourceOffset={snapshot_scn=5908027, snapshot=true, scn=5908027, snapshot_completed=true}} ConnectRecord{topic='org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD', kafkaPartition=null, key=Struct{COL1=1939}, keySchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key:STRUCT}, value=Struct{after=Struct{COL1=1939,COL2=jCIGtqzZZZyKCcseiVyH},source=Struct{version=2.7.0.Final,connector=oracle,name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7,ts_ms=1723459331000,snapshot=last,db=ORCLPDB1,ts_us=1723459331000000,ts_ns=1723459331000000000,schema=DEBEZIUM,table=ORA_MISSING_RECORD,scn=5908027,ssn=0},op=r,ts_ms=1723459339273,ts_us=1723459339273460,ts_ns=1723459339273460000}, valueSchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

// next message: the first CDC message. COL1 should be 3037, but it is actually 3038, so 3037 is missing
first cdc message SourceRecord{sourcePartition={server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7}, sourceOffset={commit_scn=5908031:1:04000400340e0000, snapshot_scn=5908027, scn=5908030}} ConnectRecord{topic='org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD', kafkaPartition=null, key=Struct{COL1=3038}, keySchema=Schema{org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key:STRUCT}, value=Struct{after=Struct{COL1=3038,COL2=yteibTEdFGUGoIYTbYVV},source=Struct{version=2.7.0.Final,connector=oracle,name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7,ts_ms=1723459332000,db=ORCLPDB1,ts_us=1723459332000000,ts_ns=1723459332000000000,schema=DEBEZIUM,table=ORA_MISSING_RECORD,txId=04000400340e0000,scn=5908030,commit_scn=5908031,rs_id=0x000065.00058549.0010,ssn=0,redo_thread=1,user_name=DEBEZIUM,row_id=AAASLFAAdAAAAI0ACC},op=c,ts_ms=1723459357444,ts_us=1723459357444213,ts_ns=1723459357444213000}
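The detection described above can be done from SourceRecord.sourceOffset() alone, which returns a Map<String, ?>. This minimal sketch assumes the offset keys visible in the two records above. The last snapshot record carries snapshot=true and snapshot_completed=true, while the streamed record has no snapshot key. The flags are read through String.valueOf, a defensive choice so the check works whether the values are Boolean or String. SnapshotBoundaryTracker is a hypothetical helper, not a Debezium class.

```java
import java.util.Map;

// Hypothetical helper: spots the last snapshot record and the first streamed record
// using only the source offset map. Assumes the keys seen in the logs:
// "snapshot", "snapshot_completed" and "scn".
final class SnapshotBoundaryTracker {
    private boolean snapshotCompleted;
    private String lastSnapshotScn;
    private String firstStreamingScn;

    // True for Boolean.TRUE or the string "true"; false if the key is absent.
    private static boolean flag(Map<String, ?> offset, String key) {
        return Boolean.parseBoolean(String.valueOf(offset.get(key)));
    }

    // Returns true exactly once: for the first record after the snapshot-completed record.
    boolean onRecord(Map<String, ?> offset) {
        boolean isSnapshot = flag(offset, "snapshot");
        if (isSnapshot && flag(offset, "snapshot_completed")) {
            snapshotCompleted = true;
            lastSnapshotScn = String.valueOf(offset.get("scn"));
            return false;
        }
        if (snapshotCompleted && !isSnapshot && firstStreamingScn == null) {
            firstStreamingScn = String.valueOf(offset.get("scn"));
            return true;
        }
        return false;
    }

    String lastSnapshotScn() {
        return lastSnapshotScn;
    }

    String firstStreamingScn() {
        return firstStreamingScn;
    }
}
```

In the handler, the tracker would be called with recordChangeEvent.record().sourceOffset() for every record. When onRecord returns true, comparing lastSnapshotScn() (5908027 here) with the first streamed SCN (5908030) shows where the hand-off happened.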
During the snapshot stage we receive 3036 data rows, the largest being COL1=3036. However, the first CDC message we receive is COL1=3038, so 3037 is missing. At the end we re-checked all received messages, and 3037 never appears.
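The connector log shows the snapshot query ran "AS OF SCN 5908027", and the first streamed event carries commit_scn=5908031. Below is a minimal sketch of the hand-off rule we would expect to hold. It is our reading of how a flashback-query snapshot and LogMiner streaming are meant to meet, not Debezium's actual code. A row committed at or before the snapshot SCN should arrive as a snapshot read (op=r), and a row committed after it should arrive as a streamed event (op=c). Rows were inserted and committed one at a time by a single writer. From that we infer that row 3037 was committed after SCN 5908027, since it is absent from the snapshot, and no later than row 3038's commit at SCN 5908031. By this rule it should have been streamed.

```java
// Hypothetical illustration of the expected snapshot/streaming hand-off
// (an assumption about the intended contract, not Debezium's implementation).
final class SnapshotHandoff {
    enum Source { SNAPSHOT, STREAMING }

    // A flashback query "AS OF SCN snapshotScn" sees commits with SCN <= snapshotScn;
    // everything committed later must be delivered by streaming.
    static Source expectedSource(long commitScn, long snapshotScn) {
        return commitScn <= snapshotScn ? Source.SNAPSHOT : Source.STREAMING;
    }

    public static void main(String[] args) {
        long snapshotScn = 5_908_027L;         // from "AS OF SCN 5908027" in the log
        long firstStreamedCommit = 5_908_031L; // commit_scn of the COL1=3038 event
        System.out.println(expectedSource(firstStreamedCommit, snapshotScn)); // prints STREAMING
    }
}
```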
However, if we make one small change to the workflow and wait 10 seconds after launching the Debezium engine before inserting the CDC rows, like this:
// insert 1000 initial rows
for i := 1; i <= 1000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
// launch debezium engine in java service
launchDbzEngine();
// wait for 10 seconds before inserting new rows
time.Sleep(10s);
// continue to insert more rows until it reaches 20000
for i := 1001; i <= 20000; i++ {
    Do("insert into source_table values ($i, $randomString); commit;")
}
then no record is missing, and we receive all 20000 records.
Do you see the same behaviour using the latest released Debezium version?
Yes. The newest stable version is 2.7.1.Final, which is exactly what we're using.
Do you have the connector logs, ideally from start till finish?
Yes, I have all the logs printed by the "io.deb.*" packages, pasted below:
2024-08-12 10:42:10,426 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Starting OracleConnectorTask with configuration:
2024-08-12 10:42:10,437 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) connector.class = io.debezium.connector.oracle.OracleConnector
2024-08-12 10:42:10,439 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) auto.create.topics.enable = false
2024-08-12 10:42:10,443 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
2024-08-12 10:42:10,444 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.security.protocol = SSL
2024-08-12 10:42:10,444 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.ssl.truststore.password = ********
2024-08-12 10:42:10,444 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) lob.enabled = true
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.storage.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.offset.storage
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) interval.handling.mode = string
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) errors.retry.delay.initial.ms = 300
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) ssl.truststore.password = ********
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.ssl.keystore.password = ********
2024-08-12 10:42:10,447 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) key.converter = org.apache.kafka.connect.json.JsonConverter
2024-08-12 10:42:10,448 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) ssl.endpoint.identification.algorithm =
2024-08-12 10:42:10,455 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.dbname = ORCLCDB
2024-08-12 10:42:10,455 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.user = c##dbzuser
2024-08-12 10:42:10,455 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
2024-08-12 10:42:10,455 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.ssl.endpoint.identification.algorithm =
2024-08-12 10:42:10,455 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.storage = org.apache.kafka.connect.storage.KafkaOffsetBackingStore
2024-08-12 10:42:10,456 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) internal.log.mining.transaction.snapshot.boundary.mode = all
2024-08-12 10:42:10,456 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.kafka.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
2024-08-12 10:42:10,456 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.ssl.endpoint.identification.algorithm =
2024-08-12 10:42:10,457 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
2024-08-12 10:42:10,457 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) ssl.keystore.password = ********
2024-08-12 10:42:10,457 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) errors.max.retries = -1
2024-08-12 10:42:10,458 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.password = ********
2024-08-12 10:42:10,458 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
2024-08-12 10:42:10,458 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) name = ORCLCDB/ORCLPDB1
2024-08-12 10:42:10,458 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
2024-08-12 10:42:10,458 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) max.batch.size = 2048
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) skipped.operations = none
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) snapshot.mode = initial
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.security.protocol = SSL
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) max.queue.size = 8192
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) tasks.max = 1
2024-08-12 10:42:10,459 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) retry.backoff.ms = 1000
2024-08-12 10:42:10,460 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) migration.feature.gates =
2024-08-12 10:42:10,460 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) tombstones.on.delete = false
2024-08-12 10:42:10,460 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) topic.prefix = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7
2024-08-12 10:42:10,461 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.storage.file.filename =
2024-08-12 10:42:10,461 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.kafka.topic = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history
2024-08-12 10:42:10,463 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
2024-08-12 10:42:10,463 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.storage.partitions = 1
2024-08-12 10:42:10,463 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) value.converter = org.apache.kafka.connect.json.JsonConverter
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.ssl.keystore.password = ********
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) log.mining.batch.size.max = 300000
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.pdb.name = ORCLPDB1
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) topics = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.bootstrap.servers = kafka:9093,kafka2:9095,kafka3:9097
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.consumer.ssl.truststore.password = ********
2024-08-12 10:42:10,464 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) ssl.keystore.location = /etc/kafka/secrets/client.keystore.p12
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) log.mining.view.fetch.size = 100000
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.flush.timeout.ms = 2000
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) errors.retry.delay.max.ms = 10000
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) snapshot.max.threads = 1
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.port = 1521
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.flush.interval.ms = 10000
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) security.protocol = SSL
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) driver.connection.adapter = logminer
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal = io.debezium.storage.kafka.history.KafkaSchemaHistory
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) database.hostname = oracle
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.name.adjustment.mode = avro
2024-08-12 10:42:10,465 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) log.mining.batch.size.min = 10000
2024-08-12 10:42:10,466 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) log.mining.batch.size.default = 1024
2024-08-12 10:42:10,466 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) offset.storage.replication.factor = 3
2024-08-12 10:42:10,466 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) schema.history.internal.producer.ssl.truststore.location = /etc/kafka/secrets/int.truststore.p12
2024-08-12 10:42:10,466 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) driver.out.server.name = dbzxout
2024-08-12 10:42:10,466 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) table.include.list = DEBEZIUM.ORA_MISSING_RECORD
2024-08-12 10:42:10,476 INFO [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom source info struct maker plugin: io.debezium.connector.oracle.OracleSourceInfoStructMaker
2024-08-12 10:42:10,494 INFO [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy
2024-08-12 10:42:10,663 INFO [io.deb.con.ora.OracleConnection] (pool-8-thread-1) Database Version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
2024-08-12 10:42:10,705 INFO [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) KafkaSchemaHistory Consumer config: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory, ssl.keystore.location=/etc/kafka/secrets/client.keystore.p12, bootstrap.servers=kafka:9093,kafka2:9095,kafka3:9097, security.protocol=SSL, enable.auto.commit=false, ssl.truststore.location=/etc/kafka/secrets/int.truststore.p12, ssl.keystore.password=********, fetch.min.bytes=1, ssl.truststore.password=********, session.timeout.ms=10000, auto.offset.reset=earliest, ssl.endpoint.identification.algorithm=, client.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory}
2024-08-12 10:42:10,707 INFO [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) KafkaSchemaHistory Producer config: {batch.size=32768, acks=1, ssl.keystore.location=/etc/kafka/secrets/client.keystore.p12, bootstrap.servers=kafka:9093,kafka2:9095,kafka3:9097, buffer.memory=1048576, key.serializer=org.apache.kafka.common.serialization.StringSerializer, security.protocol=SSL, retries=1, ssl.truststore.location=/etc/kafka/secrets/int.truststore.p12, value.serializer=org.apache.kafka.common.serialization.StringSerializer, ssl.keystore.password=********, max.block.ms=10000, ssl.truststore.password=********, ssl.endpoint.identification.algorithm=, client.id=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-schemahistory, linger.ms=0}
2024-08-12 10:42:10,708 INFO [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = db-history-config-check
2024-08-12 10:42:10,804 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) No previous offsets found
2024-08-12 10:42:10,898 INFO [io.deb.con.ora.OracleConnection] (pool-8-thread-1) Database Version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
2024-08-12 10:42:10,951 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Connector started for the first time.
2024-08-12 10:42:11,416 INFO [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-8-thread-1) Database schema history topic '(name=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.reader.history, numPartitions=1, replicationFactor=default, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created
2024-08-12 10:42:11,445 INFO [io.deb.con.ora.OracleConnectorTask] (pool-8-thread-1) No previous offset found
2024-08-12 10:42:11,455 INFO [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = SignalProcessor
2024-08-12 10:42:11,460 INFO [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = change-event-source-coordinator
2024-08-12 10:42:11,460 INFO [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component OracleConnector, id = org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7 named = blocking-snapshot
2024-08-12 10:42:11,461 INFO [io.deb.uti.Threads] (pool-8-thread-1) Creating thread debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator
2024-08-12 10:42:11,478 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Metrics registered
2024-08-12 10:42:11,486 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Context created
2024-08-12 10:42:11,487 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) According to the connector configuration both schema and data will be snapshot.
2024-08-12 10:42:11,520 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 1 - Preparing
2024-08-12 10:42:11,521 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 2 - Determining captured tables
2024-08-12 10:42:12,234 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.EMPLOYEE to the list of capture schema tables
2024-08-12 10:42:12,236 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.MLYTEST2 to the list of capture schema tables
2024-08-12 10:42:12,238 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.C##DBZUSER.MLYTEST to the list of capture schema tables
2024-08-12 10:42:12,238 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Adding table ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD to the list of capture schema tables
2024-08-12 10:42:12,246 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Created connection pool with 1 threads
2024-08-12 10:42:12,247 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 3 - Locking captured tables [ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD]
2024-08-12 10:42:12,251 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 4 - Determining snapshot offset
2024-08-12 10:42:12,507 INFO [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Consulting V$TRANSACTION and transaction logs for resolving snapshot offset.
2024-08-12 10:42:14,396 INFO [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Querying transaction logs, please wait...
2024-08-12 10:42:17,458 INFO [io.deb.con.ora.log.LogMinerAdapter] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Found no in-progress transactions.
2024-08-12 10:42:17,471 INFO [io.deb.jdb.JdbcConnection] (pool-20-thread-1) Connection gracefully closed
2024-08-12 10:42:17,473 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 5 - Reading structure of captured tables
2024-08-12 10:42:17,474 INFO [io.deb.con.ora.OracleSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) All eligible tables schema should be captured, capturing: [ORCLPDB1.C##DBZUSER.MLYTEST, ORCLPDB1.DEBEZIUM.EMPLOYEE, ORCLPDB1.DEBEZIUM.MLYTEST2, ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD]
2024-08-12 10:42:17,563 INFO [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Registering 'ORCLPDB1.C##DBZUSER.LOG_MINING_FLUSH' attributes: object_id=74252, data_object_id=74252
2024-08-12 10:42:17,566 INFO [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Registering 'ORCLPDB1.C##DBZUSER.MLYTEST' attributes: object_id=74266, data_object_id=74266
2024-08-12 10:42:17,681 INFO [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Registering 'ORCLPDB1.DEBEZIUM.EMPLOYEE' attributes: object_id=74318, data_object_id=74318
2024-08-12 10:42:17,682 INFO [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Registering 'ORCLPDB1.DEBEZIUM.MLYTEST2' attributes: object_id=74276, data_object_id=74276
2024-08-12 10:42:17,683 INFO [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Registering 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' attributes: object_id=74437, data_object_id=74437
2024-08-12 10:42:17,704 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 6 - Persisting schema history
2024-08-12 10:42:17,704 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.C##DBZUSER.MLYTEST
2024-08-12 10:42:18,495 INFO [io.deb.rel.his.SchemaHistoryMetrics] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Already applied 1 database changes
2024-08-12 10:42:18,517 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.EMPLOYEE
2024-08-12 10:42:18,659 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.MLYTEST2
2024-08-12 10:42:18,738 INFO [io.deb.rel.his.SchemaHistoryMetrics] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Already applied 3 database changes
2024-08-12 10:42:18,739 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Capturing structure of table ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD
2024-08-12 10:42:18,961 WARN [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Value' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Value'
2024-08-12 10:42:18,962 WARN [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Key'
2024-08-12 10:42:18,968 WARN [io.deb.sch.SchemaNameAdjuster] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) The Kafka Connect schema name 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope' is not a valid Avro schema name, so replacing with 'org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e_5897_11ef_9c11_966d242088e7.DEBEZIUM.ORA_MISSING_RECORD.Envelope'
2024-08-12 10:42:18,981 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot step 7 - Snapshotting data
2024-08-12 10:42:18,982 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Creating snapshot worker pool with 1 worker thread(s)
2024-08-12 10:42:18,984 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) For table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' using select statement: 'SELECT "COL1", "COL2" FROM "DEBEZIUM"."ORA_MISSING_RECORD" AS OF SCN 5908027'
2024-08-12 10:42:18,989 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-21-thread-1) Exporting data from table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' (1 of 1 tables)
2024-08-12 10:42:18,996 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 4 records sent during previous 00:00:08.68, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {snapshot_scn=5908027, snapshot=true, scn=5908027, snapshot_completed=false}
2024-08-12 10:42:19,273 INFO [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-21-thread-1) Finished exporting 3036 records for table 'ORCLPDB1.DEBEZIUM.ORA_MISSING_RECORD' (1 of 1 tables); total duration '00:00:00.284'
2024-08-12 10:42:19,281 INFO [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot - Final stage
2024-08-12 10:42:19,288 INFO [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot completed
2024-08-12 10:42:19,290 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=OracleOffsetContext [scn=5908027, commit_scn=[], lcr_position=null]]
2024-08-12 10:42:19,298 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Connected metrics set to 'true'
2024-08-12 10:42:19,327 INFO [io.deb.pip.sig.SignalProcessor] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) SignalProcessor started. Scheduling it every 5000ms
2024-08-12 10:42:19,337 INFO [io.deb.uti.Threads] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Creating thread debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-SignalProcessor
2024-08-12 10:42:19,355 INFO [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Starting streaming
2024-08-12 10:42:19,547 INFO [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Redo Log Group Sizes:
2024-08-12 10:42:19,555 INFO [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Group #4: 8589934592 bytes
2024-08-12 10:42:19,556 INFO [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] 
(debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Group #5: 8589934592 bytes 2024-08-12 10:42:19,556 INFO [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Group #6: 8589934592 bytes 2024-08-12 10:42:19,556 INFO [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7-change-event-source-coordinator) Group #7: 8589934592 bytes 2024-08-12 10:42:37,818 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 3186 records sent during previous 00:00:18.823, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {commit_scn=5909049:1:0900130065070000, snapshot_scn=5908027, scn=5909048} 2024-08-12 10:42:45,758 INFO [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) 2200 records sent during previous 00:00:07.94, last recorded offset of {server=org_vctpdemo2oE4SUGc_prj_vctpdemo2oE4SUGc.8485740e-5897-11ef-9c11-966d242088e7} partition is {commit_scn=5920071:1:07000100530e0000, snapshot_scn=5908027, scn=5920070}
How to reproduce the issue using our tutorial deployment?
We first observed this bug in our own system, which uses the Debezium engine. I then tried the tutorial deployment, where the issue is also very easy to reproduce. I wrote two helper scripts for the reproduction; please follow these steps:
1. Make sure a local Oracle Docker container is running (Oracle 19 in our case).
2. In the tutorial directory, run the following commands to set up the environment:
```bash
# use the newest version (older versions run into the same bug as well)
export DEBEZIUM_VERSION=3.0
docker-compose -f docker-compose-oracle.yaml up --build
```
3. In the tutorial directory, add the following two scripts. The first one, insert.sh, creates the table and then keeps inserting 10000 rows, committing after each row:
```bash
#!/bin/bash
# remember to replace with your own settings
username=debezium
password=transporter
port=1521
hostname=localhost
dbname=ORCLPDB1

# first create the customers table
sql_file="/tmp/create.sql"
cat > "$sql_file" <<'EOF'
drop table customers;
CREATE TABLE customers (
  id NUMBER(6) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL
);
GRANT SELECT ON customers to c##dbzuser;
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
exit;
EOF

# create the table
sqlplus "$username/$password@$hostname:$port/$dbname" @"$sql_file"

# insert 10000 rows, with id from 1 to 10000, one commit per row
get_batch_insert () {
  local batch_sql=""
  for i in {1..10000}
  do
    first_name="firstname$i"
    last_name="lastname$i"
    batch_sql+="insert into customers values ($i, '$first_name', '$last_name', '$first_name@example.com');\ncommit;\n"
  done
  batch_sql+="exit;\n"
  printf '%s' "$batch_sql"
}

batch_sql=$(get_batch_insert)
# %b turns the \n escapes into real newlines before piping into sqlplus
printf '%b' "$batch_sql" | sqlplus "$username/$password@$hostname:$port/$dbname"
```
The second one, launch.sh, orchestrates the whole test workflow: it first starts insert.sh in the background to keep inserting data, then registers the Oracle connector, and finally starts a Kafka console consumer that exports the Kafka messages to a text file for analysis:
```bash
#!/bin/bash
# set TRACE log level for the Oracle connector
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/io.debezium.connector.oracle -d '{"level": "TRACE"}'

# create the table and insert data in the background
# (run with bash: insert.sh uses bash-only syntax such as {1..10000} and +=)
bash ./insert.sh &

# wait for some snapshot data to be inserted; adjust this according to your Oracle DB's processing speed
sleep 17

# launch the Oracle connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle-logminer.json
sleep 5

# consume the customers topic, exporting the records into /tmp/records.txt for analysis
docker-compose -f docker-compose-oracle.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic server1.DEBEZIUM.CUSTOMERS > /tmp/records.txt
```
Remember to adjust the values in register-oracle-logminer.json to match your environment. This is the configuration I'm using:
```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "topic.prefix": "server1",
    "database.hostname": "host.docker.internal",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "transporter",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "internal.log.mining.transaction.snapshot.boundary.mode": "all",
    "table.include.list": "DEBEZIUM.CUSTOMERS"
  }
}
```
4. Run the test case with `sh launch.sh`.
5. Wait for insert.sh to finish, then open `/tmp/records.txt`. You will find only 9999 lines, i.e. one record is missing. Search for the keyword "snapshot":"last": exactly one match shows up. That line is the last snapshot message, and the next line is the first CDC message; comparing the IDs of these two records shows that the first CDC message was lost. Below are these two records from my test run, with ID=3565 and ID=3667; searching the whole file, ID 3666 is missing.
```
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Key"},"payload":{"ID":3565}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int64","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"redo_sql"},{"type":"string","optional":true,"field":"row_id"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Envelope","version":2},"payload":{"before":null,"after":{"ID":3565,"FIRST_NAME":"firstname3565","LAST_NAME":"lastname3565","EMAIL":"firstname3565@example.com"},"source":{"version":"3.0.8.Final","connector":"oracle","name":"server1","ts_ms":1742270703000,"snapshot":"last","db":"ORCLPDB1","sequence":null,"ts_us":1742270703000000,"ts_ns":1742270703000000000,"schema":"DEBEZIUM","table":"CUSTOMERS","txId":null,"scn":"20732831","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null,"user_name":null,"redo_sql":null,"row_id":null},"transaction":null,"op":"r","ts_ms":1742270710874,"ts_us":1742270710874853,"ts_ns":1742270710874853721}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Key"},"payload":{"ID":3667}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"FIRST_NAME"},{"type":"string","optional":false,"field":"LAST_NAME"},{"type":"string","optional":false,"field":"EMAIL"}],"optional":true,"name":"server1.DEBEZIUM.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int64","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"redo_sql"},{"type":"string","optional":true,"field":"row_id"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"server1.DEBEZIUM.CUSTOMERS.Envelope","version":2},"payload":{"before":null,"after":{"ID":3667,"FIRST_NAME":"firstname3667","LAST_NAME":"lastname3667","EMAIL":"firstname3667@example.com"},"source":{"version":"3.0.8.Final","connector":"oracle","name":"server1","ts_ms":1742270704000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"ts_us":1742270704000000,"ts_ns":1742270704000000000,"schema":"DEBEZIUM","table":"CUSTOMERS","txId":"07001600cd220000","scn":"20732834","commit_scn":"20732835","lcr_position":null,"rs_id":"0x000025.000a56d1.0010","ssn":0,"redo_thread":1,"user_name":"DEBEZIUM","redo_sql":null,"row_id":"AAAXYcAAYAAAG65ABl"},"transaction":null,"op":"c","ts_ms":1742270711705,"ts_us":1742270711705119,"ts_ns":1742270711705119345}}
```
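The check in step 5 can also be scripted. The following is a minimal bash sketch that is not part of the original reproduction. It assumes each line of `/tmp/records.txt` holds one record as `<key JSON> <value JSON>` (as written by kafka-console-consumer.sh with `print.key=true`), that record keys look like `"payload":{"ID":3565}` as in the sample output, and that insert.sh used IDs 1 to 10000. The function names `find_missing_ids` and `snapshot_boundary` are hypothetical helpers.

```bash
#!/bin/bash
# Sketch only: helpers for analysing the consumer output written by launch.sh.
# Assumption: each line is "<key JSON> <value JSON>" and the record key
# contains "payload":{"ID":<n>} (single-column primary key ID).

# Print every ID in 1..expected that never appears in a record key.
# Usage: find_missing_ids FILE EXPECTED_COUNT   (hypothetical helper)
find_missing_ids() {
  local file="$1" expected="$2"
  grep -o '"payload":{"ID":[0-9]*}' "$file" \
    | grep -o '[0-9][0-9]*' \
    | awk -v n="$expected" '
        { seen[$1] = 1 }                  # remember each ID found in a key
        END {
          for (i = 1; i <= n; i++)        # report the gaps in 1..n
            if (!(i in seen)) print i
        }'
}

# Print the key ID of the first "snapshot":"last" record and of the record
# on the following line, i.e. the first streamed (CDC) record.
# Usage: snapshot_boundary FILE   (hypothetical helper)
snapshot_boundary() {
  local file="$1" n
  n=$(grep -n '"snapshot":"last"' "$file" | head -n 1 | cut -d: -f1)
  [ -n "$n" ] || return 1                 # no snapshot marker found
  sed -n "${n}p;$((n + 1))p" "$file" \
    | grep -o '"payload":{"ID":[0-9]*}' \
    | grep -o '[0-9][0-9]*'
}
```

For this reproduction, `find_missing_ids /tmp/records.txt 10000` lists the IDs that never reached the topic, and `snapshot_boundary /tmp/records.txt` prints the two IDs on either side of the snapshot-to-streaming transition. Matching on the key rather than the value keeps the pattern from also hitting the `"after":{"ID":...}` part of each change event.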
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
We ran into this bug in a fairly common scenario: when running a data migration with the Debezium engine on tables with high RPS, this error is likely to occur and leaves the migrated data incomplete.
Implementation ideas (optional)
I'm not sure what causes this bug, but it seems related to a flaw in some locking mechanism. Could it be solved by applying a stricter lock during the snapshot stage?
Relates to: DBZ-8060 Dropping in process batch transactions when shutting down (Closed)