-
Bug
-
Resolution: Done
-
Major
-
1.2.0.Beta2
-
None
-
Hide
1. Configurations
MySQL Connector configuration
{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "max.queue.size": "327680", "database.history.kafka.topic": "schema-changes.inventory", "database.history.connector.id": "inventory_debezium", "include.schema.changes": "true", "table.whitelist": "inventory.customers", "decimal.handling.mode": "string", "value.converter": "io.confluent.connect.avro.AvroConverter", "database.whitelist": "inventory", "key.converter": "io.confluent.connect.avro.AvroConverter", "database.user": "binlog", "database.history.kafka.bootstrap.servers": "kafka:19092", "database.server.name": "inventory", "database.port": "3306", "column.propagate.source.type": ".*", "value.converter.schema.registry.url": "http://schema-registry:8081", "database.history.connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.password": "123456", "name": "inventory_debezium", "database.history.store.only.monitored.tables.ddl": "true", "max.batch.size": "81920", "include.query": "true", "key.converter.schema.registry.url": "http://schema-registry:8081", "snapshot.mode": "schema_only" }
Schema Registry
{"compatibilityLevel":"NONE"}
Kafka Connect
bootstrap.servers=kafka:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=1 config.storage.topic=connect-configs config.storage.partition=1 config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=1 offset.flush.interval.ms=10000 plugin.path=/kafka/connect access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS access.control.allow.origin=*
2. Create the table
CREATE TABLE customers( id INT PRIMARY KEY AUTO_INCREMENT, max_level INT DEFAULT 25 );
3. Insert a row
INSERT INTO customers() VALUES();
Kafka message
{ "before": null, "after": { "Value": { "id": 1, "max_level": { "int": 25 } } }, "source": { "version": "1.2.0.Beta2", "connector": "mysql", "name": "inventory", "ts_ms": 1591098326000, "snapshot": { "string": "false" }, "db": "inventory", "table": { "string": "customers" }, "server_id": 112233, "gtid": { "string": "df1ace0a-a4c5-11ea-a54c-0242ac160003:31" }, "file": "mysql-bin.000003", "pos": 1618, "row": 0, "thread": { "long": 6 }, "query": null }, "op": "c", "ts_ms": { "long": 1591098326228 }, "transaction": null }
Schema v1
{ "type": "record", "name": "Envelope", "namespace": "inventory.inventory.customers", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT" } } }, { "name": "max_level", "type": [ { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT" }, "connect.default": 25 }, "null" ], "default": 25 } ], "connect.name": "inventory.inventory.customers.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "server_id", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", 
"type": "long" } ] } ], "default": null } ], "connect.name": "inventory.inventory.customers.Envelope" }
4. Drop default
ALTER TABLE customers ALTER COLUMN max_level DROP DEFAULT;
5. Insert another row
INSERT INTO customers(max_level) VALUES(10);
Kafka message
{ "before": null, "after": { "Value": { "id": 2, "max_level": { "int": 10 } } }, "source": { "version": "1.2.0.Beta2", "connector": "mysql", "name": "inventory", "ts_ms": 1591098354000, "snapshot": { "string": "false" }, "db": "inventory", "table": { "string": "customers" }, "server_id": 112233, "gtid": { "string": "df1ace0a-a4c5-11ea-a54c-0242ac160003:33" }, "file": "mysql-bin.000003", "pos": 2099, "row": 0, "thread": { "long": 6 }, "query": null }, "op": "c", "ts_ms": { "long": 1591098354757 }, "transaction": null }
Schema is still v1, expecting v2
{ "type": "record", "name": "Envelope", "namespace": "inventory.inventory.customers", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT" } } }, { "name": "max_level", "type": [ { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT" }, "connect.default": 25 }, "null" ], "default": 25 } ], "connect.name": "inventory.inventory.customers.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "server_id", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", 
"type": "long" } ] } ], "default": null } ], "connect.name": "inventory.inventory.customers.Envelope" }
Show 1. Configurations MySQL Connector configuration { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "max.queue.size": "327680", "database.history.kafka.topic": "schema-changes.inventory", "database.history.connector.id": "inventory_debezium", "include.schema.changes": "true", "table.whitelist": "inventory.customers", "decimal.handling.mode": "string", "value.converter": "io.confluent.connect.avro.AvroConverter", "database.whitelist": "inventory", "key.converter": "io.confluent.connect.avro.AvroConverter", "database.user": "binlog", "database.history.kafka.bootstrap.servers": "kafka:19092", "database.server.name": "inventory", "database.port": "3306", "column.propagate.source.type": ".*", "value.converter.schema.registry.url": "http://schema-registry:8081", "database.history.connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.password": "123456", "name": "inventory_debezium", "database.history.store.only.monitored.tables.ddl": "true", "max.batch.size": "81920", "include.query": "true", "key.converter.schema.registry.url": "http://schema-registry:8081", "snapshot.mode": "schema_only" } Schema Registry { "compatibilityLevel": "NONE" } Kafka Connect bootstrap.servers=kafka:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=1 config.storage.topic=connect-configs config.storage.partition=1 config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=1 offset.flush.interval.ms=10000 plugin.path=/kafka/connect access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS access.control.allow.origin=* 2. 
Create the table CREATE TABLE customers( id INT PRIMARY KEY AUTO_INCREMENT , max_level INT DEFAULT 25 ); 3. Insert a row INSERT INTO customers() VALUES (); Kafka message { "before" : null , "after" : { "Value" : { "id" : 1, "max_level" : { " int " : 25 } } }, "source" : { "version" : "1.2.0.Beta2" , "connector" : "mysql" , "name" : "inventory" , "ts_ms" : 1591098326000, "snapshot" : { "string" : " false " }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "df1ace0a-a4c5-11ea-a54c-0242ac160003:31" }, "file" : "mysql-bin.000003" , "pos" : 1618, "row" : 0, "thread" : { " long " : 6 }, "query" : null }, "op" : "c" , "ts_ms" : { " long " : 1591098326228 }, "transaction" : null } Schema v1 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ " null " , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" } } }, { "name" : "max_level" , "type" : [ { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" }, "connect. default " : 25 }, " null " ], " default " : 25 } ], "connect.name" : "inventory.inventory.customers.Value" } ], " default " : null }, { "name" : "after" , "type" : [ " null " , "Value" ], " default " : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : " long " }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : " true ,last, false " }, "connect. 
default " : " false " , "connect.name" : "io.debezium.data.Enum" }, " null " ], " default " : " false " }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "server_id" , "type" : " long " }, { "name" : "gtid" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : " long " }, { "name" : "row" , "type" : " int " }, { "name" : "thread" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "query" , "type" : [ " null " , "string" ], " default " : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "transaction" , "type" : [ " null " , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : " long " }, { "name" : "data_collection_order" , "type" : " long " } ] } ], " default " : null } ], "connect.name" : "inventory.inventory.customers.Envelope" } 4. Drop default ALTER TABLE customers ALTER COLUMN max_level DROP DEFAULT ; 5. 
Insert another row INSERT INTO customers(max_level) VALUES (10); Kafka message { "before" : null , "after" : { "Value" : { "id" : 2, "max_level" : { " int " : 10 } } }, "source" : { "version" : "1.2.0.Beta2" , "connector" : "mysql" , "name" : "inventory" , "ts_ms" : 1591098354000, "snapshot" : { "string" : " false " }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "df1ace0a-a4c5-11ea-a54c-0242ac160003:33" }, "file" : "mysql-bin.000003" , "pos" : 2099, "row" : 0, "thread" : { " long " : 6 }, "query" : null }, "op" : "c" , "ts_ms" : { " long " : 1591098354757 }, "transaction" : null } Schema is still v1, expecting v2 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ " null " , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" } } }, { "name" : "max_level" , "type" : [ { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" }, "connect. default " : 25 }, " null " ], " default " : 25 } ], "connect.name" : "inventory.inventory.customers.Value" } ], " default " : null }, { "name" : "after" , "type" : [ " null " , "Value" ], " default " : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : " long " }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : " true ,last, false " }, "connect. 
default " : " false " , "connect.name" : "io.debezium.data.Enum" }, " null " ], " default " : " false " }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "server_id" , "type" : " long " }, { "name" : "gtid" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : " long " }, { "name" : "row" , "type" : " int " }, { "name" : "thread" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "query" , "type" : [ " null " , "string" ], " default " : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "transaction" , "type" : [ " null " , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : " long " }, { "name" : "data_collection_order" , "type" : " long " } ] } ], " default " : null } ], "connect.name" : "inventory.inventory.customers.Envelope" }
The Avro schema doesn't change when `ALTER TABLE <table> ALTER COLUMN <column> DROP DEFAULT` is executed on a column that has a default value. We've tested this with AvroConverter.
Debezium MySQL Connector: 1.2.0.Beta2
Kafka Connect: 2.3.0
Schema Registry: 5.4.0
MySQL: 5.7.21