  Debezium / DBZ-3857

SQL Server Connector finds tables for streaming but not snapshot


      This is purely a testing setup - running dockerized SQL Server and Debezium:

      # SQL Server test instance (sa / Pass@word credentials set below)
      version: '3.1'
      services:
        sql_server:
          image: mcr.microsoft.com/mssql/server:2017-latest
          environment:
            - SA_PASSWORD=Pass@word
            - ACCEPT_EULA=Y
            - MSSQL_AGENT_ENABLED=True
          ports:
            - "5434:1433" 

      Script to create table:

      CREATE DATABASE test_sql_server
      GO
      USE test_sql_server
      GO
      CREATE TABLE customers (id int identity primary key, foo text)
      GO
      INSERT INTO customers (foo) VALUES ('hello world');
      GO
      INSERT INTO customers (foo) VALUES ('hello world again');
      GO
      INSERT INTO customers (foo) VALUES ('hello world again and again');
      GO
      EXEC sys.sp_cdc_enable_db
      GO
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'dbo',
      @source_name   = N'customers',
      @role_name     = NULL
      GO 
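
      For completeness, a few hedged verification queries (not part of the original script) to confirm that CDC really got enabled and that the default dbo_customers capture instance exists:

      -- Database-level CDC flag
      SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'test_sql_server';
      -- Table-level CDC flag
      SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'customers';
      -- Details of the capture instance created by sp_cdc_enable_table
      EXEC sys.sp_cdc_help_change_data_capture
          @source_schema = N'dbo',
          @source_name   = N'customers';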

      My connector config:

      {
          "config": {
              "binary.handling.mode": "base64",
              "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
              "database.dbname": "test_sql_server",
              "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/etc/kafka/secrets/connect.properties:WORKER_USERNAME}\" password=\"${file:/etc/kafka/secrets/connect.properties:WORKER_PASSWORD}\";",
              "database.history.consumer.sasl.mechanism": "PLAIN",
              "database.history.consumer.security.protocol": "SASL_SSL",
              "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
              "database.history.kafka.bootstrap.servers": "<hidden>:9092",
              "database.history.kafka.topic": "dbhistory.test_sql_server",
              "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/etc/kafka/secrets/connect.properties:WORKER_USERNAME}\" password=\"${file:/etc/kafka/secrets/connect.properties:WORKER_PASSWORD}\";",
              "database.history.producer.sasl.mechanism": "PLAIN",
              "database.history.producer.security.protocol": "SASL_SSL",
              "database.history.producer.ssl.endpoint.identification.algorithm": "https",
              "database.hostname": "10.220.20.184", # localhost
              "database.password": "${file:/etc/kafka/secrets/connect.properties:DB_PASSWORD}",
              "database.port": "5434",
              "database.server.name": "test_sql_server",
              "database.user": "${file:/etc/kafka/secrets/connect.properties:DB_USERNAME}",
              "datatype.propagate.source.type": ".+\\.decimal,.+\\.double,.+\\.float,.+\\.bytes,.+\\.binary,.+\\.varbinary",
              "decimal.handling.mode": "string",
              "heartbeat.interval.ms": "10000",
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.basic.auth.credentials.source": "USER_INFO",
              "key.converter.basic.auth.user.info": "${file:/etc/kafka/secrets/connect.properties:SR_USERNAME}:${file:/etc/kafka/secrets/connect.properties:SR_PASSWORD}",
              "key.converter.schema.registry.url": "<hidden>",
              "max.batch.size": "100",
              "max.queue.size": "200",
              "name": "test_sql_server:debezium_sql_server",
              "snapshot.fetch.size": "10",
              "snapshot.include.collection.list": "dbo.customers",
              "snapshot.mode": "initial",
              "table.include.list": "^dbo.customers$",
              "tasks.max": "1",
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.basic.auth.credentials.source": "USER_INFO",
              "value.converter.basic.auth.user.info": "${file:/etc/kafka/secrets/connect.properties:SR_USERNAME}:${file:/etc/kafka/secrets/connect.properties:SR_PASSWORD}",
              "value.converter.schema.registry.url": "<hidden>"
          },
          "name": "test_sql_server:debezium_sql_server",
          "tasks": [],
          "type": "source"
      } 

      I even tried adding snapshot.include.collection.list, as you can see above, but per the docs I shouldn't need to do that.


      I have a test table with 4 rows that is being picked up by CDC but is not being found in the snapshot phase.  The logs show that no changes will be captured for the snapshot, and only afterwards does the connector find the table for CDC.  Also, after I add one more row, a log message tells me that 4 records were sent during the previous few seconds, which would seem to indicate that I got snapshot records, when in fact no snapshot records are arriving in my Kafka topic.  One hedged way to confirm that CDC itself captured the rows is shown below, before the logs.
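
      Assuming the default capture instance name dbo_customers (which the log below also reports), the CDC change rows can be inspected directly:

      USE test_sql_server
      GO
      -- Highest LSN the CDC capture job has processed so far
      SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
      GO
      -- Raw change rows in the dbo_customers change table
      SELECT __$start_lsn, __$operation, id, foo FROM cdc.dbo_customers_CT;
      GO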

      connect_1  | 2021-08-11 20:53:41,526 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 1 - Preparing   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,528 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 2 - Determining captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,697 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 3 - Locking captured tables []   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,697 INFO   SQL_Server|test_sql_server|snapshot  Setting locking timeout to 10 s   [io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,708 INFO   SQL_Server|test_sql_server|snapshot  Executing schema locking   [io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,708 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 4 - Determining snapshot offset   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,719 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 5 - Reading structure of captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,719 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 6 - Persisting schema history   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,737 INFO   SQL_Server|test_sql_server|snapshot  Schema locks released.   [io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,737 INFO   SQL_Server|test_sql_server|snapshot  Snapshot step 7 - Snapshotting data   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,738 INFO   SQL_Server|test_sql_server|snapshot  Snapshotting contents of 0 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,741 INFO   SQL_Server|test_sql_server|snapshot  Snapshot - Final stage   [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,744 INFO   SQL_Server|test_sql_server|snapshot  Removing locking timeout   [io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:41,750 INFO   SQL_Server|test_sql_server|snapshot  Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=test_sql_server, changeLsn=NULL, commitLsn=00000026:00000238:0001, eventSerialNo=null, snapshot=FALSE, sourceTime=null], partition={server=test_sql_server}, snapshotCompleted=true, eventSerialNo=1]]   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      connect_1  | 2021-08-11 20:53:41,750 WARN   SQL_Server|test_sql_server|snapshot  After applying the include/exclude list filters, no changes will be captured. Please check your configuration!   [io.debezium.relational.RelationalDatabaseSchema]
      connect_1  | 2021-08-11 20:53:41,750 INFO   SQL_Server|test_sql_server|streaming  Connected metrics set to 'true'   [io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics]
      connect_1  | 2021-08-11 20:53:41,751 INFO   SQL_Server|test_sql_server|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      connect_1  | 2021-08-11 20:53:41,751 INFO   SQL_Server|test_sql_server|streaming  No incremental snapshot in progress, no action needed on start   [io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource]
      connect_1  | 2021-08-11 20:53:42,020 INFO   SQL_Server|test_sql_server|streaming  Table test_sql_server.dbo.customers is new to be monitored by capture instance dbo_customers   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
      connect_1  | 2021-08-11 20:53:43,194 INFO   SQL_Server|test_sql_server|streaming  Last position recorded in offsets is 00000026:00000238:0001(NULL)[1]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
      connect_1  | 2021-08-11 20:54:09,012 INFO   ||  4 records sent during previous 00:00:29.0, last recorded offset: {transaction_id=null, event_serial_no=1, commit_lsn=00000026:00000270:0007, change_lsn=00000026:00000270:0006}   [io.debezium.connector.common.BaseSourceTask] 

      As you can see, it's only after it tells me "After applying the include/exclude list filters, no changes will be captured" that it then says "Table test_sql_server.dbo.customers is new to be monitored by capture instance dbo_customers".

      Any insight much appreciated!  Thank you!

       

            Assignee: Jiri Pechanec (jpechane)
            Reporter: Jeremy Finzel (jfinzel)
