-
Bug
-
Resolution: Done
-
Major
-
0.8.1.Final
-
None
Steps to reproduce :
1. Start the connector with the config -
{
"name": "demo-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "debz.schema-changes.demo",
"database.hostname": "localhost",
"database.user": "dbarr",
"database.password": "xxx",
"database.port": "3306",
"database.server.id": "999",
"database.server.name": "debz-demo",
"name": "demo-mysql-connector",
"database.whitelist": "demo",
"tasks.max": "1"
}
}
2. Create a table with a NOT NULL JSON column.
CREATE TABLE `test_table` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(127) NOT NULL, `json_col` json NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB
3. Disable MySQL strict mode if it is enabled.
4. Do inserts
INSERT INTO `demo`.`test_table` (`id`, `name`, `json_col`) VALUES ('1', 'abc', cast('null' as json));
No Error. All good.
INSERT INTO `demo`.`test_table` (`id`, `name`) VALUES ('2', 'def');
The connector task fails. Connector status -
{
name: "demo-mysql-connector",
connector: {
state: "RUNNING",
worker_id: "10.2.43.46:8083"
},
tasks: [
{
state: "FAILED",
trace: "org.apache.kafka.connect.errors.ConnectException: Invalid value: null used for required field: "json_col", schema type: STRING at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200) at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178) at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:451) at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "json_col", schema type: STRING at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220) at org.apache.kafka.connect.data.Struct.validate(Struct.java:233) at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252) at org.apache.kafka.connect.data.Struct.put(Struct.java:216) at org.apache.kafka.connect.data.Struct.put(Struct.java:203) at io.debezium.data.Envelope.create(Envelope.java:262) at io.debezium.connector.mysql.RecordMakers$1.insert(RecordMakers.java:219) at io.debezium.connector.mysql.RecordMakers$RecordsForTable.create(RecordMakers.java:419) at io.debezium.connector.mysql.BinlogReader.handleInsert(BinlogReader.java:765) at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:435) ... 5 more ",
id: 0,
worker_id: "10.2.43.46:8083"
}
],
type: "source"
}
Stacktrace attached.
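Note that in the status above the connector itself still reports RUNNING while its only task is FAILED, so the failure is easy to miss if only the top-level state is checked. A minimal sketch of a check over the status document follows; the helper name `failed_tasks` is hypothetical, and the sample dict is a trimmed copy of the output above with the trace shortened.

```python
def failed_tasks(status):
    """Return (task id, first line of trace) for every task whose state is FAILED."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            # Keep only the first line so the summary stays readable.
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task.get("id"), first_line))
    return failures


# Trimmed copy of the status shown above (trace shortened for readability).
status = {
    "name": "demo-mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.2.43.46:8083"},
    "tasks": [
        {
            "state": "FAILED",
            "trace": 'org.apache.kafka.connect.errors.ConnectException: '
                     'Invalid value: null used for required field: "json_col", '
                     'schema type: STRING',
            "id": 0,
            "worker_id": "10.2.43.46:8083",
        }
    ],
    "type": "source",
}

for task_id, reason in failed_tasks(status):
    print(f"task {task_id} failed: {reason}")
```

The dict would normally come from the Kafka Connect REST endpoint `GET /connectors/demo-mysql-connector/status` on the worker; fetching it is left out here so the sketch runs without a cluster.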
The failure happens because the second insert provides no value for json_col, so MySQL (with strict mode disabled) inserts an implicit null for the column. This implicit null is different from cast('null' as json), which is why the first insert does not throw an error.
I have looked at the code and I think I know what's happening: cast('null' as json) produces a null that the connector handles, but the implicit null inserted by MySQL in the second insert statement is not handled.
I will be happy to provide a patch fix for this one.
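To make the distinction concrete, here is a toy model of the two cases. It is not Debezium's code: the function names, the `optional` flag and the `"null"` fallback are all assumptions made for illustration. It assumes the first insert reaches the converter as the JSON text `null`, while the second reaches it as no value at all, which a required STRING field rejects.

```python
JSON_NULL_TEXT = "null"  # assumed textual form of cast('null' as json)


def convert_json_column(raw, optional):
    """Toy stand-in for mapping a JSON column value to a STRING field.

    raw is the JSON document text read from the row, or None when the
    row carries no value for the column (the implicit null case).
    """
    if raw is None and not optional:
        # Mirrors the validation failure reported in the stack trace.
        raise ValueError("Invalid value: null used for required field")
    return raw


def convert_json_column_lenient(raw, optional, fallback=JSON_NULL_TEXT):
    """Hypothetical handling: use a fallback for a missing required value."""
    if raw is None and not optional:
        return fallback
    return raw


# First insert: explicit JSON null, accepted by the required field.
print(convert_json_column(JSON_NULL_TEXT, optional=False))

# Second insert: implicit null, rejected by the required field.
try:
    convert_json_column(None, optional=False)
except ValueError as err:
    print("failed:", err)

# One possible direction for a patch, under the assumptions above.
print(convert_json_column_lenient(None, optional=False))
```

Whether a fallback value is the right behaviour, and what that value should be, is a design decision for the actual patch; the sketch only shows where the two inserts diverge.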