  1. Debezium
  2. DBZ-1617

Blacklisted columns are not being filtered out when generating a Kafka message from a CDC event


    Details

    • Steps to Reproduce:
      • Create a table in SQL Server called 'table1' with columns x, y, z
      • Create a CDC table called 'table1_cdc' to track 'table1', including only columns x and y
      • Create a Kafka Connect connector that tracks 'table1' and lists column 'z' in the "column.blacklist" parameter
      • Wait until the connector is created and the snapshot is done
      • Update a record in 'table1'
      • The connector should fail with the exception shown in the Description
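
      For reference, the connector settings relevant to these steps could look roughly like the sketch below. It is only an illustration: the 'dbo' schema prefix, the class name ConnectorConfigSketch and the use of "table.whitelist" to select the table are assumptions, and the connection settings (host, credentials, database history topic) are left out.

```java
import java.util.Properties;

// Hypothetical helper: only the settings relevant to this issue are shown.
final class ConnectorConfigSketch {

    static Properties connectorConfig() {
        Properties props = new Properties();
        props.setProperty("connector.class",
                "io.debezium.connector.sqlserver.SqlServerConnector");
        // Assumption: 'table1' lives in the default 'dbo' schema.
        props.setProperty("table.whitelist", "dbo.table1");
        // Column 'z' is not captured by the CDC table, so it is blacklisted.
        props.setProperty("column.blacklist", "dbo.table1.z");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be copied into a connector definition.
        connectorConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```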

      Description

      Some context first:
      We have a Debezium connector listening to a SQL Server table, and the original table has one more column than its CDC table:
      OriginalTable:

       Columns: x, y, z 

      CDCTable to track OriginalTable:

       Columns: x,y 

      So, when a CDC event comes in, TableSchemaBuilder.createValueGenerator() is called with the columns taken from the initial schema snapshot (x, y, z), but the incoming CDC event does not contain column "z". The problem is that this method does not filter the blacklisted columns out of that column list, so we get the following exception:

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
          at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
          at io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:164)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
          at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:218)
          at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$4(TableSchemaBuilder.java:244)
          at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143)
          at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:96)
          at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:49)
          at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:145)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:237)
          at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:497)
          at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:145)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:154)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:97)
          ... 5 more
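
      To illustrate the mismatch, here is a simplified, self-contained model of what I think is going on. It is not Debezium's actual code: the class ColumnFilterSketch and its methods toValue() and withoutBlacklisted() are made up for this example. Mapping the snapshot columns (x, y, z) by position onto a two-element CDC row fails, while dropping the blacklisted column first works.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the failure; none of these names are Debezium APIs.
final class ColumnFilterSketch {

    // Maps each known column to the value at the same position in the row.
    static Map<String, Object> toValue(List<String> columns, Object[] row) {
        Map<String, Object> value = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            if (i >= row.length) {
                // Mirrors the situation reported in the stack trace above.
                throw new IllegalStateException(
                        "Data row is smaller than a column index: " + columns.get(i));
            }
            value.put(columns.get(i), row[i]);
        }
        return value;
    }

    // Drops blacklisted columns before any positional mapping happens.
    static List<String> withoutBlacklisted(List<String> columns, Set<String> blacklist) {
        List<String> kept = new ArrayList<>();
        for (String column : columns) {
            if (!blacklist.contains(column)) {
                kept.add(column);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> snapshotColumns = Arrays.asList("x", "y", "z");
        Set<String> blacklist = Collections.singleton("z");
        Object[] cdcRow = {1, 2}; // the CDC table only captures x and y

        try {
            toValue(snapshotColumns, cdcRow);
        } catch (IllegalStateException e) {
            System.out.println("Unfiltered: " + e.getMessage());
        }

        // With the blacklist applied first, the row and the columns line up.
        System.out.println("Filtered: "
                + toValue(withoutBlacklisted(snapshotColumns, blacklist), cdcRow));
    }
}
```

      This sketch assumes the CDC table leaves out exactly the blacklisted columns; if it left out other columns as well, the positions would still not line up.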
      

      To be honest, I'm not 100% sure that this affects all database types that use CDC tables, but I'd bet it does.

            People

            Assignee: Unassigned
            Reporter: Ivan San Jose Garcia (isanjose)