-
Bug
-
Resolution: Done
-
Critical
-
1.2.0.Final
-
None
Running the following in MySQL would fail the connector task:
CREATE DATABASE `my_db`; USE `my_db`; CREATE TABLE IF NOT EXISTS `my_db`.`my_table` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT, `ts_col` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `ts_col2` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, `ts_col3` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`)) ENGINE = InnoDB; INSERT INTO `my_db`.`my_table` VALUES (); -- ok ALTER TABLE `my_db`.`my_table` ADD `ts_col4` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP; -- ok INSERT INTO `my_db`.`my_table` VALUES (); -- task fail ALTER TABLE `my_db`.`my_table` ADD `ts_col5` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL;
The log error is:
[2020-07-11 15:51:47,389] ERROR WorkerSourceTask{id=source-mysql-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)[2020-07-11 15:51:47,389] ERROR WorkerSourceTask{id=source-mysql-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)org.apache.kafka.connect.errors.ConnectException: Invalid default value at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600) at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131) at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:381) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:120) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:118) at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130) at io.debezium.connector.mysql.MySqlSchema.lambda$applyDdl$4(MySqlSchema.java:380) at java.lang.Iterable.forEach(Iterable.java:75) at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:374) at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:793) at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583) ... 5 moreCaused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: STRING at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220) at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213) at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129) ... 23 more
The position of `NOT NULL` in the `ALTER TABLE ... ADD COLUMN` DDL should not matter, being it's before or after the `DEFAULT CURRENT_TIMESTAMP`. As you can see, the similar column definition in the `CREATE TABLE` DDL works well: ``ts_col2` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL`. This error happens only when `ALTER TABLE` is applied, not when `CREATE TABLE` is applied.
The default value is `null` even though the column defined in the DLL as `NOT NULL`. The default value should have been `1970-01-01 00:00:00` (String type) according to the documentation].
I believe the root cause is that in case the `NOT NULL` is positioned at the end of the DDL, the following code is invoked when we don't have the knowledge of whether the column is nullable or not, therefore by default it is always nullable.
if (column.isOptional()) { return null; }
Proposed solution:
Delay invoking convertDefaultValueToSchemaType() to exitColumnDefinition() so that we're sure if the column is optional or not.
See more in the PR.
Alternative solution:
Removing the above code (here and here) would fix the bug, but it would also change the existing behavior of:
Using `null` as default in the AVRO schema if the column is nullable __ to Using `1970-01-01 00:00:00` as default in the AVRO schema if the column is nullable.