Type: Bug
Resolution: Unresolved
Priority: Major
What Debezium connector do you use and what version?
SQL Server, 2.7.3.Final
Do you see the same behaviour using the latest released Debezium version?
Yes
What behavior do you see?
When a table includes columns which are marked as HIDDEN, e.g.:

CREATE TABLE niagaradb.niagaratest.HiddenTest (
    DeptID INT NOT NULL PRIMARY KEY CLUSTERED,
    DeptName VARCHAR(50) NOT NULL,
    ManagerID INT NULL,
    ParentDeptID INT NULL,
    SysStartTime DATETIME2 GENERATED ALWAYS AS ROW START HIDDEN NOT NULL,
    SysEndTime DATETIME2 GENERATED ALWAYS AS ROW END HIDDEN NOT NULL,
    PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime)
) WITH (SYSTEM_VERSIONING = ON);
If you trigger an incremental snapshot on the table, you will encounter a schema mismatch error:
2024-11-15T09:56:24.779Z WARN 1 --- [rce-coordinator] i.d.pipeline.signal.SignalProcessor : Action snapshot-window-close failed. The signal SignalRecord{id='8f354677-7179-499b-a68c-39b95804e480-close', type='snapshot-window-close', data='{"openWindowTimestamp": "2024-11-15T09:56:19.815216984Z", "closeWindowTimestamp": "2024-11-15T09:56:19.843987916Z"}', additionalData={}} may not have been processed. org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
This occurs because (by default) an incremental snapshot uses SELECT * statements throughout the process, and SQL Server excludes HIDDEN columns from the result set of SELECT *, so the data for those columns is never returned by the chunk queries.
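The difference is easy to reproduce directly against the HiddenTest table above: SQL Server omits HIDDEN columns from SELECT * but returns them when they are named explicitly.

```sql
-- Returns only DeptID, DeptName, ManagerID, ParentDeptID:
-- the HIDDEN period columns are excluded from SELECT *
SELECT * FROM niagaradb.niagaratest.HiddenTest;

-- Naming the columns explicitly returns the HIDDEN columns as well
SELECT DeptID, DeptName, ManagerID, ParentDeptID, SysStartTime, SysEndTime
FROM niagaradb.niagaratest.HiddenTest;
```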
The hidden columns work as intended during an initial snapshot because we use a SELECT <list of columns> statement.
The hidden columns work as intended when streaming change events because they are included in the SQL Server CDC tables.
Implementation ideas (optional)
I believe we should rewrite AbstractChunkQueryBuilder#buildProjection to always build an explicit list of columns for the incremental snapshot SELECT statements, e.g.:
protected String buildProjection(Table table) {
    TableId tableId = table.id();
    return table.columns().stream()
            .filter(column -> !connectorConfig.isColumnsFiltered()
                    || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name()))
            .map(column -> jdbcConnection.quotedColumnIdString(column.name()))
            .collect(Collectors.joining(", "));
}
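A standalone sketch of the projection-building idea, with Debezium's Table, column filter, and JdbcConnection quoting replaced by hypothetical stand-ins (the real change would live in AbstractChunkQueryBuilder and use those types):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ProjectionSketch {

    // Hypothetical stand-in for jdbcConnection.quotedColumnIdString():
    // SQL Server identifier quoting with square brackets
    static String quote(String columnName) {
        return "[" + columnName + "]";
    }

    // Builds an explicit, quoted column list instead of relying on SELECT *
    static String buildProjection(List<String> columnNames) {
        return columnNames.stream()
                .map(ProjectionSketch::quote)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        String projection = buildProjection(
                List.of("DeptID", "DeptName", "SysStartTime", "SysEndTime"));
        // Because the HIDDEN columns are named explicitly,
        // SQL Server will include them in the result set
        System.out.println("SELECT " + projection
                + " FROM niagaradb.niagaratest.HiddenTest");
    }
}
```

With an explicit projection, the chunk query returns the same column set that the internal schema representation expects, avoiding the "Data row is smaller than a column index" mismatch.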