-
Bug
-
Resolution: Done
-
Major
-
3.0.1.Final
The test failures look like this (example from IncrementalSnapshotIT#snapshotWithAdditionalConditionWithSurrogateKey on DB2):
2024-11-05T16:58:45.0193933Z ---Assert Expected No Records, Found These---
2024-11-05T16:58:45.0199152Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2d6c0}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2011}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2011,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2d6c0,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503791,ts_ns=1730825924503791279}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0208556Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2d769}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2012}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2012,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2d769,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503904,ts_ns=1730825924503904000}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0215248Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2d812}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2013}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2013,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2d812,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503918,ts_ns=1730825924503918928}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0221893Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2d8bb}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2014}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2014,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2d8bb,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503927,ts_ns=1730825924503927674}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0228313Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2d964}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2015}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2015,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2d964,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503935,ts_ns=1730825924503935549}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0234801Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2da0d}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2016}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2016,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2da0d,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503943,ts_ns=1730825924503943083}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0241187Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2dab6}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2017}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2017,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2dab6,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503950,ts_ns=1730825924503950377}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0247979Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2db5f}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2018}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2018,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2db5f,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503957,ts_ns=1730825924503957661}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0254568Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2dc08}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2019}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2019,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2dc08,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503964,ts_ns=1730825924503964984}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2024-11-05T16:58:45.0260977Z SourceRecord{sourcePartition={server=testdb}, sourceOffset={event_serial_no=1, commit_lsn=00000000:00001f97:00000000000555d7, change_lsn=00000000:00000000:0000000004f2dcb1}} ConnectRecord{topic='testdb.DB2INST1.A', kafkaPartition=null, key=Struct{PK=2020}, keySchema=Schema{testdb.DB2INST1.A.Key:STRUCT}, value=Struct{after=Struct{PK=2020,AA=12345678},source=Struct{version=3.0.0-SNAPSHOT,connector=db2,name=testdb,ts_ms=1730825924503,db=testdb,ts_us=1730825924503433,ts_ns=1730825924503433000,schema=DB2INST1,table=A,change_lsn=00000000:00000000:0000000004f2dcb1,commit_lsn=00000000:00001f97:00000000000555d7},op=c,ts_ms=1730825924503,ts_us=1730825924503972,ts_ns=1730825924503972177}, valueSchema=Schema{testdb.DB2INST1.A.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
The root cause is this part of the test:
    int expectedCount = 1000, expectedValue = 12345678;
    populateTable();
    populateTableWithSpecificValue(2000, expectedCount, expectedValue);
    waitForCdcTransactionPropagation(3);
    final Configuration config = config().build();
    startAndConsumeTillEnd(connectorClass(), config);
    waitForConnectorToStart();
    waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
    // there shouldn't be any snapshot records
    assertNoRecordsToConsume();
Here we insert ~1000 records at the beginning of the test, start the connector, and assume that no records are available. However, on slower DBs the inserts (especially the second transaction, which inserts the specific values) may not have been processed yet when the connector starts. The connector then streams them as change events, which results in this failure.
The following is also questionable:
    waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
    // there shouldn't be any snapshot records
    assertNoRecordsToConsume();
Here we wait for the first record and immediately afterwards assert that there are no records. This works on faster DBs only because waitForAvailableRecords() times out without throwing an exception.
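If the intent is "no record shows up within the wait time", it could be stated directly instead of relying on a silent timeout. The following is only a sketch under assumptions: QuietPeriodCheck, firstRecordWithin and assertNoRecordsWithin are hypothetical names, not part of the Debezium test suite, and a plain BlockingQueue stands in for the consumed records. It makes the intent explicit but does not by itself remove the race with a slow database.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the Debezium test suite. */
final class QuietPeriodCheck {

    /**
     * Returns the first record that arrives within the timeout,
     * or null if the queue stayed empty or the thread was interrupted.
     */
    static <T> T firstRecordWithin(BlockingQueue<T> records, long timeout, TimeUnit unit) {
        try {
            return records.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Fails if any record arrives during the quiet period. Note that the record is consumed. */
    static <T> void assertNoRecordsWithin(BlockingQueue<T> records, long timeout, TimeUnit unit) {
        T unexpected = firstRecordWithin(records, timeout, unit);
        if (unexpected != null) {
            throw new AssertionError("Expected no records, found: " + unexpected);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();

        // Fast database: nothing arrives, so the check passes after the timeout.
        assertNoRecordsWithin(records, 100, TimeUnit.MILLISECONDS);

        // Slow database: a late insert is streamed as a change event and the check fails.
        records.add("Struct{PK=2011}");
        try {
            assertNoRecordsWithin(records, 100, TimeUnit.MILLISECONDS);
        } catch (AssertionError expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```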
The only reliable solution seems to be the one used for SQL Server, which implements waitForCdcTransactionPropagation(3); there we can be sure that the transactions have already been processed.
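For connectors that lack such a hook, one possible direction is to poll until the inserted rows are visible to change capture before starting the connector. This is a minimal sketch, not the actual fix: PropagationWait and waitUntil are hypothetical names, and the condition (here a simulated captured-row counter) would have to be replaced by a real, database-specific check.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of the Debezium test suite. */
final class PropagationWait {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a slow database: the captured row count grows by 250 per poll.
        AtomicInteger capturedRows = new AtomicInteger();
        int expectedCount = 1000;

        boolean propagated = waitUntil(
                () -> capturedRows.addAndGet(250) >= expectedCount,
                Duration.ofSeconds(5),
                Duration.ofMillis(10));

        if (!propagated) {
            throw new IllegalStateException("Inserts were not captured in time; do not start the connector yet");
        }
        System.out.println("propagated=" + propagated);
    }
}
```

Returning a boolean rather than silently timing out lets the test fail early with a clear message when the database is too slow, instead of failing later on unexpected records.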