Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-9284

Mixing a Debezium event payload with a Struct without the ".Envelope" suffix causes obscure NPE

XMLWordPrintable

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Minor
    • Backlog
    • 3.2.0.Final
    • jdbc-connector
    • None
    • False
    • Hide

      None

      Show
      None
    • False

      -start bash:

      bin/connect-standalone.sh config/connect-standalone.properties config/mysql-sink.properties

      -config/connect-standalone.properties

      bootstrap.servers=localhost:9092
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter.schemas.enable=true
      value.converter=org.apache.kafka.connect.json.JsonConverter
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=1000
      plugin.path=/Users/lyq/Desktop/debezium-connector-jdbc

      -config/mysql-sink.properties

      name=mysql-sink
      connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
      consumer.override.auto.offset.reset=latest
      tasks.max=1
      database.time_zone=UTC
      connection.url=jdbc:mysql://localhost:3306/betalpha
      connection.username=root
      connection.password=1234567890
      schema.evolution=basic
      primary.key.mode=record_key
      primary.key.fields=day_id,shop_id,course_id,article_id
      insert.mode=upsert
      topics=order_statistics

      -Arroyo pipeline:

       

      -- Arroyo pipeline used to reproduce the issue. It reads payment events
      -- from the Kafka topic 'payment-success', aggregates them per
      -- (day, shop, course, article), and writes the aggregates to the Kafka
      -- topic 'order_statistics', which is the topic the JDBC sink connector
      -- subscribes to (topics=order_statistics in mysql-sink.properties).
      -- Source table: JSON-encoded payment events read from 'payment-success'.
      CREATE TABLE
          payment_source (
              order_id STRING,
              pay_time TIMESTAMP,
              shop_id STRING,
              user_id BIGINT,
              course_id BIGINT,
              article_id BIGINT,
              -- Summed into order_cnt by the INSERT below.
              count INT,
              -- Summed into order_amount by the INSERT below.
              actual_payment BIGINT,
              total_price BIGINT
          )
      WITH
          (
              connector = 'kafka',
              bootstrap_servers = '127.0.0.1:9092',
              topic = 'payment-success',
              type = 'source',
              format = 'json'
          );
      -- Sink table: written to 'order_statistics' with format = 'debezium_json',
      -- so each message is a before/after/op change event (see the sample
      -- message from the topic further down). 'json.include_schema' = 'true'
      -- embeds a Connect schema in every message; the sink side needs it
      -- because it reads values with JsonConverter and
      -- value.converter.schemas.enable=true.
      -- NOTE(review): in that sample the top-level schema is named 'ArroyoJson'
      -- and the before/after structs carry no schema name, while the issue
      -- title says the sink expects an envelope name ending in ".Envelope";
      -- this mismatch appears to be what leads to the NPE - confirm.
      CREATE TABLE
          order_statistics_sink (
              day_id BIGINT,
              shop_id STRING,
              course_id BIGINT,
              article_id BIGINT,
              order_cnt BIGINT,
              order_amount BIGINT,
              order_user_cnt BIGINT
          )
      WITH
          (
              connector = 'kafka',
              bootstrap_servers = '127.0.0.1:9092',
              topic = 'order_statistics',
              type = 'sink',
              format = 'debezium_json',
              'json.include_schema' = 'true'
          );
      -- Aggregate payments per day/shop/course/article. The query has no time
      -- window, so results are presumably emitted as updates to each group
      -- (hence the Debezium change-event sink format) - confirm against the
      -- Arroyo documentation.
      -- The four grouping columns match primary.key.fields in
      -- mysql-sink.properties, which the sink uses for insert.mode=upsert.
      -- NOTE(review): the sink sets primary.key.mode=record_key, but the sample
      -- message has a null record key, so these columns cannot come from the
      -- key; primary.key.mode=record_value may be what is intended - confirm.
      INSERT INTO
          order_statistics_sink
      SELECT
          -- day_id: start of pay_time's day in epoch seconds divided by 86400,
          -- i.e. whole days since 1970-01-01 (20294 in the sample message is
          -- 2025-07-25). Assumes pay_time is in UTC - TODO confirm.
          CAST(
              date_part ('epoch', date_trunc ('day', pay_time)) / 86400 AS BIGINT
          ) AS day_id,
          shop_id,
          course_id,
          article_id,
          SUM(count) AS order_cnt,
          SUM(actual_payment) AS order_amount,
          -- Number of distinct paying users in the group.
          COUNT(DISTINCT user_id) AS order_user_cnt
      FROM
          payment_source
      -- Groups by the same day_id expression as the SELECT list (repeated
      -- rather than referenced by its alias).
      GROUP BY
          CAST(
              date_part ('epoch', date_trunc ('day', pay_time)) / 86400 AS BIGINT
          ),
          shop_id,
          course_id,
          article_id;
      

      -Message in the Kafka topic order_statistics (the record key is null, and `before` is null):

      null: {
          "payload": {
              "after": {
                  "article_id": 600000,
                  "course_id": 500000,
                  "day_id": 20294,
                  "order_amount": 509869,
                  "order_cnt": 10,
                  "order_user_cnt": 1,
                  "shop_id": "SHOP100000"
              },
              "before": null,
              "op": "c"
          },
          "schema": {
              "fields": [
                  {
                      "field": "before",
                      "fields": [
                          {
                              "field": "day_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "shop_id",
                              "optional": true,
                              "type": "string"
                          },
                          {
                              "field": "course_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "article_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "order_cnt",
                              "optional": true,
                              "type": "int32"
                          },
                          {
                              "field": "order_amount",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "order_user_cnt",
                              "optional": false,
                              "type": "int32"
                          }
                      ],
                      "optional": true,
                      "type": "struct"
                  },
                  {
                      "field": "after",
                      "fields": [
                          {
                              "field": "day_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "shop_id",
                              "optional": true,
                              "type": "string"
                          },
                          {
                              "field": "course_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "article_id",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "order_cnt",
                              "optional": true,
                              "type": "int32"
                          },
                          {
                              "field": "order_amount",
                              "optional": true,
                              "type": "int64"
                          },
                          {
                              "field": "order_user_cnt",
                              "optional": false,
                              "type": "int32"
                          }
                      ],
                      "optional": true,
                      "type": "struct"
                  },
                  {
                      "field": "op",
                      "optional": true,
                      "type": "string"
                  }
              ],
              "name": "ArroyoJson",
              "optional": false,
              "type": "struct"
          }
      }

      -Kafka Connect log (I checked out the connector source code and added extra log statements, marked with 【】, and found that the failure is caused by before=null):

      [2025-07-25 17:41:16,047] INFO [mysql-sink|task-0] 【】Filtered payload: Struct{after=Struct{day_id=20294,shop_id=SHOP100000,course_id=500000,article_id=600000,order_cnt=10,order_amount=509869,order_user_cnt=1},op=c} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:101)
      [2025-07-25 17:41:16,048] INFO [mysql-sink|task-0] 【】Field: Field{name=before, index=0, schema=Schema{STRUCT}} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:103)
      [2025-07-25 17:41:16,048] INFO [mysql-sink|task-0] 【】allFields: {} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:105)
      [2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】descriptor: FieldDescriptor{schema=Schema{STRUCT}, name='before', isKey='false', columnName='before'} (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:111)
      [2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】dialect: io.debezium.connector.jdbc.dialect.mysql.MySqlDatabaseDialect@415fc844 (io.debezium.connector.jdbc.JdbcKafkaSinkRecord:112)
      [2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】Resolving schema type for schema 'Schema{STRUCT}' (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:454)
      [2025-07-25 17:41:16,051] INFO [mysql-sink|task-0] 【】Resolving schema type for schema 'null' (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:455)
      [2025-07-25 17:41:16,053] ERROR [mysql-sink|task-0] Failed to process record: Cannot invoke "String.hashCode()" because "<local3>" is null (io.debezium.connector.jdbc.JdbcSinkConnectorTask:137)
      java.lang.NullPointerException: Cannot invoke "String.hashCode()" because "<local3>" is null
          at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.getSchemaType(GeneralDatabaseDialect.java:489)
          at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.lambda$nonKeyFieldNames$2(JdbcKafkaSinkRecord.java:113)
          at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
          at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
          at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)

              Unassigned Unassigned
              eken6 eken L (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated: