Debezium / DBZ-3535

SchemaNameAdjuster is too restrictive by default


Details

    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 1.9.0.CR1
    • Affects Version/s: 1.5.0.Final
    • Component/s: core-library
    • Steps to Reproduce:

      Start a MySQL connector using the tutorial:

      $ env DEBEZIUM_VERSION=1.5 docker-compose -f docker-compose-mysql.yaml up
      Starting tutorial_zookeeper_1 ... done
      Starting tutorial_mysql_1     ... done
      Starting tutorial_kafka_1     ... done
      Starting tutorial_connect_1   ... done
      ...
      
      $ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
      HTTP/1.1 201 Created
      ...
      

      Execute the following statements:

      CREATE TABLE `foo-bar-baz` (
          id INT
      );
      
      INSERT INTO `foo-bar-baz` VALUES ();
      
      RENAME TABLE `foo-bar-baz` TO `foo-bar_baz`;
      

      Observe the connector failure:

      io.debezium.DebeziumException: Error processing binlog event
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: The Kafka Connect schema name 'dbserver1.inventory.foo-bar_baz.Value' is not a valid Avro schema name and its replacement 'dbserver1.inventory.foo_bar_baz.Value' conflicts with another different schema 'dbserver1.inventory.foo-bar-baz.Value'
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:584)
      	at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:276)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:579)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
      	... 5 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: The Kafka Connect schema name 'dbserver1.inventory.foo-bar_baz.Value' is not a valid Avro schema name and its replacement 'dbserver1.inventory.foo_bar_baz.Value' conflicts with another different schema 'dbserver1.inventory.foo-bar-baz.Value'
      	at io.debezium.util.SchemaNameAdjuster.lambda$create$0(SchemaNameAdjuster.java:151)
      	at io.debezium.util.SchemaNameAdjuster.lambda$create$1(SchemaNameAdjuster.java:168)
      	at io.debezium.util.SchemaNameAdjuster$ReplacementOccurred.lambda$firstTimeOnly$0(SchemaNameAdjuster.java:103)
      	at io.debezium.util.SchemaNameAdjuster.validFullname(SchemaNameAdjuster.java:331)
      	at io.debezium.util.SchemaNameAdjuster.lambda$create$5(SchemaNameAdjuster.java:201)
      	at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:104)
      	at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
      	at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:168)
      	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      	at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:168)
      	at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:460)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:581)
      	... 8 more
      

      Note that the connector uses the JSON converter, so the table and schema names above are perfectly valid; the Avro naming rules that caused the failure should not apply.
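The collision in the stack trace can be illustrated with a minimal Java sketch. It assumes Avro's naming rules (each dot-separated name starts with a letter or underscore and continues with letters, digits, or underscores) and assumes that every invalid character is replaced with an underscore, which matches the replacement shown in the exception message. The class AvroNameCollisionDemo and its methods are hypothetical; this is not Debezium's SchemaNameAdjuster code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Debezium's implementation) of an Avro-style schema
 * name sanitizer that fails when two different original names collapse into
 * the same replacement, mirroring the failure after RENAME TABLE.
 */
public class AvroNameCollisionDemo {

    // Assumed Avro rule: the first character of a name is [A-Za-z_].
    static boolean isValidFirst(char c) {
        return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_';
    }

    // Assumed Avro rule: subsequent characters are [A-Za-z0-9_].
    static boolean isValidRest(char c) {
        return isValidFirst(c) || (c >= '0' && c <= '9');
    }

    /** Replaces each character that is invalid in an Avro full name with '_'. */
    static String sanitize(String fullname) {
        StringBuilder sb = new StringBuilder(fullname.length());
        boolean atComponentStart = true;
        for (char c : fullname.toCharArray()) {
            if (c == '.') {
                // '.' separates the components of a full name and is kept as-is.
                sb.append(c);
                atComponentStart = true;
                continue;
            }
            boolean valid = atComponentStart ? isValidFirst(c) : isValidRest(c);
            sb.append(valid ? c : '_');
            atComponentStart = false;
        }
        return sb.toString();
    }

    // Remembers which original name produced each adjusted name.
    private final Map<String, String> originalByReplacement = new HashMap<>();

    /** Returns the adjusted name, or throws if it was already claimed by another name. */
    String adjust(String original) {
        String replacement = sanitize(original);
        String previous = originalByReplacement.putIfAbsent(replacement, original);
        if (previous != null && !previous.equals(original)) {
            throw new IllegalStateException("'" + original + "' -> '" + replacement
                    + "' conflicts with '" + previous + "'");
        }
        return replacement;
    }

    public static void main(String[] args) {
        AvroNameCollisionDemo adjuster = new AvroNameCollisionDemo();
        // Value schema registered for the table as originally created.
        System.out.println(adjuster.adjust("dbserver1.inventory.foo-bar-baz.Value"));
        try {
            // Value schema registered after the table is renamed.
            adjuster.adjust("dbserver1.inventory.foo-bar_baz.Value");
        } catch (IllegalStateException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

Both foo-bar-baz and foo-bar_baz map to dbserver1.inventory.foo_bar_baz.Value, so the second call fails even though neither name needs adjusting when the JSON converter is used.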


    Description

      There are a few issues with the SchemaNameAdjuster logic:

      1. The default implementation is Avro-specific (see DBZ-2511).
      2. There's no configuration option to provide an implementation that corresponds to the converter being used (e.g. JSON).
      3. Although SchemaNameAdjuster is declared as an interface, the core framework uses its static API (e.g. SchemaNameAdjuster#isValidFullname(String)), which is also Avro-specific.

      Specifically, it becomes a problem under the following circumstances:

      1. A JSON converter is used.
      2. The team that runs the connector does not control the source database schema. In a multi-tenant environment, customers and other technical specialists can execute arbitrary statements on the database (e.g. create and rename tables).

      Expected behavior:

      1. The default adjuster implementation is as permissive as possible, ideally a null object that leaves names unchanged.
      2. The adjuster implementation is configurable.
      3. By default, the adjuster implementation is derived from the converter (e.g. Avro or JSON).
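The expected behavior could look roughly like the following Java sketch. Everything in it is hypothetical: the NameAdjuster interface, the select method, and the "avro"/"none" mode values are illustrative names, not Debezium API or configuration. It shows a permissive null-object default, an explicit override, and a fallback that inspects the value converter's class name (a simple heuristic). The Avro adjuster here is deliberately simplified and ignores Avro's rule for the first character of each name.

```java
import java.util.Locale;
import java.util.function.UnaryOperator;

/**
 * Hypothetical sketch: a pluggable name adjuster chosen at configuration time.
 * None of these names exist in Debezium; they only illustrate the expected behavior.
 */
public class AdjusterSelectionSketch {

    /** Hypothetical adjuster contract: an instance, not a static utility. */
    interface NameAdjuster extends UnaryOperator<String> {
        /** Null object: returns every name unchanged. */
        NameAdjuster NONE = name -> name;

        /** Simplified Avro-style adjuster: replaces anything outside [A-Za-z0-9_.] with '_'. */
        NameAdjuster AVRO = name -> name.replaceAll("[^A-Za-z0-9_.]", "_");
    }

    /**
     * An explicit mode wins; otherwise Avro rules apply only when the value
     * converter's class name ends with "AvroConverter" (a heuristic).
     */
    static NameAdjuster select(String explicitMode, String valueConverterClass) {
        if (explicitMode != null) {
            switch (explicitMode.toLowerCase(Locale.ROOT)) {
                case "avro":
                    return NameAdjuster.AVRO;
                case "none":
                    return NameAdjuster.NONE;
                default:
                    throw new IllegalArgumentException("Unknown mode: " + explicitMode);
            }
        }
        if (valueConverterClass != null && valueConverterClass.endsWith("AvroConverter")) {
            return NameAdjuster.AVRO;
        }
        // Permissive default for JSON and any other converter.
        return NameAdjuster.NONE;
    }

    public static void main(String[] args) {
        String name = "dbserver1.inventory.foo-bar_baz.Value";
        NameAdjuster json = select(null, "org.apache.kafka.connect.json.JsonConverter");
        NameAdjuster avro = select(null, "io.confluent.connect.avro.AvroConverter");
        System.out.println(json.apply(name)); // name is left unchanged
        System.out.println(avro.apply(name)); // hyphen is replaced with '_'
    }
}
```

Choosing an adjuster instance at configuration time, instead of calling a static Avro-specific validator, is what would let a JSON-based deployment skip the renaming and the conflict check entirely.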


    People

      Assignee: Unassigned
      Reporter: Sergei Morozov
      Votes: 0
      Watchers: 3
