Bug
Resolution: Done
Major
It looks like the SnapshottingTask is created with a filterQueries map whose keys are unescaped table identifiers. For SQL Server, this means any square brackets are stripped out. When RelationalSnapshotChangeEventSource#doExecute passes the keys returned by snapshottingTask.getFilterQueries() to TableId.parse, parsing then fails for any table identifier that contains a character needing escaping, such as a space.
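To make the failure mode concrete, here is a deliberately simplified, hypothetical parser. It is NOT Debezium's TableIdParser; the class and method names are invented for illustration. It only mimics the behaviour described above: unquoted parts may not contain spaces, and square brackets count as quotes only when SQL Server-style quoting is switched on (roughly the role the report attributes to SqlServerTableIdPredicates). The main method also mimics doExecute re-parsing the String keys of filterQueries inside Collectors.toMap, matching the shape of the stack trace below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical, simplified identifier parser used only to illustrate the bug.
 * It is not Debezium's TableIdParser and does not handle escaped quotes.
 */
public class TableIdSketch {

    /** Splits on '.', honouring "..." quotes and, optionally, [...] quotes. */
    static List<String> parse(String input, boolean bracketQuoting) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int i = 0;
        while (i < input.length()) {
            char c = input.charAt(i);
            if (c == '.') {
                parts.add(current.toString());
                current.setLength(0);
                i++;
            } else if (c == '"' || (bracketQuoting && c == '[')) {
                char close = (c == '"') ? '"' : ']';
                int end = input.indexOf(close, i + 1);
                if (end < 0) {
                    throw new IllegalArgumentException("Unterminated quoted identifier");
                }
                current.append(input, i + 1, end); // quoted text is taken verbatim
                i = end + 1;
            } else if (Character.isWhitespace(c)) {
                // Unquoted whitespace is only allowed before a '.' or the end of input.
                int j = i;
                while (j < input.length() && Character.isWhitespace(input.charAt(j))) {
                    j++;
                }
                if (j < input.length() && input.charAt(j) != '.') {
                    throw new IllegalArgumentException("Unexpected input: " + input.charAt(j));
                }
                i = j;
            } else if (Character.isLetterOrDigit(c) || c == '_') {
                current.append(c);
                i++;
            } else {
                throw new IllegalArgumentException("Unexpected input: " + c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        // Brackets intact and recognised as quotes: the space is accepted.
        System.out.println(parse("[dbo].[Order Details]", true)); // [dbo, Order Details]

        // Keys arrive with brackets already stripped, then get re-parsed.
        Map<String, String> filterQueries = Map.of("dbo.Order Details",
                "SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10");
        try {
            filterQueries.entrySet().stream()
                    .collect(Collectors.toMap(e -> parse(e.getKey(), false), Map.Entry::getValue));
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // Unexpected input: D
        }
    }
}
```

The "D" in the message is the first character of "Details". After "Order" and a space, the sketch expects a separator or the end of the identifier, which matches the error in the report.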
What Debezium connector do you use and what version?
SQL Server connector, version 2.6.1
What is the connector configuration?
connector.class = io.debezium.connector.sqlserver.SqlServerConnector
include.schema.changes = false
schema.history.internal.store.only.captured.tables.ddl = true
schema.history.internal.file.filename = /path/to/dat
tombstones.on.delete = false
topic.prefix = Northwind
offset.storage.file.filename = /path/to/dat
errors.retry.delay.initial.ms = 30000
snapshot.select.statement.overrides.[dbo].[Order Details] = SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10
value.converter = org.apache.kafka.connect.json.JsonConverter
key.converter = org.apache.kafka.connect.json.JsonConverter
database.databaseName = Northwind
database.user = admin
database.names = Northwind
offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
snapshot.select.statement.overrides = [dbo].[Order Details]
time.precision.mode = connect
errors.retry.delay.max.ms = 60000
offset.flush.timeout.ms = 5000
event.processing.failure.handling.mode = fail
offset.flush.interval.ms = 0
schema.history.internal = io.debezium.storage.file.history.FileSchemaHistory
errors.max.retries = 6
database.hostname = hostname
database.password = ********
name = foobar
task.id = 0
table.include.list = ^dbo.Categories$,^dbo.CustomerDemographics$,^dbo.Customers$,^dbo.EmployeeTerritories$,^dbo.Employees$,^dbo.Northwind.Demo$,^dbo.Order Details$,^dbo.Orders$,^dbo.Products$,^dbo.Region$,^dbo.Shippers$,^dbo.Suppliers$,^dbo.Territories$
database.trustServerCertificate = true
snapshot.mode = initial_only
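The two override properties in this configuration work as a pair. The list property names the table, and a second property whose key ends with that same table string holds the SELECT. The sketch below assembles them with java.util.Properties, for example for an embedded engine. The class name and the addOverride helper are hypothetical. The property names and values are copied from the configuration above, and the sketch assumes the list property takes comma-separated table names.

```java
import java.util.Properties;

/**
 * Hypothetical helper that registers a snapshot SELECT override.
 * Property names mirror the configuration in this report.
 */
public class SnapshotOverrideConfigSketch {

    static final String OVERRIDES = "snapshot.select.statement.overrides";

    /** Adds the table to the override list and stores its query under a per-table key. */
    static void addOverride(Properties props, String table, String select) {
        String existing = props.getProperty(OVERRIDES);
        // Assumption: multiple tables are joined with commas.
        props.setProperty(OVERRIDES, existing == null ? table : existing + "," + table);
        // The per-table key reuses the table string exactly as written, brackets included.
        props.setProperty(OVERRIDES + "." + table, select);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
        props.setProperty("snapshot.mode", "initial_only");
        addOverride(props, "[dbo].[Order Details]",
                "SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Because the per-table key is built from the bracketed string, the brackets are what keep "Order Details" a single identifier. Once they are stripped, the name becomes ambiguous to a generic parser.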
What is the captured database version and mode of deployment?
SQL Server on Docker
What behaviour do you expect?
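A snapshot SELECT override statement can be configured for a SQL Server table, including one whose name must be escaped (such as [dbo].[Order Details]), and the snapshot uses it.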
What behaviour do you see?
io.debezium.DebeziumException: java.lang.IllegalArgumentException: Unexpected input: D
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:103)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
	at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:84)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unexpected input: D
	at io.debezium.relational.TableIdParser$ParsingState$3.handleCharacter(TableIdParser.java:154)
	at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:71)
	at io.debezium.text.TokenStream.start(TokenStream.java:446)
	at io.debezium.relational.TableIdParser.parse(TableIdParser.java:36)
	at io.debezium.relational.TableIdParser.parse(TableIdParser.java:30)
	at io.debezium.relational.TableId.parseParts(TableId.java:80)
	at io.debezium.relational.TableId.parse(TableId.java:54)
	at io.debezium.relational.TableId.parse(TableId.java:30)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$doExecute$0(RelationalSnapshotChangeEventSource.java:122)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:177)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:122)
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
Yup
Implementation ideas (optional)
Not sure why SnapshottingTask has a
private final Map<String, String> filterQueries;
perhaps it needs to use:
private final Map<DataCollectionId, String> filterQueries;
so that RelationalSnapshotChangeEventSource#doExecute no longer has to re-parse the keys with TableId.parse. Otherwise, the strings need to be quoted properly, and the code needs to know it is dealing with SQL Server so that a new SqlServerTableIdPredicates() can be passed to the parser as well.
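Both options can be sketched side by side, under stated assumptions. TableKey is a hypothetical stand-in for DataCollectionId/TableId, not Debezium's class. quoteSqlServer is a hypothetical helper that re-quotes each part SQL Server-style. Inside a bracket-delimited identifier, a closing bracket is escaped by doubling it, which is what T-SQL's QUOTENAME does.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the two options; not Debezium code. */
public class FilterQueriesSketch {

    /** Option 1: key filter queries by a structured identifier instead of a String. */
    record TableKey(String catalog, String schema, String table) {
        TableKey {
            Objects.requireNonNull(schema, "schema");
            Objects.requireNonNull(table, "table");
        }
    }

    /** Option 2: re-quote one identifier part with SQL Server brackets. */
    static String quoteSqlServer(String part) {
        // A ']' inside [...] is escaped by doubling it.
        return "[" + part.replace("]", "]]") + "]";
    }

    /** Re-quotes every part of a key; catalog is optional. */
    static String quoteSqlServer(TableKey key) {
        String schemaAndTable = quoteSqlServer(key.schema()) + "." + quoteSqlServer(key.table());
        return key.catalog() == null
                ? schemaAndTable
                : quoteSqlServer(key.catalog()) + "." + schemaAndTable;
    }

    public static void main(String[] args) {
        Map<TableKey, String> filterQueries = new LinkedHashMap<>();
        TableKey orderDetails = new TableKey("Northwind", "dbo", "Order Details");
        filterQueries.put(orderDetails,
                "SELECT * FROM [Northwind].[dbo].[Order Details] WHERE Quantity > 10");

        // Option 1: records compare by value, so lookups need no string re-parsing.
        System.out.println(filterQueries.get(new TableKey("Northwind", "dbo", "Order Details")));

        // Option 2: with String keys, each part has to be re-quoted before parsing.
        System.out.println(quoteSqlServer(orderDetails)); // [Northwind].[dbo].[Order Details]
    }
}
```

Option 1 avoids the round trip through strings entirely. Option 2 keeps the String-keyed map but only works if the parser is also told to treat brackets as quotes.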
Links to: RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release