Debezium / DBZ-7828

SQL Server incorrectly applies quoted snapshot statement overrides


      When the SnapshottingTask is created, it appears to be given a filterQueries map whose keys are unescaped table identifiers. For SQL Server, this means that any square brackets in the configured identifier are stripped out. When RelationalSnapshotChangeEventSource#doExecute then calls snapshottingTask.getFilterQueries() and passes each key to TableId.parse, parsing fails if the identifier contains a character that needs to be escaped, such as a space.
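
      The failure mode described above can be illustrated with a small, self-contained sketch. This is not Debezium's TableIdParser; NaiveIdParser and its methods are hypothetical names, and the parser models only two assumed rules: parts are separated by '.', and any character that follows whitespace inside an unquoted part is rejected.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for a dotted-identifier parser.
 * It is NOT Debezium's TableIdParser; it only mimics the reported symptom.
 */
final class NaiveIdParser {

    /** Drops SQL Server style [ ] quoting, losing the fact that the space was quoted. */
    static String stripBrackets(String identifier) {
        return identifier.replace("[", "").replace("]", "");
    }

    /**
     * Splits an unquoted identifier on '.'.
     * Any non-separator character that follows whitespace is treated as unexpected.
     */
    static List<String> parseUnquoted(String identifier) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean sawWhitespace = false;
        for (char c : identifier.toCharArray()) {
            if (c == '.') {
                parts.add(current.toString());
                current.setLength(0);
                sawWhitespace = false;
            } else if (Character.isWhitespace(c)) {
                sawWhitespace = true;
            } else if (sawWhitespace) {
                throw new IllegalArgumentException("Unexpected input: " + c);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        String stripped = stripBrackets("[dbo].[Order Details]");
        System.out.println(stripped);
        try {
            parseUnquoted(stripped);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      Once the brackets are gone, the key is the plain string dbo.Order Details, and the first character after the space is the D of Details, which matches the message in the reported stack trace.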

       

      What Debezium connector do you use and what version?

      SQL Server 2.6.1

      What is the connector configuration?

       

      connector.class = io.debezium.connector.sqlserver.SqlServerConnector
      include.schema.changes = false
      schema.history.internal.store.only.captured.tables.ddl = true
      schema.history.internal.file.filename = /path/to/dat
      tombstones.on.delete = false
      topic.prefix = Northwind
      offset.storage.file.filename = /path/to/dat
      errors.retry.delay.initial.ms = 30000
      snapshot.select.statement.overrides.[dbo].[Order Details] = SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10
      value.converter = org.apache.kafka.connect.json.JsonConverter
      key.converter = org.apache.kafka.connect.json.JsonConverter
      database.databaseName = Northwind
      database.user = admin
      database.names = Northwind
      offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
      snapshot.select.statement.overrides = [dbo].[Order Details]
      time.precision.mode = connect
      errors.retry.delay.max.ms = 60000
      offset.flush.timeout.ms = 5000
      event.processing.failure.handling.mode = fail
      offset.flush.interval.ms = 0
      schema.history.internal = io.debezium.storage.file.history.FileSchemaHistory
      errors.max.retries = 6
      database.hostname = hostname
      database.password = ********
      name = foobar
      task.id = 0
      table.include.list = ^dbo.Categories$,^dbo.CustomerDemographics$,^dbo.Customers$,^dbo.EmployeeTerritories$,^dbo.Employees$,^dbo.Northwind.Demo$,^dbo.Order Details$,^dbo.Orders$,^dbo.Products$,^dbo.Region$,^dbo.Shippers$,^dbo.Suppliers$,^dbo.Territories$
      database.trustServerCertificate = true
      snapshot.mode = initial_only
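
      For readers unfamiliar with the override settings above: snapshot.select.statement.overrides lists the tables, and each listed name is appended to the same prefix to find its SELECT statement. The sketch below shows only that pairing, using java.util.Properties. OverrideLookup and resolve are hypothetical names, and the comma splitting is an assumption for illustration, not Debezium's actual configuration handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper showing how the two override properties pair up. */
final class OverrideLookup {

    static final String PREFIX = "snapshot.select.statement.overrides";

    /** Returns table name -> SELECT statement for every listed table that has a statement. */
    static Map<String, String> resolve(Properties config) {
        Map<String, String> overrides = new LinkedHashMap<>();
        String listed = config.getProperty(PREFIX, "");
        // Assumption: table names are comma-separated and contain no commas themselves.
        for (String entry : listed.split(",")) {
            String table = entry.trim();
            if (table.isEmpty()) {
                continue;
            }
            String select = config.getProperty(PREFIX + "." + table);
            if (select != null) {
                overrides.put(table, select);
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(PREFIX, "[dbo].[Order Details]");
        config.setProperty(PREFIX + ".[dbo].[Order Details]",
                "SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10");
        System.out.println(resolve(config));
    }
}
```

      In this sketch the map key keeps its brackets, so the quoting information is still available to whatever consumes the map.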
      
      
      

       

       

      What is the captured database version and mode of deployment?

      SQL Server on Docker

      What behaviour do you expect?

      A snapshot override statement can successfully be provided for SQL Server.

      What behaviour do you see?

       

      io.debezium.DebeziumException: java.lang.IllegalArgumentException: Unexpected input: D
          at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:103)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
          at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:84)
          at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.lang.IllegalArgumentException: Unexpected input: D
          at io.debezium.relational.TableIdParser$ParsingState$3.handleCharacter(TableIdParser.java:154)
          at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:71)
          at io.debezium.text.TokenStream.start(TokenStream.java:446)
          at io.debezium.relational.TableIdParser.parse(TableIdParser.java:36)
          at io.debezium.relational.TableIdParser.parse(TableIdParser.java:30)
          at io.debezium.relational.TableId.parseParts(TableId.java:80)
          at io.debezium.relational.TableId.parse(TableId.java:54)
          at io.debezium.relational.TableId.parse(TableId.java:30)
          at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$doExecute$0(RelationalSnapshotChangeEventSource.java:122)
          at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:177)
          at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
          at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
          at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
          at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
          at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
          at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
          at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:122)
      
      
      

       

       

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      Yup

      Implementation ideas (optional)

      Not sure why SnapshottingTask has a 

      private final Map<String, String> filterQueries; 

      perhaps it needs to use:

      private final Map<DataCollectionId, String> filterQueries; 

      so that the re-parsing with TableId.parse in RelationalSnapshotChangeEventSource#doExecute no longer takes place. Otherwise, the strings need to be quoted properly, and the code needs to know that it is dealing with SQL Server so that a new SqlServerTableIdPredicates() can be passed in as well.
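
      A minimal sketch of the first idea, keying the map by an already-parsed identifier, might look like the following. TableKey and TypedFilterQueries are hypothetical stand-ins invented for this illustration; in Debezium the key type would presumably be DataCollectionId or TableId, and the real SnapshottingTask may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a parsed table identifier; not a Debezium class. */
final class TableKey {
    final String schema;
    final String table;

    TableKey(String schema, String table) {
        this.schema = Objects.requireNonNull(schema);
        this.table = Objects.requireNonNull(table);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof TableKey)) {
            return false;
        }
        TableKey that = (TableKey) other;
        return schema.equals(that.schema) && table.equals(that.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, table);
    }
}

/** Hypothetical holder whose keys are parsed once, when the overrides are read. */
final class TypedFilterQueries {
    private final Map<TableKey, String> filterQueries = new HashMap<>();

    void put(TableKey key, String select) {
        filterQueries.put(key, select);
    }

    /** Looks up by the parsed key, so no string is re-parsed and spaces in names are harmless. */
    String selectFor(TableKey key) {
        return filterQueries.get(key);
    }
}
```

      With this shape, the identifier is parsed once with the connector-specific quoting rules, and later lookups never go back through a string form.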

              rh-ee-mvitale Mario Fiore Vitale
              mark.bereznitsky Mark Bereznitsky (Inactive)