-
Bug
-
Resolution: Done
-
Blocker
-
0.9.0.Final
-
None
Our SQL Server tables which we are trying to replicate to Kafka have following pattern:
tenant name$table name
e.g.: UAT WAG CZ$Fixed Asset
But DBZ fails to work with them:
Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yaml
[2019-02-19 08:47:24,956] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-02-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped. at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170) at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151) at io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:152) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near 'CZ$Fixed'. at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259) at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:548) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:479) at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7344) at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2713) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:224) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:204) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:401) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:482) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:143) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:130) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:86) ... 5 more [2019-02-19 08:47:24,956] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-02-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
After checking the source code, problem lies in SqlServerConnection.GET_ALL_CHANGES_FOR_TABLE, where square brackets are missing around function name. It can be changed to:
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](ISNULL(?,sys.fn_cdc_get_min_lsn('#')), ?, N'all update old')";
After locally changed query I went into another problem, apparently related to "special character" (space this time) too:
Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yaml
[2019-02-19 09:15:31,157] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-03-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped. at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170) at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151) at io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:158) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$2(TableSchemaBuilder.java:210) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:135) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:89) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:125) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:203) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:485) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:143) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:137) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:86) ... 5 more [2019-02-19 09:15:31,157] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-03-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
And after restarting Kafka Connect there is another ERROR log:
Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yaml
[2019-02-19 09:17:38,172] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-03-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) java.lang.IllegalArgumentException: Unexpected input: W at io.debezium.relational.TableIdParser$ParsingState$3.handleCharacter(TableIdParser.java:138) at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:57) at io.debezium.text.TokenStream.start(TokenStream.java:445) at io.debezium.relational.TableIdParser.parse(TableIdParser.java:28) at io.debezium.relational.TableId.parse(TableId.java:39) at io.debezium.relational.TableId.parse(TableId.java:26) at io.debezium.relational.history.TableChanges$TableChange.fromDocument(TableChanges.java:120) at io.debezium.relational.history.TableChanges.fromArray(TableChanges.java:43) at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:79) at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238) at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73) at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43) at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:98) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:182) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2019-02-19 09:17:38,172] ERROR WorkerSourceTask{id=cdc-dbz-nav4uat-03-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
Used configuration for DBZ connector:
{ "name": "cdc-dbz-nav4uat-03", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "tasks.max": "1", "database.hostname": "***", "database.port": "1433", "database.user": "***", "database.password": "***", "database.dbname": "NAV4_UAT", "database.server.name": "cdc-dbz-nav4uat", "table.whitelist": "dbo\\.UAT WAG CZ\\$Fixed Asset", "database.history.kafka.bootstrap.servers": "localhost:9991", "database.history.kafka.topic": "cdc-dbz-nav4uat.history" } }