  Debezium / DBZ-3597

Additional unique index referencing columns not exposed by CDC causes exception


    • Type: Bug
    • Resolution: Done
    • Priority: Blocker
    • Fix Version: 1.6.0.CR1
    • Affects Version: 1.5.2.Final
    • Component: sqlserver-connector
      CREATE TABLE [dbo].[MyTable] (
        [Column1] [int] NOT NULL,
        [Column2] [varchar] (255) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
        [Column3] [char] (10) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
        [Column4] [timestamp] NOT NULL)
      GO
      
      CREATE UNIQUE NONCLUSTERED INDEX [idxColumn4] ON [dbo].[MyTable] ([Column4])
      GO
      
      CREATE UNIQUE NONCLUSTERED INDEX [uidxColumn1Column2] ON [dbo].[MyTable] ([Column1], [Column2])
      GO
      
      exec sys.sp_cdc_enable_table
        @source_schema = N'dbo',
        @source_name = N'MyTable',
        @role_name = N'MyRole',
        @filegroup_name = N'CDCTABLES',
        @supports_net_changes = 1,
        @index_name = N'uidxColumn1Column2',
        @captured_column_list = N'Column1, Column2, Column3',
        @allow_partition_switch = 0,
        @capture_instance = N'dbo_MyTable';
      
      {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.user": "MyUser",
        "database.password": "MyPassword",
        "database.dbname": "MyDb",
        "database.hostname": "MySqlServerInstance",
        "database.history.kafka.bootstrap.servers": "b-1.MyCluster:9092,b-2.MyCluster:9092,b-3.MyCluster:9092",
        "database.history.kafka.topic": "MyHistoryTopic",
        "database.server.name": "MyServer",
        "message.key.columns": "MyDb.dbo.MyTable:Column1,Column2;"
      }
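
      For completeness, the capture instance created by the script above can be sanity-checked with the standard CDC metadata procedure and views (this is only a verification sketch, not part of the repro itself); both confirm that Column4 is not in the captured column list:

      EXEC sys.sp_cdc_help_change_data_capture
        @source_schema = N'dbo',
        @source_name   = N'MyTable';

      -- columns captured by the dbo_MyTable capture instance
      SELECT cc.column_name
      FROM cdc.change_tables ct
      JOIN cdc.captured_columns cc ON cc.object_id = ct.object_id
      WHERE ct.capture_instance = N'dbo_MyTable';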
      

      I am using the SQL Server connector to capture CDC on a table where we expose only a subset of the columns. The table has two unique indexes, A and B. Neither index is marked as the primary key, but index A is logically the primary key in our product and is the one I want the connector to use. Index B references a column we don't expose to CDC. Index B isn't actually used in our product as a unique key for the table; it is only marked UNIQUE because the data is known to be unique and marking it gives us a performance benefit.
      This seems to be resulting in the error below. I've tried using the `message.key.columns` option on the connector to specify index A's columns as the key for this table, hoping index B would be ignored. However, the connector still seems to want to do something with index B.
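
      For reference, the unique indexes visible on the source table can be listed with a standard catalog query like the one below (just a diagnostic sketch, not connector code). For the schema above it returns both uidxColumn1Column2 and idxColumn4, and shows that idxColumn4 includes Column4, which is not in the captured column list:

      SELECT i.name AS index_name, c.name AS column_name
      FROM sys.indexes i
      JOIN sys.index_columns ic
        ON ic.object_id = i.object_id AND ic.index_id = i.index_id
      JOIN sys.columns c
        ON c.object_id = ic.object_id AND c.column_id = ic.column_id
      WHERE i.object_id = OBJECT_ID(N'dbo.MyTable')
        AND i.is_unique = 1
      ORDER BY i.name, ic.key_ordinal;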

      TRACE:
      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:290)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.lang.IllegalArgumentException: The column "Column4" is referenced as PRIMARY KEY, but a matching column is not defined in table "MyDb.dbo.MyTable"!
          at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:105)
          at java.base/java.util.ArrayList.removeIf(ArrayList.java:1702)
          at java.base/java.util.ArrayList.removeIf(ArrayList.java:1690)
          at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:101)
          at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:254)
          at io.debezium.connector.sqlserver.SqlServerConnection.getTableSchemaFromTable(SqlServerConnection.java:428)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getCdcTablesToQuery(SqlServerStreamingChangeEventSource.java:378)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:121)
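
      The mismatch the exception complains about can also be seen directly in SQL Server by comparing the columns of the table's unique indexes against the columns captured by the capture instance. This is only a diagnostic sketch against the standard catalog and CDC metadata views; for the schema above it should return idxColumn4 / Column4, the index and column the connector trips over:

      SELECT i.name AS index_name, c.name AS missing_column
      FROM sys.indexes i
      JOIN sys.index_columns ic
        ON ic.object_id = i.object_id AND ic.index_id = i.index_id
      JOIN sys.columns c
        ON c.object_id = ic.object_id AND c.column_id = ic.column_id
      WHERE i.object_id = OBJECT_ID(N'dbo.MyTable')
        AND i.is_unique = 1
        AND c.name NOT IN (
            -- columns actually captured by the dbo_MyTable capture instance
            SELECT cc.column_name
            FROM cdc.change_tables ct
            JOIN cdc.captured_columns cc ON cc.object_id = ct.object_id
            WHERE ct.capture_instance = N'dbo_MyTable');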

              Assignee: Jiri Pechanec (jpechane)
              Reporter: Mark Lambert (dude0001)
              Votes: 0
              Watchers: 3
