- Bug
- Resolution: Done
- Critical
- 2.5.3.Final
- None
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
It seems Debezium sends duplicated data after restarting. We store the offsets.dat file on object storage and make sure Debezium reads it when restarting. Here are the details:
- Start Debezium and synchronize the data of database test_big_sync using an incremental snapshot. Database test_big_sync has 1000 tables, each with 20000 rows.
- Wait until all table structures (DDL) have been sent.
- Wait until the snapshot data of 5 tables has been received.
- Stop Debezium.
- Restart Debezium.
- The snapshot data of those 5 tables is sent again.
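The failure mode above can be modelled in a few lines of plain Java. This is a hypothetical, stripped-down sketch with invented names (StaleOffsetDemo, storedOffset) and no Debezium dependencies; it only illustrates how a cached serialized string, once populated, keeps completed tables in the stored offset so a restart re-snapshots them.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the incremental snapshot context; names only loosely
// mirror Debezium's AbstractIncrementalSnapshotContext.
class StaleOffsetDemo {
    private final Deque<String> tablesToSnapshot = new ArrayDeque<>();
    private String cachedJson; // value written into the stored offset

    void add(List<String> tables) {
        tablesToSnapshot.addAll(tables);
        cachedJson = jsonString();
    }

    String getNext() {
        String next = tablesToSnapshot.poll();
        cachedJson = jsonString(); // intended to shrink the stored offset...
        return next;
    }

    // Mirrors the caching behaviour: once cachedJson is non-empty it is returned
    // as-is, so finished tables are never removed from the stored offset.
    private String jsonString() {
        if (cachedJson != null && !cachedJson.isEmpty()) {
            return cachedJson;
        }
        return String.join(",", tablesToSnapshot);
    }

    String storedOffset() {
        return cachedJson;
    }
}
```

After `add(List.of("t1", "t2", "t3"))` and one `getNext()`, `storedOffset()` still contains all three tables, so a restart would re-snapshot t1.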
By reading the code in AbstractIncrementalSnapshotContext, we found that dataCollectionsToSnapshotJson in SnapshotDataCollection is never updated after the first computation. Because the cache check in jsonString() always hits once the field is non-empty, dataCollectionsToSnapshotJson never reflects tables removed from dataCollectionsToSnapshot.
void add(List<DataCollection<T>> dataCollectionIds) {
    this.dataCollectionsToSnapshot.addAll(dataCollectionIds);
    this.dataCollectionsToSnapshotJson = jsonString();
}

DataCollection<T> getNext() {
    DataCollection<T> nextDataCollection = this.dataCollectionsToSnapshot.poll();
    this.dataCollectionsToSnapshotJson = jsonString();
    return nextDataCollection;
}

private String jsonString() {
    // TODO Handle non-standard table ids containing dots, commas etc.
    if (!Strings.isNullOrEmpty(dataCollectionsToSnapshotJson)) {
        // A cached value to improve performance since this method is called in the "store"
        // that is called during events processing
        return dataCollectionsToSnapshotJson;
    }
    try {
        List<LinkedHashMap<String, String>> dataCollectionsMap = dataCollectionsToSnapshot.stream()
                .map(x -> {
                    LinkedHashMap<String, String> map = new LinkedHashMap<>();
                    map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString());
                    map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION, x.getAdditionalCondition().orElse(null));
                    map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, x.getSurrogateKey().orElse(null));
                    return map;
                })
                .collect(Collectors.toList());
        return mapper.writeValueAsString(dataCollectionsMap);
    }
    catch (JsonProcessingException e) {
        throw new DebeziumException("Cannot serialize dataCollectionsToSnapshot information");
    }
}
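One possible fix, shown here as a hypothetical sketch and not necessarily the patch applied upstream, is to invalidate the cached JSON on every mutation so it is recomputed lazily on the next read. The class and method names below are invented for illustration, and plain string joining stands in for the Jackson serialization used in the real code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical fix sketch: clear the cache on mutation instead of refreshing
// it through a method that returns the stale cached value.
class FixedSnapshotContext {
    private final Deque<String> tablesToSnapshot = new ArrayDeque<>();
    private String cachedJson;

    void add(List<String> tables) {
        tablesToSnapshot.addAll(tables);
        cachedJson = null; // invalidate; recomputed on next jsonString() call
    }

    String getNext() {
        String next = tablesToSnapshot.poll();
        cachedJson = null; // the next offset store sees the shrunken list
        return next;
    }

    String jsonString() {
        if (cachedJson == null) {
            // Stand-in for the Jackson-based serialization in the real code.
            cachedJson = String.join(",", tablesToSnapshot);
        }
        return cachedJson;
    }
}
```

With this change the serialized offset shrinks as tables complete, so a restart would not re-send already-snapshotted tables.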
What Debezium connector do you use and what version?
v2.5.3.Final
What is the connector configuration?
<Your answer>
What is the captured database version and mode of deployment?
GaiaDB - Baidu Cloud
What behaviour do you expect?
No duplicated data is sent.
What behaviour do you see?
Duplicated data for already-snapshotted tables is sent.
Do you see the same behaviour using the latest released Debezium version?
Yes. The code on the main branch is still the same as in v2.5.3.Final.
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
<Your answer>
How to reproduce the issue using our tutorial deployment?
<Your answer>
Feature request or enhancement
Do not resend the snapshot data of already-completed tables after a restart.
Which use case/requirement will be addressed by the proposed feature?
<Your answer>
Implementation ideas (optional)
<Your answer>
- links to
- RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release