-
Bug
-
Resolution: Done
-
Blocker
-
1.7.2.Final, 1.8.1.Final, 1.9.0.Final
-
None
-
False
-
None
-
False
-
Bug report
step 1: Open two oracle session.
step 2: Insert a data in the first session without commit.
SQL> INSERT INTO CHEN.TEST_LOOP values(1,'1','xxxXXXXXX','cccCCC',sysdate,systimestamp,66.66);
1 row created.
step 3: Insert a data which has same primary key in the second session without commit.
SQL> INSERT INTO CHEN.TEST_LOOP values(1,'2','xxxXXXXXX','cccCCC',sysdate,systimestamp,22.22);
step 4: Commit in the first session.
SQL>Commit complete.
step 5: Commit in the second session.
SQL>ERROR at line 1:
ORA-00001: unique constraint (CHEN.TEST_LOOP_PK) violated
Then oracle-connector-logminer will got two insert sql and one sql with rollback mark because ORA-00001: unique constraint (CHEN.TEST_LOOP_PK) violated
insert into "CHEN"."TEST_LOOP"("NUMTEST","VARTWOTEST","VARTEST","CHARTEST","DATETEST","TIMETEST","DOUBLETEST") values ('1','1','xxxXXXXXX','cccCCC ',TO_DATE('2022-05-05 22:53:24', 'YYYY-MM-DD HH24:MI:SS'),TO_TIMESTAMP('2022-05-05 22:53:24.220999'),'66.66'); ROWID=AAAR33AAHAAAAsSAAL,tx_id=0a000d0045190000 insert into "CHEN"."TEST_LOOP"("NUMTEST","VARTWOTEST","VARTEST","CHARTEST","DATETEST","TIMETEST","DOUBLETEST") values ('1','2','xxxXXXXXX','cccCCC ',TO_DATE('2022-05-05 22:53:29', 'YYYY-MM-DD HH24:MI:SS'),TO_TIMESTAMP('2022-05-05 22:53:29.557254'),'22.22'); ROWID=AAAR33AAHAAAAs8AAB,tx_id=020014003d190000 delete from "CHEN"."TEST_LOOP" where ROWID = 'AAAR33AAHAAAAs8AAB'; ,txid is:02001400ffffffff
I can't find any transactions like '02001400ffffffff'. Therefore, the last insert SQL cannot be deleted from the queue, the useless insert will push to kafka topic.
If my sink database use upsert mode ,it will result in a different between source and sink.
What Debezium connector do you use and what version?
1.7.2.Final 1.8.1.Final 1.9.0.Final
What is the connector configuration?
oracle-connector
What behaviour do you expect?
Delete the insert SQL that has been put into the transaction queue
Implementation ideas (optional)
protected void removeEventWithRowId(LogMinerEventRow row) { MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId()); if (transaction == null) { String txid = row.getTransactionId(); if (txid.toLowerCase().indexOf("ffffff") > 0) { String frontWords = txid.substring(0, txid.toLowerCase().indexOf("ffffff")); Optional<String> newTxid = Optional.empty(); Iterator<Map.Entry<String, MemoryTransaction>> iterator = transactionCache.iterator(); while (iterator.hasNext()) { Map.Entry<String, MemoryTransaction> entry = iterator.next(); if (entry.getKey().startsWith(frontWords)) { newTxid = Optional.of(entry.getKey()); break; } } if (newTxid.isPresent()) { transaction = getTransactionCache().get(newTxid.get()); } } if (transaction == null) { LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); return; } } transaction.removeEventWithRowId(row.getRowId()); }