Bug
Resolution: Done
Major
0.8.1.Final
None
Steps to reproduce:
1. Start the connector with the following configuration:
{
  "name": "demo-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "debz.schema-changes.demo",
    "database.hostname": "localhost",
    "database.user": "dbarr",
    "database.password": "xxx",
    "database.port": "3306",
    "database.server.id": "999",
    "database.server.name": "debz-demo",
    "name": "demo-mysql-connector",
    "database.whitelist": "demo",
    "tasks.max": "1"
  }
}
2. Create a table with a NOT NULL JSON column:
CREATE TABLE `test_table` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(127) NOT NULL,
  `json_col` json NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;
3. Disable MySQL strict mode if it is enabled.
4. Run the following inserts:
INSERT INTO `demo`.`test_table` (`id`, `name`, `json_col`) VALUES ('1', 'abc', cast('null' as json));
No error: the connector processes this insert normally.
INSERT INTO `demo`.`test_table` (`id`, `name`) VALUES ('2', 'def');
The connector task fails. Connector status:
{
  name: "demo-mysql-connector",
  connector: { state: "RUNNING", worker_id: "10.2.43.46:8083" },
  tasks: [
    {
      state: "FAILED",
      trace: "org.apache.kafka.connect.errors.ConnectException: Invalid value: null used for required field: "json_col", schema type: STRING
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:451)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
        at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "json_col", schema type: STRING
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
        at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
        at io.debezium.data.Envelope.create(Envelope.java:262)
        at io.debezium.connector.mysql.RecordMakers$1.insert(RecordMakers.java:219)
        at io.debezium.connector.mysql.RecordMakers$RecordsForTable.create(RecordMakers.java:419)
        at io.debezium.connector.mysql.BinlogReader.handleInsert(BinlogReader.java:765)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:435)
        ... 5 more ",
      id: 0,
      worker_id: "10.2.43.46:8083"
    }
  ],
  type: "source"
}
Stacktrace attached.
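Registering the connector (step 1) and reading the status shown above can be scripted against the Kafka Connect REST API. This is a minimal sketch that only builds the two requests (POST /connectors and GET /connectors/{name}/status); it assumes a Connect worker on localhost:8083, and the ConnectRest class and its methods are hypothetical helpers.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for scripting the reproduction via the Kafka Connect REST API.
// Assumption: a Connect worker listens on localhost:8083 (the report's worker is 10.2.43.46:8083).
final class ConnectRest {
    static final String BASE_URL = "http://localhost:8083";

    // Step 1: register the connector by POSTing its JSON configuration to /connectors.
    static HttpRequest register(String connectorJson) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // After the second insert: fetch the connector and task status.
    static HttpRequest status(String connectorName) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/connectors/" + connectorName + "/status"))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Only builds and prints the requests; nothing is sent over the network.
        // Placeholder body: use the full configuration from step 1 in practice.
        HttpRequest reg = register("{\"name\": \"demo-mysql-connector\", \"config\": {}}");
        HttpRequest st = status("demo-mysql-connector");
        System.out.println(reg.method() + " " + reg.uri());
        System.out.println(st.method() + " " + st.uri());
    }
}
```

To actually send either request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) from java.net.http.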
This happens because the second insert provides no value for json_col, so MySQL (with strict mode disabled) inserts an implicit null value for the column. This implicit null is different from cast('null' as json), which is why the first insert does not cause an error.
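The difference between the two inserts can be illustrated with a dependency-free model. It rests on assumptions, not on Debezium's actual code: the JSON column reaches the converter as a byte payload, an explicit JSON null decodes to the text "null" (a non-null Java String), and MySQL's implicit value arrives as an empty payload that decodes to a Java null. decodeJson and requireNonNull are hypothetical stand-ins for the binlog decoder and Kafka Connect's required-field check.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical model of the failure path; not Debezium's or Kafka Connect's real code.
final class ImplicitNullModel {

    // Stand-in for decoding MySQL's binary JSON; here the payload is simply UTF-8 text.
    // Assumption: the implicit value arrives as an empty payload and decodes to Java null.
    static String decodeJson(byte[] payload) {
        if (payload.length == 0) {
            return null;
        }
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Stand-in for the check on a required (NOT NULL) STRING field.
    static void requireNonNull(String field, Object value) {
        if (value == null) {
            throw new IllegalArgumentException(
                    "Invalid value: null used for required field: \"" + field + "\", schema type: STRING");
        }
    }

    public static void main(String[] args) {
        // Insert 1: cast('null' as json) -> the JSON text "null", a non-null Java String.
        String first = decodeJson("null".getBytes(StandardCharsets.UTF_8));
        requireNonNull("json_col", first);
        System.out.println("insert 1 accepted: \"" + first + "\"");

        // Insert 2: implicit value -> empty payload -> Java null, which the check rejects.
        String second = decodeJson(new byte[0]);
        try {
            requireNonNull("json_col", second);
        } catch (IllegalArgumentException e) {
            System.out.println("insert 2 rejected: " + e.getMessage());
        }
    }
}
```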
I have looked at the code and I think I know what is happening: the null produced by cast('null' as json) is handled, but the implicit null that MySQL inserts for the second statement is not.
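One possible shape for the fix, as a hypothetical sketch rather than Debezium's actual converter: treat an empty payload as "no document" and, when the column is required, emit a fallback JSON string instead of a Java null. The convertJson(byte[], boolean) signature and the "{}" fallback are assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical patch sketch; not Debezium's actual converter.
// Assumption: an empty binlog payload for a JSON column means "no document".
final class JsonConverterSketch {
    // Illustrative fallback for required columns (an assumption, not a confirmed choice).
    static final String REQUIRED_FALLBACK = "{}";

    static String convertJson(byte[] payload, boolean optional) {
        if (payload == null || payload.length == 0) {
            // Optional columns may carry null; required columns need a non-null value.
            return optional ? null : REQUIRED_FALLBACK;
        }
        // Stand-in for decoding MySQL's binary JSON format.
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(convertJson(new byte[0], false)); // prints {}
        System.out.println(convertJson(new byte[0], true));  // prints null (a Java null)
        System.out.println(convertJson("null".getBytes(StandardCharsets.UTF_8), false)); // prints null (the JSON text)
    }
}
```

Emitting the JSON text "null" instead of "{}" would also pass the required-field check; which fallback better represents MySQL's implicit value is a decision for the actual patch.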
I would be happy to provide a patch for this.