Details
- Type: Bug
- Resolution: Not a Bug
- Priority: Major
- Affects Version: 1.9.5.Final
Description
Bug report
If we insert a few actions that start an incremental snapshot into the signal table, with a lag of 1-2 seconds between them:
INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-2', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
INSERT INTO db.debezium_signal (id, type, data) VALUES('ad-hoc-3', 'execute-snapshot', '{"data-collections": ["DBINSTANCE.DB.ADDRESS"],"type":"incremental"}');
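For illustration only (not part of the report or the connector API), the three statements above can be generated programmatically; the table name and JSON payload follow the statements above, and the unique `id` per signal is the point being demonstrated:

```python
import json

# Signal table from the connector configuration in this report.
SIGNAL_TABLE = "db.debezium_signal"

def build_signal(signal_id: str, collections: list) -> str:
    """Build one execute-snapshot INSERT statement for the signal table."""
    data = json.dumps({"data-collections": collections, "type": "incremental"})
    return (
        f"INSERT INTO {SIGNAL_TABLE} (id, type, data) "
        f"VALUES('{signal_id}', 'execute-snapshot', '{data}')"
    )

# Three signals with distinct ids, as in the scenario above;
# in the real test they were issued 1-2 seconds apart.
statements = [build_signal(f"ad-hoc-{i}", ["DBINSTANCE.DB.ADDRESS"]) for i in (1, 2, 3)]
for stmt in statements:
    print(stmt)
```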
In the logs we see the following:
2022-12-20 07:49:39,174 INFO Oracle|db|streaming Requested 'INCREMENTAL' snapshot of data collections '[DBINSTANCE.DB.ADDRESS, DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS]' [io.debezium.pipeline.signal.ExecuteSnapshot] incremental_snapshot_collections=DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS.
It looks like Debezium recognized the several separate actions in the signal table as a single action.
What Debezium connector do you use and what version?
Oracle connector, 1.9.5.Final
What is the connector configuration?
{
  "name": "oracle-customers-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "${database.server.name}",
    "database.url": "${database.url}",
    "database.history.kafka.bootstrap.servers": "${bootstrap.servers}",
    "database.history.kafka.topic": "${history.topic}",
    "database.dbname": "${database.name}",
    "database.user": "${database.user}",
    "database.password": "${database.password}",
    "table.include.list": "DB.DEVICE,DB.CUSTOMER,DB.ADDRESS,DB.ECHANNEL,DB.PROFILE_PERSON,DB.KOM,DB.NMON,DB.INCOME,DB.LINKS,DB.DEBEZIUM_SIGNAL",
    "signal.data.collection": "DBINSTANCE.DB.DEBEZIUM_SIGNAL",
    "incremental.snapshot.chunk.size": "49152",
    "snapshot.mode": "initial",
    "snapshot.select.statement.overrides.DB.LINKS": "SELECT a.* from DB.LINKS a where a.is_active = 1",
    "snapshot.select.statement.overrides.DB.ADDRESS": "select a.* from DB.ADDRESS a where a.is_active = 1",
    "snapshot.select.statement.overrides.DB.ECHANNEL": "select a.* from DB.ECHANNEL a where a.is_active = 1",
    "snapshot.select.statement.overrides.DB.PROFILE_PERSON": "select a.* from DB.PROFILE_PERSON a",
    "snapshot.select.statement.overrides.DB.KOM": "select a.* from DB.KOM a",
    "snapshot.select.statement.overrides.DB.NMON": "select a.* from DB.NMON a",
    "snapshot.select.statement.overrides.DB.INCOME": "select a.* from DB.INCOME a where a.is_active = 1",
    "snapshot.select.statement.overrides.DB.CUSTOMER": "select a.* from DB.CUSTOMER a",
    "snapshot.select.statement.overrides.DB.DEVICE": "select a.* from DB.DEVICE a where a.is_active = 1",
    "decimal.handling.mode": "double",
    "security.protocol": "SSL",
    "ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
    "ssl.truststore.password": "${truststore.password}",
    "ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
    "ssl.keystore.password": "${keystore.password}",
    "ssl.key.password": "${ssl.key.password}",
    "database.history.producer.security.protocol": "SSL",
    "database.history.producer.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
    "database.history.producer.ssl.truststore.password": "${truststore.password}",
    "database.history.producer.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
    "database.history.producer.ssl.keystore.password": "${ssl.keystore.password}",
    "database.history.producer.ssl.key.password": "${ssl.key.password}",
    "database.history.consumer.security.protocol": "SSL",
    "database.history.consumer.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
    "database.history.consumer.ssl.truststore.password": "${ssl.truststore.password}",
    "database.history.consumer.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
    "database.history.consumer.ssl.keystore.password": "${ssl.keystore.password}",
    "database.history.consumer.ssl.key.password": "${ssl.key.password}",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "${schema.registry.url}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "${schema.registry.url}",
    "basic.auth.credentials.source": "USER_INFO",
    "basic.auth.user.info": "${schema.registry.user}:${schema.registry.password}",
    "key.converter.schema.registry.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
    "key.converter.schema.registry.ssl.truststore.password": "${ssl.truststore.password}",
    "key.converter.schema.registry.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
    "key.converter.schema.registry.ssl.keystore.password": "${ssl.keystore.password}",
    "key.converter.schema.registry.ssl.key.password": "${ssl.key.password}",
    "value.converter.schema.registry.ssl.truststore.location": "/****{}{}/{}{}****/kafka_truststore.jks",
    "value.converter.schema.registry.ssl.truststore.password": "${ssl.truststore.password}",
    "value.converter.schema.registry.ssl.keystore.location": "/****{}{}/{}{}****/client_keystore.jks",
    "value.converter.schema.registry.ssl.keystore.password": "${ssl.keystore.password}",
    "value.converter.schema.registry.ssl.key.password": "${ssl.key.password}"
  }
}
What is the captured database version and mode of deployment?
Oracle 19c, AWS EC2
What behaviour do you expect?
Each request inserted into the signal table to create an incremental snapshot should be treated separately and launched as its own snapshot.
What behaviour do you see?
Several triggers for incremental snapshots are recognized as one when the signal table receives a sequence of events with a small time lag between them.
Do you see the same behaviour using the latest released Debezium version?
Not tested; we only use version 1.9.5.Final.
Do you have the connector logs, ideally from start till finish?
2022-12-20 07:49:39,174 INFO Oracle|db|streaming Requested 'INCREMENTAL' snapshot of data collections '[DBINSTANCE.DB.ADDRESS, DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS]' [io.debezium.pipeline.signal.ExecuteSnapshot] incremental_snapshot_collections=DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS,DBINSTANCE.DB.ADDRESS.
How to reproduce the issue using our tutorial deployment?
Insert a few events into the signal table with a small time lag between them.
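The reproduction step can be sketched as a small script. This is an illustration only: sqlite3 stands in for the real Oracle signal table (`DBINSTANCE.DB.DEBEZIUM_SIGNAL` in the configuration above), and the delay between inserts is shortened from the 1-2 seconds used in the actual test:

```python
import json
import sqlite3
import time

# In-memory stand-in for the Oracle signal table, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE debezium_signal (id TEXT, type TEXT, data TEXT)")

payload = json.dumps(
    {"data-collections": ["DBINSTANCE.DB.ADDRESS"], "type": "incremental"}
)

# Insert three execute-snapshot signals, committing each one separately
# with a short pause in between (1-2 s in the original scenario).
for i in (1, 2, 3):
    conn.execute(
        "INSERT INTO debezium_signal (id, type, data) VALUES (?, ?, ?)",
        (f"ad-hoc-{i}", "execute-snapshot", payload),
    )
    conn.commit()
    time.sleep(0.1)

# Three distinct signal rows exist; the report is that Debezium nevertheless
# handles them as a single snapshot request.
rows = conn.execute("SELECT id FROM debezium_signal ORDER BY id").fetchall()
print(rows)
```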