Bug
Resolution: Done
Major
Some context first:
We have a Debezium connector listening to a SQL Server table, but the original table has one more column than its CDC table:
OriginalTable:
Columns: x, y, z
CDCTable to track OriginalTable:
Columns: x, y
So, when a CDC event comes in, TableSchemaBuilder:createValueGenerator() is called with the columns taken from the initial schema snapshot (x, y, z), but the incoming CDC event doesn't have the "z" column. The problem is that this method does not filter the blacklisted columns out of that column list, so we get the following exception:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:218)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$4(TableSchemaBuilder.java:244)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:96)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:49)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:145)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:237)
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:497)
    at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:145)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:154)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:97)
    ... 5 more
To be honest, I'm not 100% sure that this affects all database types that use CDC tables, but I'd bet so.
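To make the mismatch concrete, here is a minimal, self-contained sketch of the situation as I understand it. It is NOT Debezium code: the class ColumnFilterSketch and the methods filterColumns and validateRow are hypothetical names made up for illustration, and the validation is a simplified stand-in for the check that raises the "Data row is smaller than a column index" error. It only shows that validating the CDC row against the unfiltered snapshot columns fails, while validating against the columns with blacklisted ones removed passes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these names exist in Debezium.
class ColumnFilterSketch {

    // Simplified stand-in for the row/metadata check: the row must have
    // at least one value per column the value generator will read.
    static void validateRow(List<String> columns, Object[] row) {
        if (row.length < columns.size()) {
            throw new IllegalStateException(
                    "Data row is smaller than a column index");
        }
    }

    // Keep only the columns accepted by the filter (i.e. not blacklisted).
    static List<String> filterColumns(List<String> columns, Predicate<String> isCaptured) {
        List<String> result = new ArrayList<>();
        for (String column : columns) {
            if (isCaptured.test(column)) {
                result.add(column);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Columns from the initial schema snapshot of OriginalTable.
        List<String> snapshotColumns = Arrays.asList("x", "y", "z");
        // Assumption: "z" is the blacklisted column missing from the CDC table.
        Set<String> blacklisted = Collections.singleton("z");
        // A CDC event only carries values for x and y.
        Object[] cdcRow = {1, 2};

        try {
            // Unfiltered columns: three columns, two values, so this throws.
            validateRow(snapshotColumns, cdcRow);
        } catch (IllegalStateException e) {
            System.out.println("Unfiltered columns fail: " + e.getMessage());
        }

        // Filtered columns: two columns, two values, so this passes.
        List<String> captured = filterColumns(snapshotColumns, c -> !blacklisted.contains(c));
        validateRow(captured, cdcRow);
        System.out.println("Filtered columns pass: " + captured);
    }
}
```

Whether the filtering belongs in createValueGenerator() itself or earlier, where the column list is assembled, is a design decision for the maintainers; the sketch only demonstrates the effect of applying the filter before the row is validated.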