  1. Debezium
  2. DBZ-2254

Problem removing tables from "table.whitelist" while the connector is not running.

    Details

    • Steps to Reproduce:
      1. Create a Debezium connector with the config "table.whitelist=A, B".
      2. Insert rows into the MySQL tables. Change events (inserts) for tables A and B are sent to their respective Kafka topics.
      3. Delete the Debezium connector.
      4. Insert more rows into tables A and B.
      5. Create the Debezium connector again with the config "table.whitelist=A" (B is removed).
      6. Although table B is no longer whitelisted, the change events for table B that were generated while the connector was not running are sent to the Kafka topic (this is not expected).
      7. Change events for table B that are generated after the connector is re-created are not sent to the Kafka topic (this is expected).
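The two connector registrations in the steps to reproduce can be sketched as payloads of the shape Kafka Connect's REST endpoint `POST /connectors` accepts. This is only an illustration: the setting values are copied from the task configuration printed in the attached log, the password is left out, and it is an assumption that the first registration used the same settings apart from `table.whitelist`. The helper names `connector_payload` and `changed_keys` are hypothetical, and the sketch builds and compares payloads without contacting a cluster.

```python
import json

CONNECTOR_NAME = "app_inventory_debezium"

# Settings copied from the logged task configuration (password omitted).
BASE_CONFIG = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "binlog",
    "database.server.name": "app_inventory",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:19092",
    "database.history.kafka.topic": "dbhistory.app_inventory",
    "snapshot.mode": "schema_only_recovery",
}


def connector_payload(tables):
    """Build a connector registration payload for the given whitelisted tables."""
    config = dict(BASE_CONFIG)
    config["table.whitelist"] = ",".join(tables)
    return {"name": CONNECTOR_NAME, "config": config}


def changed_keys(old, new):
    """Return the sorted setting names whose values differ between two configs."""
    return sorted(k for k in old.keys() | new.keys() if old.get(k) != new.get(k))


# Step 1: tables A and B are whitelisted.
first = connector_payload(["inventory.customers", "inventory.products"])
# Step 5: the connector is re-created with only table A.
second = connector_payload(["inventory.customers"])

print(json.dumps(second, indent=2))
print("changed settings:", changed_keys(first["config"], second["config"]))
```

Under these assumptions, `table.whitelist` is the only setting that differs between the two registrations, so any event for `inventory.products` published after step 5 cannot be explained by another configuration change.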

      Description

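      The "table.whitelist" config is not working as expected: after a table is removed from the whitelist, the change events recorded for that table while the connector was not running are still published. Details are in the `Steps to Reproduce` section.

      I have also attached the Kafka Connect logs.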

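      In the logs, MySqlConnectorTask appears to start twice, less than a second apart, with two different `table.whitelist` values: first `inventory.customers,inventory.products` (07:17:52), then `inventory.customers` (07:17:53).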
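To check the double start in a longer log file, a small script can collect the configuration printed after each "Starting MySqlConnectorTask with configuration:" line. This is a minimal sketch that assumes the log layout shown in the excerpt below (`||`, then `key = value`, then the logger name in brackets); the function name `task_start_configs` is hypothetical.

```python
import re

START_MARKER = "Starting MySqlConnectorTask with configuration:"
# Matches lines such as "... ||     table.whitelist = inventory.customers   [logger]".
SETTING = re.compile(r"\|\|\s+(?P<key>[\w.]+) = (?P<value>.*?)\s+\[[\w.]+\]\s*$")


def task_start_configs(lines):
    """Return one dict of logged settings per task start found in the log lines."""
    configs = []
    for line in lines:
        if START_MARKER in line:
            configs.append({})  # a new task start begins a new config block
            continue
        match = SETTING.search(line)
        if match and configs:
            configs[-1][match["key"]] = match["value"]
    return configs


sample = [
    "2020-06-21 07:17:52,861 INFO   ||  Starting MySqlConnectorTask with configuration:   [io.debezium.connector.common.BaseSourceTask]",
    "2020-06-21 07:17:52,867 INFO   ||     table.whitelist = inventory.customers,inventory.products   [io.debezium.connector.common.BaseSourceTask]",
    "2020-06-21 07:17:53,421 INFO   ||  Kafka version: 2.3.0   [org.apache.kafka.common.utils.AppInfoParser]",
    "2020-06-21 07:17:53,425 INFO   ||  Starting MySqlConnectorTask with configuration:   [io.debezium.connector.common.BaseSourceTask]",
    "2020-06-21 07:17:53,429 INFO   ||     table.whitelist = inventory.customers   [io.debezium.connector.common.BaseSourceTask]",
]

whitelists = [cfg.get("table.whitelist") for cfg in task_start_configs(sample)]
print(whitelists)
```

Run against the full attached log, the same function would show whether any other setting differs between the two starts.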

       

       

      2020-06-21 07:17:52,854 INFO   ||  Kafka version: 2.3.0   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:52,855 INFO   ||  Kafka commitId: fc1aaa116b661c8a   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:52,855 INFO   ||  Kafka startTimeMs: 1592723872854   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:52,861 INFO   ||  Starting MySqlConnectorTask with configuration:   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,866 INFO   ||     connector.class = io.debezium.connector.mysql.MySqlConnector   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     max.queue.size = 327680   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     database.history.kafka.topic = dbhistory.app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     database.history.connector.id = app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     include.schema.changes = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     table.whitelist = inventory.customers,inventory.products   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     decimal.handling.mode = string   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,867 INFO   ||     value.converter = io.confluent.connect.avro.AvroConverter   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.whitelist = inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     key.converter = io.confluent.connect.avro.AvroConverter   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.user = binlog   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.history.kafka.bootstrap.servers = kafka:19092   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.server.name = app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.port = 3306   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     column.propagate.source.type = .*   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     value.converter.schema.registry.url = http://schema-registry:8081   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     task.class = io.debezium.connector.mysql.MySqlConnectorTask   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.history.connector.class = io.debezium.connector.mysql.MySqlConnector   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,868 INFO   ||     database.hostname = mysql   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     database.password = ********   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     name = app_inventory_debezium   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     database.history.store.only.monitored.tables.ddl = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     max.batch.size = 81920   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     include.query = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,869 INFO   ||     key.converter.schema.registry.url = http://schema-registry:8081   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,870 INFO   ||     snapshot.mode = schema_only_recovery   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:52,955 INFO   ||  [Producer clientId=connector-producer-app_inventory_debezium-0] Cluster ID: tUD2MFDgRyKUjhUdUhXWGQ   [org.apache.kafka.clients.Metadata].....
      
      ...
      
      2020-06-21 07:17:53,421 INFO   ||  Kafka version: 2.3.0   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:53,421 INFO   ||  Kafka commitId: fc1aaa116b661c8a   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:53,421 INFO   ||  Kafka startTimeMs: 1592723873421   [org.apache.kafka.common.utils.AppInfoParser]
      2020-06-21 07:17:53,423 INFO   ||  [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
      2020-06-21 07:17:53,425 INFO   ||  Starting MySqlConnectorTask with configuration:   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,425 INFO   ||     connector.class = io.debezium.connector.mysql.MySqlConnector   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,426 INFO   ||     max.queue.size = 327680   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,428 INFO   ||     database.history.kafka.topic = dbhistory.app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     database.history.connector.id = app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     include.schema.changes = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     table.whitelist = inventory.customers   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     decimal.handling.mode = string   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     value.converter = io.confluent.connect.avro.AvroConverter   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     database.whitelist = inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,429 INFO   ||     key.converter = io.confluent.connect.avro.AvroConverter   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.user = binlog   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.history.kafka.bootstrap.servers = kafka:19092   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.server.name = app_inventory   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.port = 3306   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     column.propagate.source.type = .*   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     value.converter.schema.registry.url = http://schema-registry:8081   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     task.class = io.debezium.connector.mysql.MySqlConnectorTask   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.history.connector.class = io.debezium.connector.mysql.MySqlConnector   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,430 INFO   ||     database.hostname = mysql   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     database.password = ********   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     name = app_inventory_debezium   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     database.history.store.only.monitored.tables.ddl = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     max.batch.size = 81920   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     include.query = true   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     key.converter.schema.registry.url = http://schema-registry:8081   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,431 INFO   ||     snapshot.mode = schema_only_recovery   [io.debezium.connector.common.BaseSourceTask]
      2020-06-21 07:17:53,524 INFO   ||  [Producer clientId=connector-producer-app_inventory_debezium-0] Cluster ID: tUD2MFDgRyKUjhUdUhXWGQ   [org.apache.kafka.clients.Metadata]
      

       

       

      KAFKA_VERSION=2.3.0


                People

                • Assignee: Unassigned
                • Reporter: İlkin Balkanay (ilkinulas)