-
Bug
-
Resolution: Done
-
Major
-
0.9.0.Beta1
-
None
I'm trying to get the SQL Server connector to work, but there seem to be problems when connecting sinks to the topics produced by the source connector. The source connector crashes with the error message below.
The error:
[2018-11-29 15:57:33,288] ERROR Unexpected exception while processing record 'ConsumerRecord(topic = SQLSERVER01.history, partition = 0, offset = 25, CreateTime = 1543506974896, serialized key size = -1, serialized value size = 203, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "source" : { "server" : "SQLSERVER01" }, "position" : { "snapshot" : true, "snapshot_completed" : true }, "databaseName" : "DATABASE01", "schemaName" : "dbo", "ddl" : "N/A" })' (io.debezium.relational.history.KafkaDatabaseHistory)
java.lang.NullPointerException
    at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
    at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
    at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.commit(SqlServerConnectorTask.java:167)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:506)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:447)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
    at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
    at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
    at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
My configuration
My source connector:
{
  "name": "sqlserver-source-connector",
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "database.hostname": "host.docker.internal",
  "database.port": "1433",
  "database.user": "dbusername",
  "database.password": "dbpassword",
  "database.dbname": "dbname",
  "database.history.kafka.bootstrap.servers": "kafka1:19092",
  "database.history.kafka.topic": "SQLSERVER01.history",
  "database.server.name": "SQLSERVER01",
  "table.whitelist": "^dbo\\.table1$,^dbo\\.table2$,^dbo\\.table3$"
}
My sink connectors:
{
  "name": "table1-sink-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "transforms": "route,unwrap",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "tasks.max": "1",
  "topics": "SQLSERVER01.dbo.table1",
  "auto.create": "true",
  "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "pk.fields": "PK_table1"
}

{
  "name": "table2-sink-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
  "transforms": "route,unwrap",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "tasks.max": "1",
  "topics": "SQLSERVER01.dbo.table2",
  "auto.create": "true",
  "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "pk.fields": "PK_table2"
}
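
For readers checking the sink side, here is a rough Python sketch of what the `route` transform in the sink configs does to topic names. It only emulates the regex and replacement from the config. It is not the Kafka Connect implementation, and the helper name `route_topic` is made up for illustration. It assumes RegexRouter renames a topic only when the regex matches the whole topic name.

```python
import re

# Values copied from the sink connector configs (JSON escaping removed).
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
ROUTE_REPLACEMENT = "$3"


def route_topic(topic, regex=ROUTE_REGEX, replacement=ROUTE_REPLACEMENT):
    """Hypothetical helper: emulate the topic rename done by the route transform."""
    # Java writes group references as $N; Python's re module expects \g<N>.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # Assumption: the rename applies only on a full match of the topic name.
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics keep their name
    return match.expand(py_replacement)


if __name__ == "__main__":
    for name in ("SQLSERVER01.dbo.table1", "SQLSERVER01.dbo.table2", "SQLSERVER01.history"):
        print(name, "->", route_topic(name))
```

Under these assumptions `SQLSERVER01.dbo.table1` is routed to `table1`, while a two-segment name such as `SQLSERVER01.history` does not match the three-group pattern and is left unchanged.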