Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5674

Keyword virtual can be used as an identifier

XMLWordPrintable

    • False
    • None
    • False

      Bug report

      "DDL statement couldn't be parsed. Please open a Jira issue with the statement 'CREATE DEFINER"

      Background:  

      Pulling  data from Latest version Mariadb in AWS.

      For each databases in a list based on rexex

      pull listed tables and route to shared topics , 

      schema_a.table_1  -> union_topic_tabble_1

      schema_b.table_1 -> union_topic_tabble_1

      Number of schemas: 1380

      Tabbles pr schema: 80

      Total tables for connector: 1380*80 =  110.400

      Topics:  80 + 2 , schema and dbhistory.

       Server properties: 

      DBMS: MariaDB (ver. 10.5.15-MariaDB-log)  Case sensitivity: plain=exact, delimited=exact Driver: MariaDB Connector/J (ver. 3.0.7, JDBC4.2

      What Debezium connector do you use and what version?

      "class": "io.debezium.connector.mysql.MySqlConnector",
       "type": "source",
       "version": "1.9.5.Final"

      What is the connector configuration?

       "name": "mariadb-src-client-master-c4",
      "config":
        {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "database.history.consumer.sasl.jaas.config": "${file:/etc/secrets/env.variables:JAAS_CONFIG}",
          "database.history.consumer.sasl.mechanism": "SCRAM-SHA-512",
          "database.history.consumer.security.protocol": "SASL_SSL",
          "database.history.kafka.bootstrap.servers": "${file:/etc/secrets/env.variables:BOOTSTRAP_SERVERS_CONFIG}",
          "database.history.kafka.topic": "saltdata-kafka-connect-dbhistory.mariadb-client-master-c40",                                                                                
          "database.history.producer.sasl.jaas.config": "${file:/etc/secrets/env.variables:JAAS_CONFIG}",
          "database.history.producer.sasl.mechanism": "SCRAM-SHA-512",
          "database.history.producer.security.protocol": "SASL_SSL",
          "database.hostname": "the-host.rds.amazonaws.com",
          "database.include.list": "prod_client_.*",
          "database.password": "${file:/etc/secrets/master-db-src.properties:PASSWORD}",
          "database.server.name": "mariadb-master-c4",
          "database.user": "${file:/etc/secrets/client-db-src.properties:USERNAME}",
          "header.converter": "org.apache.kafka.connect.storage.StringConverter",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://arch-artifacts-schemaregistry.schemaregistry.svc.cluster.local:8082",
          "value.converter.schema.registry.url": "http://arch-artifacts-schemaregistry.schemaregistry.svc.cluster.local:8082"
          "value.converter": "io.confluent.connect.avro.AvroConverter",                        
          "key.converter.schemas.enable": "true",                
          "snapshot.locking.mode": "none",
          "snapshot.mode": "when_needed",
          "table.include.list": ".\\.account_relations,.\\.table1,.\\.table2,.
      .table3,.
      .desk,.",
          "poll.interval.ms": "240000",
          "errors.retry.timeout": "480000",                        
          "tasks.max": "1",
          "transforms": "Reroute",
          "transforms.Reroute.topic.regex": "(?:.?)\.(?:.?)\.(.*)",
          "transforms.Reroute.topic.replacement": "connect_prefix-$1__$2",
          "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",                        
        }
      }'<Your answer>

      What is the captured database version and mode of depoyment?

      AWS - RDS

      DBMS: MariaDB (ver. 10.5.15-MariaDB-log)  Case sensitivity: plain=exact, delimited=exact Driver: MariaDB Connector/J (ver. 3.0.7, JDBC4.2

      What behaviour do you expect?

      Expect selected tables for all databases based on regex to be registered and pulled to respective topics.

      What behaviour do you see?

      Partial registering of tables  with this error coming after a random period of time. 

       

      // code placeholder
      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
      	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:869)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
      	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: io.debezium.DebeziumException: Error processing binlog event
      	... 7 more
      Caused by: io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'CREATE DEFINER=`prod_migrations`@`%` PROCEDURE `upsert_virtual_item`(IN name VARCHAR(45), IN type TINYINT UNSIGNED)
      BEGIN
                      SET @merchantId := (SELECT merchant_id FROM merchant LIMIT 1);
                          IF @merchantId > 0 THEN
                              SET @rows := (SELECT COUNT(*) FROM item WHERE item_type = type);
                              IF @rows > 0 THEN
                                  UPDATE item SET
                                      merchant_id = @merchantId,
                                      cz_title = name,
                                      price = 0,
                                      orderer = 2,
                                      takeaway = 0,
                                      currency_id = (
                                          SELECT currency_currency_id
                                          FROM merchant
                                          WHERE merchant_id = @merchantId
                                      ),
                                      tax_vat_id = (
                                          SELECT tax_vat.tax_vat_id
                                          FROM tax_vat
                                          JOIN merchant
                                              ON merchant.place_country_id = tax_vat.country_id
                                              AND merchant.merchant_id = @merchantId
                                          WHERE tax_vat.default = 1
                                      ),
                                      item_measure_id = 1,
                                      kitchen_print = 0,
                                      deleted = 0,
                                      virtual = 1
                                  WHERE item_type = type;
                              ELSE
                                  INSERT INTO item SET
                                      merchant_id = @merchantId,
                                      cz_title = name,
                                      price = 0,
                                      orderer = 2,
                                      takeaway = 0,
                                      currency_id = (
                                          SELECT currency_currency_id
                                          FROM merchant
                                          WHERE merchant_id = @merchantId
                                      ),
                                      tax_vat_id = (
                                          SELECT tax_vat.tax_vat_id
                                          FROM tax_vat
                                          JOIN merchant
                                              ON merchant.place_country_id = tax_vat.country_id
                                              AND merchant.merchant_id = @merchantId
                                          WHERE tax_vat.default = 1
                                      ),
                                      item_measure_id = 1,
                                      kitchen_print = 0,
                                      deleted = 0,
                                      virtual = 1,
                                      item_type = type
                                  ;
                              END IF;
                          END IF;
                  END'
      no viable alternative at input 'CREATE DEFINER=`prod_migrations`@`%` PROCEDURE `upsert_virtual_item`(IN name VARCHAR(45), IN type TINYINT UNSIGNED)\nBEGIN\n                SET @merchantId := (SELECT merchant_id FROM merchant LIMIT 1);\n                    IF @merchantId > 0 THEN\n                        SET @rows := (SELECT COUNT(*) FROM item WHERE item_type = type);\n                        IF @rows > 0 THEN\n                            UPDATE item SET\n                                merchant_id = @merchantId,\n                                cz_title = name,\n                                price = 0,\n                                orderer = 2,\n                                takeaway = 0,\n                                currency_id = (\n                                    SELECT currency_currency_id\n                                    FROM merchant\n                                    WHERE merchant_id = @merchantId\n                                ),\n                                tax_vat_id = (\n                                    SELECT tax_vat.tax_vat_id\n                                    FROM tax_vat\n                                    JOIN merchant\n                                        ON merchant.place_country_id = tax_vat.country_id\n                                        AND merchant.merchant_id = @merchantId\n                                    WHERE tax_vat.default = 1\n                                ),\n                                item_measure_id = 1,\n                                kitchen_print = 0,\n                                deleted = 0,\n                                virtual ='
      	at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43)
      	at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
      	at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
      	at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310)
      	at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136)
      	at io.debezium.ddl.parser.mysql.generated.MySqlParser.sqlStatements(MySqlParser.java:1228)
      	at io.debezium.ddl.parser.mysql.generated.MySqlParser.root(MySqlParser.java:950)
      	at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:73)
      	at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:45)
      	at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:82)
      	at io.debezium.connector.mysql.MySqlDatabaseSchema.parseDdl(MySqlDatabaseSchema.java:224)
      	at io.debezium.connector.mysql.MySqlDatabaseSchema.parseStreamingDdl(MySqlDatabaseSchema.java:210)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:566)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:841)
      	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:349)
      	... 6 more
      Caused by: org.antlr.v4.runtime.NoViableAltException
      	at org.antlr.v4.runtime.atn.ParserATNSimulator.noViableAlt(ParserATNSimulator.java:2026)
      	at org.antlr.v4.runtime.atn.ParserATNSimulator.execATN(ParserATNSimulator.java:467)
      	at org.antlr.v4.runtime.atn.ParserATNSimulator.adaptivePredict(ParserATNSimulator.java:393)
      	at io.debezium.ddl.parser.mysql.generated.MySqlParser.sqlStatements(MySqlParser.java:1026)
      	... 15 more 

       

       

      Do you see the same behaviour using the latest relesead Debezium version?

      Have not tried with latest version.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      I do not have access to modify log level .  Kafka cluster is 

      How to reproduce the issue using our tutorial deployment?

      TBD

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      Integration cluster of single tenant databases to multi tenant DW.

      Implementation ideas (optional)

      <Your answer>

              Unassigned Unassigned
              einarkann Einar Kristinsson (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: