Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-2140

Avro schema doesn't change if a column default value is dropped

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 1.2.0.CR1
    • 1.2.0.Beta2
    • mysql-connector
    • None
    • Hide

      1. Configurations

      MySQL Connector configuration

      {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "max.queue.size": "327680",
        "database.history.kafka.topic": "schema-changes.inventory",
        "database.history.connector.id": "inventory_debezium",
        "include.schema.changes": "true",
        "table.whitelist": "inventory.customers",
        "decimal.handling.mode": "string",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "database.whitelist": "inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "database.user": "binlog",
        "database.history.kafka.bootstrap.servers": "kafka:19092",
        "database.server.name": "inventory",
        "database.port": "3306",
        "column.propagate.source.type": ".*",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "database.history.connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.password": "123456",
        "name": "inventory_debezium",
        "database.history.store.only.monitored.tables.ddl": "true",
        "max.batch.size": "81920",
        "include.query": "true",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "snapshot.mode": "schema_only"
      }
      

      Schema Registry

      {"compatibilityLevel":"NONE"}
      

      Kafka Connect

      bootstrap.servers=kafka:9092
      group.id=connect-cluster
      
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
      offset.storage.topic=connect-offsets
      offset.storage.replication.factor=1
      offset.storage.partitions=1
      
      config.storage.topic=connect-configs
      config.storage.partition=1
      config.storage.replication.factor=1
      
      status.storage.topic=connect-status
      status.storage.replication.factor=1
      status.storage.partitions=1
      
      offset.flush.interval.ms=10000
      
      plugin.path=/kafka/connect
      
      access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
      access.control.allow.origin=*
      

      2. Create the table

      CREATE TABLE customers(
      	id INT PRIMARY KEY AUTO_INCREMENT,
      	max_level INT DEFAULT 25
      );
      

      3. Insert a row

      INSERT INTO customers() VALUES();
      

      Kafka message

      {
        "before": null,
        "after": {
          "Value": {
            "id": 1,
            "max_level": {
              "int": 25
            }
          }
        },
        "source": {
          "version": "1.2.0.Beta2",
          "connector": "mysql",
          "name": "inventory",
          "ts_ms": 1591098326000,
          "snapshot": {
            "string": "false"
          },
          "db": "inventory",
          "table": {
            "string": "customers"
          },
          "server_id": 112233,
          "gtid": {
            "string": "df1ace0a-a4c5-11ea-a54c-0242ac160003:31"
          },
          "file": "mysql-bin.000003",
          "pos": 1618,
          "row": 0,
          "thread": {
            "long": 6
          },
          "query": null
        },
        "op": "c",
        "ts_ms": {
          "long": 1591098326228
        },
        "transaction": null
      }
      

      Schema v1

      {
        "type": "record",
        "name": "Envelope",
        "namespace": "inventory.inventory.customers",
        "fields": [
          {
            "name": "before",
            "type": [
              "null",
              {
                "type": "record",
                "name": "Value",
                "fields": [
                  {
                    "name": "id",
                    "type": {
                      "type": "int",
                      "connect.parameters": {
                        "__debezium.source.column.type": "INT"
                      }
                    }
                  },
                  {
                    "name": "max_level",
                    "type": [
                      {
                        "type": "int",
                        "connect.parameters": {
                          "__debezium.source.column.type": "INT"
                        },
                        "connect.default": 25
                      },
                      "null"
                    ],
                    "default": 25
                  }
                ],
                "connect.name": "inventory.inventory.customers.Value"
              }
            ],
            "default": null
          },
          {
            "name": "after",
            "type": [
              "null",
              "Value"
            ],
            "default": null
          },
          {
            "name": "source",
            "type": {
              "type": "record",
              "name": "Source",
              "namespace": "io.debezium.connector.mysql",
              "fields": [
                {
                  "name": "version",
                  "type": "string"
                },
                {
                  "name": "connector",
                  "type": "string"
                },
                {
                  "name": "name",
                  "type": "string"
                },
                {
                  "name": "ts_ms",
                  "type": "long"
                },
                {
                  "name": "snapshot",
                  "type": [
                    {
                      "type": "string",
                      "connect.version": 1,
                      "connect.parameters": {
                        "allowed": "true,last,false"
                      },
                      "connect.default": "false",
                      "connect.name": "io.debezium.data.Enum"
                    },
                    "null"
                  ],
                  "default": "false"
                },
                {
                  "name": "db",
                  "type": "string"
                },
                {
                  "name": "table",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "server_id",
                  "type": "long"
                },
                {
                  "name": "gtid",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "file",
                  "type": "string"
                },
                {
                  "name": "pos",
                  "type": "long"
                },
                {
                  "name": "row",
                  "type": "int"
                },
                {
                  "name": "thread",
                  "type": [
                    "null",
                    "long"
                  ],
                  "default": null
                },
                {
                  "name": "query",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                }
              ],
              "connect.name": "io.debezium.connector.mysql.Source"
            }
          },
          {
            "name": "op",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "transaction",
            "type": [
              "null",
              {
                "type": "record",
                "name": "ConnectDefault",
                "namespace": "io.confluent.connect.avro",
                "fields": [
                  {
                    "name": "id",
                    "type": "string"
                  },
                  {
                    "name": "total_order",
                    "type": "long"
                  },
                  {
                    "name": "data_collection_order",
                    "type": "long"
                  }
                ]
              }
            ],
            "default": null
          }
        ],
        "connect.name": "inventory.inventory.customers.Envelope"
      }
      

      4. Drop default

      ALTER TABLE customers ALTER COLUMN max_level DROP DEFAULT;
      

      5. Insert another row

      INSERT INTO customers(max_level) VALUES(10);
      

      Kafka message

      {
        "before": null,
        "after": {
          "Value": {
            "id": 2,
            "max_level": {
              "int": 10
            }
          }
        },
        "source": {
          "version": "1.2.0.Beta2",
          "connector": "mysql",
          "name": "inventory",
          "ts_ms": 1591098354000,
          "snapshot": {
            "string": "false"
          },
          "db": "inventory",
          "table": {
            "string": "customers"
          },
          "server_id": 112233,
          "gtid": {
            "string": "df1ace0a-a4c5-11ea-a54c-0242ac160003:33"
          },
          "file": "mysql-bin.000003",
          "pos": 2099,
          "row": 0,
          "thread": {
            "long": 6
          },
          "query": null
        },
        "op": "c",
        "ts_ms": {
          "long": 1591098354757
        },
        "transaction": null
      }
      

      Schema is still v1, expecting v2

      {
        "type": "record",
        "name": "Envelope",
        "namespace": "inventory.inventory.customers",
        "fields": [
          {
            "name": "before",
            "type": [
              "null",
              {
                "type": "record",
                "name": "Value",
                "fields": [
                  {
                    "name": "id",
                    "type": {
                      "type": "int",
                      "connect.parameters": {
                        "__debezium.source.column.type": "INT"
                      }
                    }
                  },
                  {
                    "name": "max_level",
                    "type": [
                      {
                        "type": "int",
                        "connect.parameters": {
                          "__debezium.source.column.type": "INT"
                        },
                        "connect.default": 25
                      },
                      "null"
                    ],
                    "default": 25
                  }
                ],
                "connect.name": "inventory.inventory.customers.Value"
              }
            ],
            "default": null
          },
          {
            "name": "after",
            "type": [
              "null",
              "Value"
            ],
            "default": null
          },
          {
            "name": "source",
            "type": {
              "type": "record",
              "name": "Source",
              "namespace": "io.debezium.connector.mysql",
              "fields": [
                {
                  "name": "version",
                  "type": "string"
                },
                {
                  "name": "connector",
                  "type": "string"
                },
                {
                  "name": "name",
                  "type": "string"
                },
                {
                  "name": "ts_ms",
                  "type": "long"
                },
                {
                  "name": "snapshot",
                  "type": [
                    {
                      "type": "string",
                      "connect.version": 1,
                      "connect.parameters": {
                        "allowed": "true,last,false"
                      },
                      "connect.default": "false",
                      "connect.name": "io.debezium.data.Enum"
                    },
                    "null"
                  ],
                  "default": "false"
                },
                {
                  "name": "db",
                  "type": "string"
                },
                {
                  "name": "table",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "server_id",
                  "type": "long"
                },
                {
                  "name": "gtid",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "file",
                  "type": "string"
                },
                {
                  "name": "pos",
                  "type": "long"
                },
                {
                  "name": "row",
                  "type": "int"
                },
                {
                  "name": "thread",
                  "type": [
                    "null",
                    "long"
                  ],
                  "default": null
                },
                {
                  "name": "query",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                }
              ],
              "connect.name": "io.debezium.connector.mysql.Source"
            }
          },
          {
            "name": "op",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "transaction",
            "type": [
              "null",
              {
                "type": "record",
                "name": "ConnectDefault",
                "namespace": "io.confluent.connect.avro",
                "fields": [
                  {
                    "name": "id",
                    "type": "string"
                  },
                  {
                    "name": "total_order",
                    "type": "long"
                  },
                  {
                    "name": "data_collection_order",
                    "type": "long"
                  }
                ]
              }
            ],
            "default": null
          }
        ],
        "connect.name": "inventory.inventory.customers.Envelope"
      }
      
      Show
      1. Configurations MySQL Connector configuration { "connector.class" : "io.debezium.connector.mysql.MySqlConnector" , "max.queue.size" : "327680" , "database.history.kafka.topic" : "schema-changes.inventory" , "database.history.connector.id" : "inventory_debezium" , "include.schema.changes" : "true" , "table.whitelist" : "inventory.customers" , "decimal.handling.mode" : "string" , "value.converter" : "io.confluent.connect.avro.AvroConverter" , "database.whitelist" : "inventory" , "key.converter" : "io.confluent.connect.avro.AvroConverter" , "database.user" : "binlog" , "database.history.kafka.bootstrap.servers" : "kafka:19092" , "database.server.name" : "inventory" , "database.port" : "3306" , "column.propagate.source.type" : ".*" , "value.converter.schema.registry.url" : "http://schema-registry:8081" , "database.history.connector.class" : "io.debezium.connector.mysql.MySqlConnector" , "database.hostname" : "mysql" , "database.password" : "123456" , "name" : "inventory_debezium" , "database.history.store.only.monitored.tables.ddl" : "true" , "max.batch.size" : "81920" , "include.query" : "true" , "key.converter.schema.registry.url" : "http://schema-registry:8081" , "snapshot.mode" : "schema_only" } Schema Registry { "compatibilityLevel" : "NONE" } Kafka Connect bootstrap.servers=kafka:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=1 config.storage.topic=connect-configs config.storage.partition=1 config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=1 offset.flush.interval.ms=10000 plugin.path=/kafka/connect access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS access.control.allow.origin=* 2.
Create the table CREATE TABLE customers( id INT PRIMARY KEY AUTO_INCREMENT, max_level INT DEFAULT 25 ); 3. Insert a row INSERT INTO customers() VALUES(); Kafka message { "before" : null , "after" : { "Value" : { "id" : 1, "max_level" : { "int" : 25 } } }, "source" : { "version" : "1.2.0.Beta2" , "connector" : "mysql" , "name" : "inventory" , "ts_ms" : 1591098326000, "snapshot" : { "string" : "false" }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "df1ace0a-a4c5-11ea-a54c-0242ac160003:31" }, "file" : "mysql-bin.000003" , "pos" : 1618, "row" : 0, "thread" : { "long" : 6 }, "query" : null }, "op" : "c" , "ts_ms" : { "long" : 1591098326228 }, "transaction" : null } Schema v1 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ "null" , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : "int" , "connect.parameters" : { "__debezium.source.column.type" : "INT" } } }, { "name" : "max_level" , "type" : [ { "type" : "int" , "connect.parameters" : { "__debezium.source.column.type" : "INT" }, "connect.default" : 25 }, "null" ], "default" : 25 } ], "connect.name" : "inventory.inventory.customers.Value" } ], "default" : null }, { "name" : "after" , "type" : [ "null" , "Value" ], "default" : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : "long" }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : "true,last,false" },
"connect.default" : "false" , "connect.name" : "io.debezium.data.Enum" }, "null" ], "default" : "false" }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ "null" , "string" ], "default" : null }, { "name" : "server_id" , "type" : "long" }, { "name" : "gtid" , "type" : [ "null" , "string" ], "default" : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : "long" }, { "name" : "row" , "type" : "int" }, { "name" : "thread" , "type" : [ "null" , "long" ], "default" : null }, { "name" : "query" , "type" : [ "null" , "string" ], "default" : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ "null" , "long" ], "default" : null }, { "name" : "transaction" , "type" : [ "null" , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : "long" }, { "name" : "data_collection_order" , "type" : "long" } ] } ], "default" : null } ], "connect.name" : "inventory.inventory.customers.Envelope" } 4. Drop default ALTER TABLE customers ALTER COLUMN max_level DROP DEFAULT; 5.
Insert another row INSERT INTO customers(max_level) VALUES(10); Kafka message { "before" : null , "after" : { "Value" : { "id" : 2, "max_level" : { "int" : 10 } } }, "source" : { "version" : "1.2.0.Beta2" , "connector" : "mysql" , "name" : "inventory" , "ts_ms" : 1591098354000, "snapshot" : { "string" : "false" }, "db" : "inventory" , "table" : { "string" : "customers" }, "server_id" : 112233, "gtid" : { "string" : "df1ace0a-a4c5-11ea-a54c-0242ac160003:33" }, "file" : "mysql-bin.000003" , "pos" : 2099, "row" : 0, "thread" : { "long" : 6 }, "query" : null }, "op" : "c" , "ts_ms" : { "long" : 1591098354757 }, "transaction" : null } Schema is still v1, expecting v2 { "type" : "record" , "name" : "Envelope" , "namespace" : "inventory.inventory.customers" , "fields" : [ { "name" : "before" , "type" : [ "null" , { "type" : "record" , "name" : "Value" , "fields" : [ { "name" : "id" , "type" : { "type" : "int" , "connect.parameters" : { "__debezium.source.column.type" : "INT" } } }, { "name" : "max_level" , "type" : [ { "type" : "int" , "connect.parameters" : { "__debezium.source.column.type" : "INT" }, "connect.default" : 25 }, "null" ], "default" : 25 } ], "connect.name" : "inventory.inventory.customers.Value" } ], "default" : null }, { "name" : "after" , "type" : [ "null" , "Value" ], "default" : null }, { "name" : "source" , "type" : { "type" : "record" , "name" : "Source" , "namespace" : "io.debezium.connector.mysql" , "fields" : [ { "name" : "version" , "type" : "string" }, { "name" : "connector" , "type" : "string" }, { "name" : "name" , "type" : "string" }, { "name" : "ts_ms" , "type" : "long" }, { "name" : "snapshot" , "type" : [ { "type" : "string" , "connect.version" : 1, "connect.parameters" : { "allowed" : "true,last,false" },
"connect.default" : "false" , "connect.name" : "io.debezium.data.Enum" }, "null" ], "default" : "false" }, { "name" : "db" , "type" : "string" }, { "name" : "table" , "type" : [ "null" , "string" ], "default" : null }, { "name" : "server_id" , "type" : "long" }, { "name" : "gtid" , "type" : [ "null" , "string" ], "default" : null }, { "name" : "file" , "type" : "string" }, { "name" : "pos" , "type" : "long" }, { "name" : "row" , "type" : "int" }, { "name" : "thread" , "type" : [ "null" , "long" ], "default" : null }, { "name" : "query" , "type" : [ "null" , "string" ], "default" : null } ], "connect.name" : "io.debezium.connector.mysql.Source" } }, { "name" : "op" , "type" : "string" }, { "name" : "ts_ms" , "type" : [ "null" , "long" ], "default" : null }, { "name" : "transaction" , "type" : [ "null" , { "type" : "record" , "name" : "ConnectDefault" , "namespace" : "io.confluent.connect.avro" , "fields" : [ { "name" : "id" , "type" : "string" }, { "name" : "total_order" , "type" : "long" }, { "name" : "data_collection_order" , "type" : "long" } ] } ], "default" : null } ], "connect.name" : "inventory.inventory.customers.Envelope" }

      The Avro schema doesn't change when `ALTER TABLE <table> ALTER COLUMN <column> DROP DEFAULT` is executed on a column that has a default value. We tested this with AvroConverter.

      Debezium MySQL Connector: 1.2.0.Beta2
      Kafka Connect: 2.3.0
      Schema Registry: 5.4.0
      MySQL: 5.7.21

              jpechane Jiri Pechanec
              igungor Ibrahim Gungor (Inactive)
              Votes:
              1 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: