Debezium / DBZ-4273

SQL Server connector doesn't handle multiple capture instances for the same table with equal start LSN


Details


      Use the attached SQL scripts to perform the steps below.

      1. Deploy the pipeline using the SQL Server example from the tutorial:
        env DEBEZIUM_VERSION=1.7 docker compose -f docker-compose-sqlserver.yaml up
      2. Create the test database, the table and the first capture instance:
        cat script1.sql | docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
      3. Start a connector:
        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
      4. Start a consumer from the table topic. For simplicity, it will just display the keys:
        docker-compose -f docker-compose-sqlserver.yaml exec -T kafka /kafka/bin/kafka-console-consumer.sh \
            --bootstrap-server kafka:9092 \
            --from-beginning \
            --property print.key=true \
            --property print.value=false \
            --topic server1.dbo.SampleTable | jq .payload
        
      5. Observe the key inserted by the SQL script above:
        {
          "ID": 1
        }
        
      6. Temporarily delete the connector. This simulates the "bad timing"; in real life, the same can happen if the connector is slightly behind the source.
        curl -X DELETE localhost:8083/connectors/inventory-connector
      7. Run the second part of the script. The details of what it does are in the script comments. It will take approximately 5 minutes to run, mostly just because of conservative delays:
        cat script2.sql | docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
      8. Note that both capture instances have the same start LSN. Below is the end of the script output for better readability:
        source_schema  source_table  capture_instance                         object_id  source_object_id start_lsn              end_lsn
        -------------- ------------- ---------------------------------------- ---------- ---------------- ---------------------- -------
        dbo            SampleTable   testDB_dbo_SampleTable                   1269579561 885578193        0x0000002500000DB00004 NULL
        dbo            SampleTable   testDB_dbo_SampleTable_AfterSchemaChange 1349579846 885578193        0x0000002500000DB00004 NULL
        
      9. Redeploy the connector
        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
      10. Observe the output of the consumer started in step 4:
        {
          "ID": 1
        }
        {
          "ID": 3
        }
        

        The row with ID=2 is missing, but it is missing from both capture instances due to the cleanup, which is correct.

      11. Depending on the state of the runtime (specifically, the iteration order of the HashSet<> used by the connector), the connector may produce records 4 and 5, or it may fail with the following error:
        org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=1, commit_lsn=00000025:00000ee8:0003, change_lsn=00000025:00000ee8:0002}
        	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:255)
        	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:270)
        	at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:609)
        	at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:279)
        	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:170)
        	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:59)
        	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:166)
        	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:127)
        	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        	at java.base/java.lang.Thread.run(Thread.java:829)
        Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
        	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
        	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
        	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
        	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:70)
        	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46)
        	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:218)
        	... 12 more
        

      If the issue isn't reproducible, the following steps can be added to retry the reproduction:

      1. Note down the connector offsets after step 6 above:
        docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
            --bootstrap-server kafka:9092 \
            --from-beginning \
            --property print.key=true \
            --property key.separator=\| \
            --topic my_connect_offsets
        ["inventory-connector",{"server":"server1"}]|{"commit_lsn":"00000026:00000678:002e","snapshot":true,"snapshot_completed":true}
      2. After attempting to reproduce the issue, stop the connector:
        curl -X DELETE localhost:8083/connectors/inventory-connector
      3. Restore connector offsets
        echo '["inventory-connector",{"server":"server1"}]|{"commit_lsn":"00000026:00000d60:00a2","snapshot":true,"snapshot_completed":true}' | docker-compose -f docker-compose-sqlserver.yaml exec -T kafka /kafka/bin/kafka-console-producer.sh \
            --broker-list kafka:9092 \
            --property parse.key=true \
            --property key.separator=\| \
            --topic my_connect_offsets
      4. Redeploy the connector
        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
      5. If the issue is still not reproducible, go back to step 2.

    Description

      If two capture instances exist for a table, the connector determines the order in which to use them by their start LSNs. If the LSNs are equal, the choice of which of the two is current and which is future is undefined (see SqlServerStreamingChangeEventSource#getCdcTablesToQuery). As a result, the connector may use the schema of one capture instance while reading data from the other, which results in a task failure.
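      The ambiguity can be sketched with plain Java collections. The class and record names below are hypothetical, invented for illustration; this is not the connector's actual code. When two capture instances carry an equal start LSN, a comparator on the start LSN alone treats them as equal, so which one comes out first depends on the set's iteration order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class LsnOrderingDemo {
    // Hypothetical stand-in for the connector's capture-instance metadata.
    record CaptureInstance(String name, String startLsn) {}

    // Order capture instances by start LSN only, as the connector effectively does.
    static List<CaptureInstance> orderByStartLsn(Set<CaptureInstance> tables) {
        List<CaptureInstance> ordered = new ArrayList<>(tables);
        ordered.sort(Comparator.comparing(CaptureInstance::startLsn));
        return ordered;
    }

    public static void main(String[] args) {
        // After a CDC cleanup, both capture instances carry the same start LSN.
        List<CaptureInstance> ordered = orderByStartLsn(Set.of(
                new CaptureInstance("testDB_dbo_SampleTable",
                        "0x0000002500000DB00004"),
                new CaptureInstance("testDB_dbo_SampleTable_AfterSchemaChange",
                        "0x0000002500000DB00004")));

        // The sort is a no-op for the tie: whichever instance the set yields
        // first is treated as "current", and that order is unspecified.
        System.out.println("picked as current: " + ordered.get(0).name());
    }
}
```

      With a tie on the sort key, the outcome hinges on set iteration order, which matches the nondeterministic behavior observed in step 11 of the reproduction steps.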

      Potential design issues

      According to the documentation, a capture instance's start LSN is a:

      Log sequence number (LSN) representing the low endpoint for querying the change table.

      The start LSN doesn't determine the order in which multiple capture instances were created or should be used, yet the connector uses it to order capture instances. Moreover, a CDC cleanup job may update the start LSN of multiple co-existing capture instances.
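      One conceivable direction, sketched below purely as an illustration (not a proposed Debezium patch), is to break start-LSN ties with a creation-order proxy. object_id is used here only because it distinguishes the two instances in the script output above; SQL Server does not guarantee that object_id reflects creation order, so a real fix would need a reliable ordering criterion.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DeterministicCaptureOrder {
    // Hypothetical record mirroring a few cdc.change_tables columns.
    record ChangeTable(String captureInstance, String startLsn, int objectId) {}

    // Primary sort key: start LSN; tiebreaker: object_id as an illustrative
    // creation-order proxy, so equal start LSNs no longer yield an
    // unspecified order.
    static List<ChangeTable> order(List<ChangeTable> tables) {
        List<ChangeTable> ordered = new ArrayList<>(tables);
        ordered.sort(Comparator.comparing(ChangeTable::startLsn)
                .thenComparingInt(ChangeTable::objectId));
        return ordered;
    }

    public static void main(String[] args) {
        // Values taken from the script output in the reproduction steps.
        List<ChangeTable> ordered = order(List.of(
                new ChangeTable("testDB_dbo_SampleTable_AfterSchemaChange",
                        "0x0000002500000DB00004", 1349579846),
                new ChangeTable("testDB_dbo_SampleTable",
                        "0x0000002500000DB00004", 1269579561)));

        // The original capture instance sorts first despite the equal start LSNs.
        System.out.println(ordered.get(0).captureInstance());
        // → testDB_dbo_SampleTable
    }
}
```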

      Additionally (not strictly related, but worth mentioning), the end LSN is an:

      LSN representing the high endpoint for querying the change table. In SQL Server 2012 (11.x), this column is always NULL.

      This suggests that the connector shouldn't use the end LSN, but it does.

      Technical details

      Given the actual semantics of the start LSN, it is very likely that after a CDC cleanup job completes, both capture instances of a table will contain the same start LSN, which causes the issue described above.

      Attachments

        1. script2.sql
          2 kB
        2. script1.sql
          0.8 kB


          People

            Assignee: Unassigned
            Reporter: Sergei Morozov (sergeimorozov)
            Votes: 0
            Watchers: 3