  Debezium / DBZ-1778

Connector fails when performing a Hot Schema Update in SQLServer (Data row is smaller than a column index).


    • Type: Bug
    • Resolution: Done
    • Priority: Critical
    • Fix Version/s: 1.1.0.CR1
    • Affects Version/s: 1.0.0.Final
    • Component/s: sqlserver-connector
    • Labels: None

      Steps to reproduce using the Debezium tutorial for SQL Server
      (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server)

      1) Launch the stack

      export DEBEZIUM_VERSION=1.0
      docker-compose -f docker-compose-sqlserver.yaml up -d
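      Kafka Connect can take some time to come up after `docker-compose up -d`, so the registration in step 3 may be refused if it runs too early. A minimal sketch of a retry helper, assuming a POSIX shell; `wait_for` and its arguments are hypothetical names, not part of the tutorial:

```shell
#!/bin/sh
# wait_for MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND until it exits 0 or MAX_ATTEMPTS is reached.
wait_for() {
  local max_attempts="$1" delay="$2" attempt=1
  shift 2
  while [ "$attempt" -le "$max_attempts" ]; do
    # Stop as soon as the command succeeds.
    if "$@"; then
      return 0
    fi
    # Pause before the next attempt.
    sleep "$delay"
    attempt=$((attempt + 1))
  done
  # Every attempt failed.
  return 1
}

# Example (requires the running tutorial stack): poll the Connect REST API
# for up to about two minutes before registering the connector.
#   wait_for 60 2 curl -sf -o /dev/null http://localhost:8083/connectors
```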
      

      2) Initialize the database

      • Execute 'debezium-sqlserver-init/inventory.sql' within SSMS or VSCode

      3) Start the connector

      curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
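      To script step 3, the HTTP status of the registration request can be captured with curl's `-w '%{http_code}'` option. The Kafka Connect REST API answers `POST /connectors` with `201 Created` on success and `409 Conflict` if the connector already exists or a rebalance is in progress. A sketch; `describe_register_code` is a hypothetical helper:

```shell
#!/bin/sh
# describe_register_code HTTP_CODE
# Hypothetical helper: prints a short description of the status code returned by
# POST /connectors and returns 0 only for 201 Created.
describe_register_code() {
  case "$1" in
    201)
      echo "connector created"
      return 0
      ;;
    409)
      echo "conflict: connector already exists or a rebalance is in progress"
      return 1
      ;;
    *)
      echo "registration failed with HTTP $1"
      return 1
      ;;
  esac
}

# Example (requires the running tutorial stack):
#   code=$(curl -s -o /dev/null -w '%{http_code}' -X POST \
#     -H "Accept:application/json" -H "Content-Type:application/json" \
#     http://localhost:8083/connectors/ -d @register-sqlserver.json)
#   describe_register_code "$code"
```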
      

      4) Consume messages

      docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server kafka:9092 \
          --property print.key=true \
          --topic server1.dbo.customers
      

      5) Monitor the connector

      docker-compose -f docker-compose-sqlserver.yaml logs -f --tail=0 connect
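      The connect log is verbose; to spot the failure quickly, the stream can be narrowed to error lines and the exception message from this issue. A sketch using `grep -E`; `filter_errors` is a hypothetical name:

```shell
#!/bin/sh
# filter_errors: read Kafka Connect log lines on stdin and print only those that
# contain ERROR or the "Data row is smaller than a column index" message.
filter_errors() {
  grep -E 'ERROR|Data row is smaller than a column index'
}

# Example (requires the running tutorial stack):
#   docker-compose -f docker-compose-sqlserver.yaml logs -f --tail=0 connect | filter_errors
```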
      

      6) Add some rows

      SET NOCOUNT ON;
      USE testDB;
      
      GO
      
      DECLARE @i INT = 0;
      DECLARE @iMax INT = 3;
      
      WHILE @i < @iMax
      BEGIN
          DECLARE @fn VARCHAR(50) = newid();
          DECLARE @ln VARCHAR(50) = newid();
          DECLARE @em VARCHAR(50) = newid();
      
          INSERT INTO customers(first_name,last_name,email)
          VALUES (@fn,@ln,@em);
          SET @i += 1;
      END;
      

      7) Simulate continuous writes during a schema change by executing the following in a separate session:

      SET NOCOUNT ON;
      USE testDB;
      
      GO
      
      DECLARE @i INT = 0;
      DECLARE @iMax INT = 200;
      
      WHILE @i < @iMax
      BEGIN
          
          WAITFOR DELAY '00:00:01';
      
          DECLARE @fn VARCHAR(50) = newid();
          DECLARE @ln VARCHAR(50) = newid();
          DECLARE @em VARCHAR(50) = newid();
      
          INSERT INTO customers(first_name,last_name,email)
          VALUES (@fn,@ln,@em);
          SET @i += 1;
      
      END;
      

      New rows are captured as expected.
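      One way to confirm that rows are captured is to count create events in the topic. This sketch assumes the tutorial's JSON converter, which writes the Debezium envelope compactly so that inserts contain `"op":"c"` (snapshot records use `"op":"r"` and are not counted). `count_ops` is a hypothetical helper; `--from-beginning` and `--timeout-ms` are standard `kafka-console-consumer.sh` options, used here so the consumer exits on its own:

```shell
#!/bin/sh
# count_ops OP: count the messages on stdin whose Debezium envelope carries the
# given operation code ("c" = create, "u" = update, "d" = delete, "r" = read).
# Assumes compact JSON such as {"payload":{...,"op":"c",...}}.
count_ops() {
  # grep -c prints 0 but exits 1 when nothing matches; keep the 0 and succeed.
  grep -c "\"op\":\"$1\"" || true
}

# Example (requires the running tutorial stack); the count should grow by one
# for every insert made in steps 6 and 7:
#   docker-compose -f docker-compose-sqlserver.yaml exec -T kafka /kafka/bin/kafka-console-consumer.sh \
#       --bootstrap-server kafka:9092 --from-beginning --timeout-ms 10000 \
#       --topic server1.dbo.customers | count_ops c
```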

      8) While step 7 is still running, apply the schema change and enable a new CDC capture instance:

      USE testDB;
      GO
      ALTER TABLE customers ADD phone_number VARCHAR(32);
      EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';
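      To confirm that step 8 took effect, SQL Server's `sys.sp_cdc_help_change_data_capture` procedure lists the capture instances of a table; after step 8 it should return two rows for `dbo.customers`: the original instance (presumably the default name `dbo_customers`) and `dbo_customers_v2`. A sketch that generates the call; the helper name `cdc_help_sql`, the `sqlserver` service name, the `sqlcmd` path and the `SA_PASSWORD` variable in the example are assumptions about the tutorial containers:

```shell
#!/bin/sh
# cdc_help_sql SCHEMA TABLE: print a T-SQL batch that lists the CDC capture
# instances of SCHEMA.TABLE in testDB via sys.sp_cdc_help_change_data_capture.
# Arguments are interpolated as-is, so only pass trusted identifiers.
cdc_help_sql() {
  printf "USE testDB;\nEXEC sys.sp_cdc_help_change_data_capture @source_schema = N'%s', @source_name = N'%s';\nGO\n" "$1" "$2"
}

# Example (assumed container layout):
#   cdc_help_sql dbo customers | docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver \
#       bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P "$SA_PASSWORD"'
```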
      

      Once step 8 completes, the connector task fails. Occasionally the task gets through without error if the timing is just right, but most of the time the failure occurs.


      The connector will fail with "Data row is smaller than a column index..." when performing a Hot Schema Update.

      Adapted from the original post:
      https://groups.google.com/forum/#!topic/debezium/qnp4GHuSXEU

      Thank you for your time and support.


      Failure details (after completing step 8 in the steps to reproduce).
      GET `http://localhost:8083/connectors/inventory-connector/status`

      {
          "name": "inventory-connector",
          "connector": {
              "state": "RUNNING",
              "worker_id": "172.18.0.5:8083"
          },
          "tasks": [
              {
                  "id": 0,
                  "state": "FAILED",
                  "worker_id": "172.18.0.5:8083",
                  "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)\n\tat io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)\n\tat io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:162)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema\n\tat io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:219)\n\tat io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:248)\n\tat io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143)\n\tat io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:68)\n\tat io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:43)\n\tat io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:145)\n\tat io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:243)\n\tat io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:504)\n\tat io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:172)\n\tat io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:160)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99)\n\t... 5 more\n"
              }
          ],
          "type": "source"
      }
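      The response above shows the typical pattern for this bug: the connector reports `RUNNING` while its task is `FAILED`. A sketch that extracts the task states without `jq`, assuming the field order Kafka Connect uses (connector first, then tasks); `task_states` is a hypothetical helper:

```shell
#!/bin/sh
# task_states: read a Kafka Connect /connectors/<name>/status response on stdin
# and print the state of each task, one per line. The first "state" field in the
# response belongs to the connector itself, so it is skipped. Quotes inside the
# "trace" string are escaped, so they do not match the pattern.
task_states() {
  grep -o '"state": *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/' | tail -n +2
}

# Example (requires the running tutorial stack):
#   curl -s http://localhost:8083/connectors/inventory-connector/status | task_states | grep -qx FAILED \
#     && echo "at least one task has failed"
```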
      
      

              Assignee: Jiri Pechanec (jpechane)
              Reporter: Jason Kososki (jason.kososki, Inactive)
              Votes: 0
              Watchers: 2
