-
Bug
-
Resolution: Unresolved
-
Minor
-
3.2.0.Final
-
None
-
False
-
-
False
-startup command (bash):
bin/connect-standalone.sh config/connect-standalone.properties config/mysql-sink.properties
-config/connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
plugin.path=/Users/lyq/Desktop/debezium-connector-jdbc
-config/mysql-sink.properties
name=mysql-sink
connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
consumer.override.auto.offset.reset=latest
tasks.max=1
database.time_zone=UTC
connection.url=jdbc:mysql://localhost:3306/betalpha
connection.username=root
connection.password=1234567890
schema.evolution=basic
primary.key.mode=record_key
primary.key.fields=day_id,shop_id,course_id,article_id
insert.mode=upsert
topics=order_statistics
-arroyo pipeline (SQL):
-- Source: successful-payment events consumed from Kafka as plain JSON.
CREATE TABLE payment_source (
    order_id STRING,
    pay_time TIMESTAMP,
    shop_id STRING,
    user_id BIGINT,
    course_id BIGINT,
    article_id BIGINT,
    count INT,            -- NOTE(review): unquoted reserved word kept as in the original repro
    actual_payment BIGINT,
    total_price BIGINT
) WITH (
    connector = 'kafka',
    bootstrap_servers = '127.0.0.1:9092',
    topic = 'payment-success',
    type = 'source',
    format = 'json'
);

-- Sink: per-day/shop/course/article aggregates, emitted as Debezium-style
-- change events with an embedded schema, for the Debezium JDBC sink connector.
CREATE TABLE order_statistics_sink (
    day_id BIGINT,
    shop_id STRING,
    course_id BIGINT,
    article_id BIGINT,
    order_cnt BIGINT,
    order_amount BIGINT,
    order_user_cnt BIGINT
) WITH (
    connector = 'kafka',
    bootstrap_servers = '127.0.0.1:9092',
    topic = 'order_statistics',
    type = 'sink',
    format = 'debezium_json',
    'json.include_schema' = 'true'
);

-- day_id = whole days since the Unix epoch
-- (epoch seconds of the day-truncated pay_time, divided by 86400).
INSERT INTO order_statistics_sink
SELECT
    CAST(date_part('epoch', date_trunc('day', pay_time)) / 86400 AS BIGINT) AS day_id,
    shop_id,
    course_id,
    article_id,
    SUM(count) AS order_cnt,
    SUM(actual_payment) AS order_amount,
    COUNT(DISTINCT user_id) AS order_user_cnt
FROM payment_source
GROUP BY
    CAST(date_part('epoch', date_trunc('day', pay_time)) / 86400 AS BIGINT),
    shop_id,
    course_id,
    article_id;
-order_statistics topic in Kafka (the record key is null, and the "before" field in the payload is null):
null: { "payload": { "after": { "article_id": 600000, "course_id": 500000, "day_id": 20294, "order_amount": 509869, "order_cnt": 10, "order_user_cnt": 1, "shop_id": "SHOP100000" }, "before": null, "op": "c" }, "schema": { "fields": [ { "field": "before", "fields": [ { "field": "day_id", "optional": true, "type": "int64" }, { "field": "shop_id", "optional": true, "type": "string" }, { "field": "course_id", "optional": true, "type": "int64" }, { "field": "article_id", "optional": true, "type": "int64" }, { "field": "order_cnt", "optional": true, "type": "int32" }, { "field": "order_amount", "optional": true, "type": "int64" }, { "field": "order_user_cnt", "optional": false, "type": "int32" } ], "optional": true, "type": "struct" }, { "field": "after", "fields": [ { "field": "day_id", "optional": true, "type": "int64" }, { "field": "shop_id", "optional": true, "type": "string" }, { "field": "course_id", "optional": true, "type": "int64" }, { "field": "article_id", "optional": true, "type": "int64" }, { "field": "order_cnt", "optional": true, "type": "int32" }, { "field": "order_amount", "optional": true, "type": "int64" }, { "field": "order_user_cnt", "optional": false, "type": "int32" } ], "optional": true, "type": "struct" }, { "field": "op", "optional": true, "type": "string" } ], "name": "ArroyoJson", "optional": false, "type": "struct" } }
-kafka connector log (I pulled down the connector source code and added logging, and found that the NPE is caused by before=null; the log lines I added are marked with 【】):
[2025-07-25 17:41:16,047] INFO [mysql-sink|task-0] 【】Filtered payload: Struct{after=Struct{day_id=20294,shop_id=SHOP100000,course_id=500000,article_id=600000,order_cnt=10,order_amount=509869,order_user_cnt=1},op=c} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:101)[2025-07-25 17:41:16,048] INFO [mysql-sink|task-0] 【】Field: Field{name=before, index=0, schema=Schema{STRUCT}} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:103)[2025-07-25 17:41:16,048] INFO [mysql-sink|task-0] 【】allFields: {} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:105)[2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】descriptor: FieldDescriptor{schema=Schema{STRUCT}, name='before', isKey='false', columnName='before'} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:111)[2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】dialect: io.debezium.connector.jdbc.dialect.mysql.MySqlDatabaseDialect@415fc844 (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:112)[2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】Resolving schema type for schema 'Schema{STRUCT}' (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:454)[2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】Resolving schema type for schema 'null' (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:455)[2025-07-25 17:41:16,053] ERROR [mysql-sink|task-0] Failed to process record: Cannot invoke "String.hashCode()" because "<local3>" is null (io.debezium.connector.jdbc.JdbcSinkConnectorTask:137)java.lang.NullPointerException: Cannot invoke "String.hashCode()" because "<local3>" is null at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.getSchemaType(GeneralDatabaseDialect.java:489) at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.lambda$nonKeyFieldNames$2(JdbcKafkaSinkRecord.java:113) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)