Debezium / DBZ-3757

message.key.columns with column.include.list causes Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema


Details

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Minor
    • Fix Version/s: 1.9-backlog
    • Affects Version/s: 1.3.0.Final, 1.4.0.Final, 1.5.0.Final, 1.6.0.Final
    • Component/s: sqlserver-connector
    • Labels: None

      Setup:

      • Kafka version: 2.3.0
      • Working with Debezium 1.2.0 and lower
      • Broken with Debezium 1.3.0 and newer
      • SQL Server version: 14.0.3257.3

       

      Steps to reproduce:

      Create a connector configuration with the "column.include.list" parameter and specify some of the first columns of the table.

      Add the "message.key.columns" parameter and specify columns located after the columns listed in "column.include.list".
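      For illustration, a minimal configuration combining the two parameters might look like this (server, table, and column names here are hypothetical, not from the report; "Col40" stands for any column positioned after the included ones):

```json
{
  "name": "repro-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "table.include.list": "dbo.MyTable",
    "column.include.list": "dbo.MyTable.Col1,dbo.MyTable.Col2",
    "message.key.columns": "dbo.MyTable:Col40"
  }
}
```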

       

       


    Description

      Hello!

      We've recently upgraded from 1.2.0.Final to 1.4.1 and hit an error when creating new connectors on our system:

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:121)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
          at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:82)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:105)
          ... 5 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
          at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
          at io.debezium.relational.TableSchemaBuilder.lambda$createKeyGenerator$3(TableSchemaBuilder.java:183)
          at io.debezium.relational.TableSchema.keyFromColumnData(TableSchema.java:130)
          at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:81)
          at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:48)
          at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:151)
          at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:348)
          at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:284)
          at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:137)
          at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:71)
          ... 6 more
      

       

      The connector configuration looks like:

      {
        "name" : "dbz-mssql-navision-prod-store124-v4",
        "config": {
          "column.include.list": "dbo.Test\\$Warehouse\\sActivity\\sLine.Line\\sNo_,dbo.Test\\$Warehouse\\sActivity\\sLine.Qty_\\sOutstanding\\s\\(Base\\)",
          "column.propagate.source.type": ".*",
          "column.truncate.to.15.chars": "dbo.Test\\$Warehouse\\sActivity\\sLine.Qty_\\sOutstanding\\s\\(Base\\)",
          "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
          "database.applicationName": "debezium-str",
          "database.dbname": "Test_124",
          "database.history.kafka.bootstrap.servers": "server1.test.com:9092,server2.test.com:9092,server3.test.com:9092",
          "database.history.kafka.topic": "__history_store124_test",
          "database.hostname": "nav.test.com",
          "database.password": "test",
          "database.port": "1446",
          "database.server.name": "test.nav",
          "database.user": "test",
          "include.schema.changes": "false",
          "max.batch.size": "8192",
          "max.queue.size": "20480",
          "message.key.columns": "dbo.Test\\$Warehouse\\sActivity\\sLine:Location\\sCode,Zone\\sCode,Item\\sNo_,No_,Activity\\sType,Action\\sType",
          "poll.interval.ms": "3000",
          "sanitize.field.names": "true",
          "snapshot.delay.ms": "2000",
          "snapshot.isolation.mode": "read_committed",
          "snapshot.mode": "initial",
          "table.whitelist": "dbo.Test\\$Warehouse\\sActivity\\sLine",
          "time.precision.mode": "connect",
          "tombstones.on.delete": "false",
          "transforms": "Combine",
          "transforms.Combine.key.field.name": "Table",
          "transforms.Combine.key.field.regex": "(.*)dbo.(.*)",
          "transforms.Combine.key.field.replacement": "$2",
          "transforms.Combine.topic.regex": "(.*)dbo(.*)",
          "transforms.Combine.topic.replacement": "testtest",
          "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter"
        }
      }

       

      We turned on debug-level logging and got the following messages:

      ERROR Error requesting a row value, row: 23, requested index: 42 at position 4 (io.debezium.relational.TableSchemaBuilder:220)
      [2021-07-13 16:16:19,720] TRACE columnData from current stack: [null, null, null, 1000, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, 56.00000000000000000000] (io.debezium.relational.TableSchema:128)
      

       

      So it looks like the plugin extracts only the fields listed in "column.include.list" and forms the row structure from the 1st column up to the 23rd, filled with nulls for the non-included columns and with real values for the included ones.

      But while forming the message key, it requests columns beyond the 23rd position:

      The "Zone Code" column is the 43rd column in the original table (counting from 1); since the array index starts from 0, that matches the requested index 42.
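      The observed behavior can be sketched like this (an illustrative model, not Debezium's actual code; column positions are taken from the log output above):

```python
# Illustrative sketch of the mismatch, not Debezium's actual code.
# Column positions are 1-based in the table; the row array is 0-based.

INCLUDED_POSITIONS = [4, 23]   # columns kept by column.include.list

# The snapshot row only reaches the last included column, with None
# for the positions that were filtered out (cf. the TRACE output above):
row = [None] * max(INCLUDED_POSITIONS)
row[4 - 1] = 1000
row[23 - 1] = "56.00000000000000000000"

def key_value(row, position):
    """Fetch a key column by its 1-based table position."""
    index = position - 1
    if index >= len(row):
        raise IndexError(
            f"Data row is smaller than a column index: row has "
            f"{len(row)} values, requested index {index}")
    return row[index]

# "Zone Code" is the 43rd table column, i.e. index 42 -- beyond the
# 23-element row, so the key generator fails:
try:
    key_value(row, 43)
except IndexError as exc:
    print(exc)
```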

      As a workaround, we've added the columns listed in "message.key.columns" to "column.include.list" and it worked fine, but the messages increased in size.
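      The workaround amounts to taking the union of the two column sets (column names below are from the report; the regex escaping used in the real config is omitted for readability):

```python
# Sketch of the workaround: list every message-key column in
# column.include.list as well, so the snapshot row stays wide enough
# for the key generator (at the cost of larger messages).
include_list = {"Line No_", "Qty_ Outstanding (Base)"}
key_columns = {"Location Code", "Zone Code", "Item No_", "No_",
               "Activity Type", "Action Type"}

effective_include_list = sorted(include_list | key_columns)
print(",".join(effective_include_list))
```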

      We checked the 1.5 and 1.6 plugin versions and reproduced the error there as well.

      So we've downgraded the plugin back to version 1.2.0.

       

      Could you please help?

      Thank you.

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Max (hepebar) (Inactive)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated: