Details
-
Bug
-
Resolution: Obsolete
-
Major
-
None
-
1.2.5.Final
-
None
-
False
-
False
Description
I want to use Debezium to replicate MySQL 5.7.26-29-log Percona Server table.
When running connector I'm getting following exception.
org.apache.kafka.connect.errors.ConnectException: Multiple parsing errorsorg.apache.kafka.connect.errors.ConnectException: Multiple parsing errorsio.debezium.text.ParsingException: no viable alternative at input 'CREATE DEFINER=`dev`@`%` PROCEDURE `usp_InsertDiscountTransactions`(\n\tp_storeId BIGINT,\n\tp_discountId BIGINT,\n p_tranDate DATETIME(3),\n p_typeId INT,\n p_refId BIGINT,\n p_refNumber VARCHAR(200),\n p_discountAmount DECIMAL(18, 4),\n p_customerId BIGINT,\n p_userId BIGINT,\n p_now DATETIME(3)\n)\nBEGIN\n\tDECLARE limitt INT;\n\tDECLARE isLimitt BIT;\n\tDECLARE currentUsed INT;\n DECLARE discountStatus INT;\n DECLARE isOncePerCustomer BIT;\n DECLARE discountId BIGINT;\n\tDECLARE isDataChanged INT;\n\tSET isDataChanged = 0;\n \n\tSELECT DiscountId, LimitTimes , IsLimit , IFNULL(Used, 0), OncePerCustomer, DiscountStatus\n INTO discountId, limitt , isLimitt , currentUsed, isOncePerCustomer, discountStatus\n\tFROM Discounts \n\tWHERE Id = p_discountId \n\tAND StoreId = p_storeId FOR UPDATE ;\n \n IF (discountId IS NULL OR discountId = 0) THEN\n\t\tSET isDataChanged = 2; \n ELSEIF (discountStatus = 2 OR discountStatus = 4) THEN\n\t\tSET isDataChanged = 3; \n END IF;\n\n\tIF (isDataChanged <> 0 AND (( isLimitt = 0 AND limitt IS NOT NULL AND currentUsed IS NOT NULL AND currentUsed < limitt ) OR isLimitt = 1)) THEN\n\t\tUPDATE Discounts\n\t\tSET Used = currentUsed + 1\n\t\tWHERE Id = p_discountId\n\t\tAND StoreId = p_storeId AND Used = currentUsed ;\n\n\t\tIF ROW_COUNT() = 0 THEN \n\t\t\tSET isDataChanged = 3;\n END IF ;\n\tELSEIF (isOncePerCustomer = 1) THEN\n\t\tINSERT INTO DiscountTransactions (`StoreId`, `DiscountId`, `TranDate`, `TypeId`, `RefId`, `RefNumber`, `DiscountAmount`, `CustomerId`, `CreatedUser`, `CreatedDate`, `UpdatedUser`, `UpdatedDate`)\n\t\tSELECT * \n FROM ( SELECT \n p_storeId as StoreId\n , p_discountId as DiscountId\n , p_tranDate as TranDate\n , p_typeId as TypeId\n , p_refId as RefId\n , p_refNumber as RefNumber\n , p_discountAmount as DiscountAmount\n , p_customerId as CustomerId\n , p_userId as CreatedUser\n , p_now as CreatedDate\n , p_userId as UpdatedUser\n , p_now as UpdatedDate) AS tmp\n\t\tWHERE NOT EXISTS (\n\t\t\tSELECT Id FROM DiscountTransactions WHERE StoreId = p_storeId AND DiscountId = p_discountId AND TypeId = p_typeId AND CustomerId = p_customerId\n\t\t) LIMIT 1;\n\t\t-- IF row_count() = 0 THEN \n\t\t-'io.debezium.text.ParsingException: mismatched input 'SET' expecting <EOF> at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:298) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:214) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:178) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:228) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: Multiple parsing errorsio.debezium.text.ParsingException: no viable alternative at input 'CREATE DEFINER=`dev`@`%` PROCEDURE `usp_InsertDiscountTransactions`(\n\tp_storeId BIGINT,\n\tp_discountId BIGINT,\n p_tranDate DATETIME(3),\n p_typeId INT,\n p_refId BIGINT,\n p_refNumber VARCHAR(200),\n p_discountAmount DECIMAL(18, 4),\n p_customerId BIGINT,\n p_userId BIGINT,\n p_now DATETIME(3)\n)\nBEGIN\n\tDECLARE limitt INT;\n\tDECLARE isLimitt BIT;\n\tDECLARE currentUsed INT;\n DECLARE discountStatus INT;\n DECLARE isOncePerCustomer BIT;\n DECLARE discountId BIGINT;\n\tDECLARE isDataChanged INT;\n\tSET isDataChanged = 0;\n \n\tSELECT DiscountId, LimitTimes , IsLimit , IFNULL(Used, 0), OncePerCustomer, DiscountStatus\n INTO discountId, limitt , isLimitt , currentUsed, isOncePerCustomer, discountStatus\n\tFROM Discounts \n\tWHERE Id = p_discountId \n\tAND StoreId = p_storeId FOR UPDATE ;\n \n IF (discountId IS NULL OR discountId = 0) THEN\n\t\tSET isDataChanged = 2; \n ELSEIF (discountStatus = 2 OR discountStatus = 4) THEN\n\t\tSET isDataChanged = 3; \n END IF;\n\n\tIF (isDataChanged <> 0 AND (( isLimitt = 0 AND limitt IS NOT NULL AND currentUsed IS NOT NULL AND currentUsed < limitt ) OR isLimitt = 1)) THEN\n\t\tUPDATE Discounts\n\t\tSET Used = currentUsed + 1\n\t\tWHERE Id = p_discountId\n\t\tAND StoreId = p_storeId AND Used = currentUsed ;\n\n\t\tIF ROW_COUNT() = 0 THEN \n\t\t\tSET isDataChanged = 3;\n END IF ;\n\tELSEIF (isOncePerCustomer = 1) THEN\n\t\tINSERT INTO DiscountTransactions (`StoreId`, `DiscountId`, `TranDate`, `TypeId`, `RefId`, `RefNumber`, `DiscountAmount`, `CustomerId`, `CreatedUser`, `CreatedDate`, `UpdatedUser`, `UpdatedDate`)\n\t\tSELECT * \n FROM ( SELECT \n p_storeId as StoreId\n , p_discountId as DiscountId\n , p_tranDate as TranDate\n , p_typeId as TypeId\n , p_refId as RefId\n , p_refNumber as RefNumber\n , p_discountAmount as DiscountAmount\n , p_customerId as CustomerId\n , p_userId as CreatedUser\n , p_now as CreatedDate\n , p_userId as UpdatedUser\n , p_now as UpdatedDate) AS tmp\n\t\tWHERE NOT EXISTS (\n\t\t\tSELECT Id FROM DiscountTransactions WHERE StoreId = p_storeId AND DiscountId = p_discountId AND TypeId = p_typeId AND CustomerId = p_customerId\n\t\t) LIMIT 1;\n\t\t- IF row_count() = 0 THEN \n\t\t--'io.debezium.text.ParsingException: mismatched input 'SET' expecting <EOF> at io.debezium.antlr.AntlrDdlParser.throwParsingException(AntlrDdlParser.java:351) at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:93) at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:115) at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:307) at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:82) at io.debezium.connector.mysql.MySqlSchema.loadHistory(MySqlSchema.java:256) at io.debezium.connector.mysql.MySqlTaskContext.loadHistory(MySqlTaskContext.java:164) at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:110) ... 9 more