-
Bug
-
Resolution: Done
-
Major
-
1.1.0.Final
-
None
-
Hide
0. Configurations
MySQL Connector configuration
{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.user": "binlog", "max.queue.size": "327680", "database.history.kafka.bootstrap.servers": "kafka:19092", "database.history.kafka.topic": "schema-changes.inventory_db", "database.server.name": "inventory_db", "database.port": "3306", "include.schema.changes": "true", "column.propagate.source.type": ".*", "table.whitelist": "inventory.customers", "value.converter.schema.registry.url": "http://schema-registry:8081", "database.hostname": "mysql", "database.password": "123456", "name": "inventory_db_binlog_connector", "database.history.store.only.monitored.tables.ddl": "true", "max.batch.size": "81920", "value.converter": "io.confluent.connect.avro.AvroConverter", "database.whitelist": "inventory", "include.query": "true", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "snapshot.mode": "schema_only" }
Schema Registry
{"compatibilityLevel":"NONE"}
Kafka Connect
bootstrap.servers=kafka:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=1 config.storage.topic=connect-configs config.storage.partition=1 config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=1 offset.flush.interval.ms=10000 plugin.path=/kafka/connect access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS access.control.allow.origin=*
1. Create the table
CREATE TABLE inventory.customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL);
2. Insert a row
INSERT INTO customers(first_name) VALUES("1");
{ "before": null, "after": { "Value": { "id": 1, "first_name": "1" } }, "source": { "version": "1.1.0.Final", "connector": "mysql", "name": "inventory_db", "ts_ms": 1588929295000, "snapshot": { "string": "false" }, "db": "inventory", "table": { "string": "customers" }, "server_id": 112233, "gtid": { "string": "aaab5f18-910b-11ea-965e-0242ac160003:31" }, "file": "mysql-bin.000003", "pos": 1561, "row": 0, "thread": { "long": 7 }, "query": null }, "op": "c", "ts_ms": { "long": 1588929295398 }, "transaction": null }
Schema v.1:
{ "type": "record", "name": "Envelope", "namespace": "inventory_db.inventory.customers", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT", "__debezium.source.column.length": "11" } } }, { "name": "first_name", "type": { "type": "string", "connect.parameters": { "__debezium.source.column.type": "VARCHAR", "__debezium.source.column.length": "255" } } } ], "connect.name": "inventory_db.inventory.customers.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "server_id", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": 
"long" }, { "name": "data_collection_order", "type": "long" } ] } ], "default": null } ], "connect.name": "inventory_db.inventory.customers.Envelope" }
3. Add new column with a default value
ALTER TABLE customers ADD COLUMN last_name VARCHAR(255) DEFAULT 'Doe'; INSERT INTO customers(first_name) VALUES("2");
{ "before": null, "after": { "Value": { "id": 2, "first_name": "2", "last_name": { "string": "Doe" } } }, "source": { "version": "1.1.0.Final", "connector": "mysql", "name": "inventory_db", "ts_ms": 1588929524000, "snapshot": { "string": "false" }, "db": "inventory", "table": { "string": "customers" }, "server_id": 112233, "gtid": { "string": "aaab5f18-910b-11ea-965e-0242ac160003:33" }, "file": "mysql-bin.000003", "pos": 2057, "row": 0, "thread": { "long": 7 }, "query": null }, "op": "c", "ts_ms": { "long": 1588929524763 }, "transaction": null }
Schema v.2
{ "type": "record", "name": "Envelope", "namespace": "inventory_db.inventory.customers", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT", "__debezium.source.column.length": "11" } } }, { "name": "first_name", "type": { "type": "string", "connect.parameters": { "__debezium.source.column.type": "VARCHAR", "__debezium.source.column.length": "255" } } }, { "name": "last_name", "type": [ { "type": "string", "connect.parameters": { "__debezium.source.column.type": "VARCHAR", "__debezium.source.column.length": "255" }, "connect.default": "Doe" }, "null" ], "default": "Doe" } ], "connect.name": "inventory_db.inventory.customers.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "server_id", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], 
"default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", "type": "long" } ] } ], "default": null } ], "connect.name": "inventory_db.inventory.customers.Envelope" }
4. Alter the column's default value
ALTER TABLE customers ALTER COLUMN last_name SET DEFAULT 'foo'; INSERT INTO customers(first_name) VALUES("3");
{ "before": null, "after": { "Value": { "id": 3, "first_name": "3", "last_name": { "string": "foo" } } }, "source": { "version": "1.1.0.Final", "connector": "mysql", "name": "inventory_db", "ts_ms": 1588929644000, "snapshot": { "string": "false" }, "db": "inventory", "table": { "string": "customers" }, "server_id": 112233, "gtid": { "string": "aaab5f18-910b-11ea-965e-0242ac160003:35" }, "file": "mysql-bin.000003", "pos": 2550, "row": 0, "thread": { "long": 7 }, "query": null }, "op": "c", "ts_ms": { "long": 1588929644598 }, "transaction": null }
Schema Registry shows the schema is still v.2, not v.3 (the default of last_name is still "Doe" instead of "foo"):
{ "type": "record", "name": "Envelope", "namespace": "inventory_db.inventory.customers", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.parameters": { "__debezium.source.column.type": "INT", "__debezium.source.column.length": "11" } } }, { "name": "first_name", "type": { "type": "string", "connect.parameters": { "__debezium.source.column.type": "VARCHAR", "__debezium.source.column.length": "255" } } }, { "name": "last_name", "type": [ { "type": "string", "connect.parameters": { "__debezium.source.column.type": "VARCHAR", "__debezium.source.column.length": "255" }, "connect.default": "Doe" }, "null" ], "default": "Doe" } ], "connect.name": "inventory_db.inventory.customers.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.mysql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "table", "type": [ "null", "string" ], "default": null }, { "name": "server_id", "type": "long" }, { "name": "gtid", "type": [ "null", "string" ], "default": null }, { "name": "file", "type": "string" }, { "name": "pos", "type": "long" }, { "name": "row", "type": "int" }, { "name": "thread", "type": [ "null", "long" ], "default": null }, { "name": "query", "type": [ "null", "string" ], "default": null } ], "connect.name": "io.debezium.connector.mysql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], 
"default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", "type": "long" } ] } ], "default": null } ], "connect.name": "inventory_db.inventory.customers.Envelope" }
Show 0. Configurations MySQL Connector configuration { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.user": "binlog", "max.queue.size": "327680", "database.history.kafka.bootstrap.servers": "kafka:19092", "database.history.kafka.topic": "schema-changes.inventory_db", "database.server.name": "inventory_db", "database.port": "3306", "include.schema.changes": "true", "column.propagate.source.type": ".*", "table.whitelist": "inventory.customers", "value.converter.schema.registry.url": "http://schema-registry:8081", "database.hostname": "mysql", "database.password": "123456", "name": "inventory_db_binlog_connector", "database.history.store.only.monitored.tables.ddl": "true", "max.batch.size": "81920", "value.converter": "io.confluent.connect.avro.AvroConverter", "database.whitelist": "inventory", "include.query": "true", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "snapshot.mode": "schema_only" } Schema Registry {"compatibilityLevel":"NONE"} Kafka Connect bootstrap.servers=kafka:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=1 config.storage.topic=connect-configs config.storage.partition=1 config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=1 offset.flush.interval.ms=10000 plugin.path=/kafka/connect access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS access.control.allow.origin=* 1. Create the table CREATE TABLE inventory.customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL); 2. 
Insert a row INSERT INTO customers(first_name) VALUES ( "1" ); { "before" : null , "after" : { "Value" : { "id" : 1, "first_name" : "1" } }, "source" : { "version" : "1.1.0.Final" , "connector" : "mysql" , "name" : "inventory_db" , "ts_ms" : 1588929295000, "snapshot" : { "string" : " false " }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "aaab5f18-910b-11ea-965e-0242ac160003:31" }, "file" : "mysql-bin.000003" , "pos" : 1561, "row" : 0, "thread" : { " long " : 7 }, "query" : null }, "op" : "c" , "ts_ms" : { " long " : 1588929295398 }, "transaction" : null } Schema v.1 : { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory_db.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ " null " , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" , "__debezium.source.column.length" : "11" } } }, { "name" : "first_name" , "type" : { "type" : "string" , "connect.parameters" : { "__debezium.source.column.type" : "VARCHAR" , "__debezium.source.column.length" : "255" } } } ], "connect.name" : "inventory_db.inventory.customers.Value" } ], " default " : null }, { "name" : "after" , "type" : [ " null " , "Value" ], " default " : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : " long " }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : " true ,last, false " }, "connect. 
default " : " false " , "connect.name" : "io.debezium.data.Enum" }, " null " ], " default " : " false " }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "server_id" , "type" : " long " }, { "name" : "gtid" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : " long " }, { "name" : "row" , "type" : " int " }, { "name" : "thread" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "query" , "type" : [ " null " , "string" ], " default " : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "transaction" , "type" : [ " null " , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : " long " }, { "name" : "data_collection_order" , "type" : " long " } ] } ], " default " : null } ], "connect.name" : "inventory_db.inventory.customers.Envelope" } 3. 
Add new column with a default value ALTER TABLE customers ADD COLUMN last_name VARCHAR (255) DEFAULT 'Doe' ; INSERT INTO customers(first_name) VALUES ( "2" ); { "before" : null , "after" : { "Value" : { "id" : 2, "first_name" : "2" , "last_name" : { "string" : "Doe" } } }, "source" : { "version" : "1.1.0.Final" , "connector" : "mysql" , "name" : "inventory_db" , "ts_ms" : 1588929524000, "snapshot" : { "string" : " false " }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "aaab5f18-910b-11ea-965e-0242ac160003:33" }, "file" : "mysql-bin.000003" , "pos" : 2057, "row" : 0, "thread" : { " long " : 7 }, "query" : null }, "op" : "c" , "ts_ms" : { " long " : 1588929524763 }, "transaction" : null } Schema v.2 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory_db.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ " null " , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" , "__debezium.source.column.length" : "11" } } }, { "name" : "first_name" , "type" : { "type" : "string" , "connect.parameters" : { "__debezium.source.column.type" : "VARCHAR" , "__debezium.source.column.length" : "255" } } }, { "name" : "last_name" , "type" : [ { "type" : "string" , "connect.parameters" : { "__debezium.source.column.type" : "VARCHAR" , "__debezium.source.column.length" : "255" }, "connect. 
default " : "Doe" }, " null " ], " default " : "Doe" } ], "connect.name" : "inventory_db.inventory.customers.Value" } ], " default " : null }, { "name" : "after" , "type" : [ " null " , "Value" ], " default " : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : " long " }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : " true ,last, false " }, "connect. default " : " false " , "connect.name" : "io.debezium.data.Enum" }, " null " ], " default " : " false " }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "server_id" , "type" : " long " }, { "name" : "gtid" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : " long " }, { "name" : "row" , "type" : " int " }, { "name" : "thread" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "query" , "type" : [ " null " , "string" ], " default " : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "transaction" , "type" : [ " null " , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : " long " }, { "name" : "data_collection_order" , "type" : " long " } ] } ], " default " : null } ], "connect.name" : "inventory_db.inventory.customers.Envelope" } 4. 
Alter columns default value ALTER TABLE customers ALTER COLUMN last_name SET DEFAULT 'foo' ; INSERT INTO customers(first_name) VALUES ( "3" ); { "before" : null , "after" : { "Value" : { "id" : 3, "first_name" : "3" , "last_name" : { "string" : "foo" } } }, "source" : { "version" : "1.1.0.Final" , "connector" : "mysql" , "name" : "inventory_db" , "ts_ms" : 1588929644000, "snapshot" : { "string" : " false " }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "aaab5f18-910b-11ea-965e-0242ac160003:35" }, "file" : "mysql-bin.000003" , "pos" : 2550, "row" : 0, "thread" : { " long " : 7 }, "query" : null }, "op" : "c" , "ts_ms" : { " long " : 1588929644598 }, "transaction" : null } Schema registry shows the schema is still v.2, not v.3 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory_db.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ " null " , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : " int " , "connect.parameters" : { "__debezium.source.column.type" : "INT" , "__debezium.source.column.length" : "11" } } }, { "name" : "first_name" , "type" : { "type" : "string" , "connect.parameters" : { "__debezium.source.column.type" : "VARCHAR" , "__debezium.source.column.length" : "255" } } }, { "name" : "last_name" , "type" : [ { "type" : "string" , "connect.parameters" : { "__debezium.source.column.type" : "VARCHAR" , "__debezium.source.column.length" : "255" }, "connect. 
default " : "Doe" }, " null " ], " default " : "Doe" } ], "connect.name" : "inventory_db.inventory.customers.Value" } ], " default " : null }, { "name" : "after" , "type" : [ " null " , "Value" ], " default " : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : " long " }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : " true ,last, false " }, "connect. default " : " false " , "connect.name" : "io.debezium.data.Enum" }, " null " ], " default " : " false " }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "server_id" , "type" : " long " }, { "name" : "gtid" , "type" : [ " null " , "string" ], " default " : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : " long " }, { "name" : "row" , "type" : " int " }, { "name" : "thread" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "query" , "type" : [ " null " , "string" ], " default " : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ " null " , " long " ], " default " : null }, { "name" : "transaction" , "type" : [ " null " , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : " long " }, { "name" : "data_collection_order" , "type" : " long " } ] } ], " default " : null } ], "connect.name" : "inventory_db.inventory.customers.Envelope" }
The Avro schema doesn't change when an ALTER TABLE <table> ALTER COLUMN <column> SET DEFAULT <new-value> statement is executed on a column that already has a default value. We've also tested with JsonConverter instead of AvroConverter, and the schema still doesn't change.
Debezium MySQL Connector: 1.1.0.Final
Kafka Connect: 2.3.0
Schema Registry: 5.4.0
MySQL: 5.7.21