Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-4166

Producer failure NullPointerException

XMLWordPrintable

    • False
    • False

      I got this error and then the connector stopped. Log file is attached.
      I really appreciate any help or workaround to make it work.

      Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yaml
      connect_1  | 2021-10-19 09:21:38,722 INFO   MySQL|dbserver|snapshot  Snapshot step 7 - Snapshotting data   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-10-19 09:21:38,726 INFO   MySQL|dbserver|snapshot  Snapshotting contents of 1 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]
      connect_1  | 2021-10-19 09:21:38,751 INFO   MySQL|dbserver|snapshot  Snapshot - Final stage   [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
      connect_1  | 2021-10-19 09:21:38,752 ERROR  MySQL|dbserver|snapshot  Producer failure   [io.debezium.pipeline.ErrorHandler]
      connect_1  | io.debezium.DebeziumException: java.lang.NullPointerException
      connect_1  | 	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79)
      connect_1  | 	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
      connect_1  | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      connect_1  | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      connect_1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      connect_1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      connect_1  | 	at java.base/java.lang.Thread.run(Thread.java:829)
      connect_1  | Caused by: java.lang.NullPointerException
      connect_1  | 	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:340)
      connect_1  | 	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315)
      connect_1  | 	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135)
      connect_1  | 	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
      connect_1  | 	... 6 more
      connect_1  | 2021-10-19 09:21:38,834 INFO   ||  WorkerSourceTask{id=inventory-connector-0} flushing 17 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
      connect_1  | 2021-10-19 09:21:39,377 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      connect_1  | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
      
      

      DDL statement couldn't be parsed. Please open a Jira issue with the statement:

      CREATE TABLE `cached_sales` (
         `id` bigint unsigned NOT NULL AUTO_INCREMENT,
         `sale_id` bigint unsigned NOT NULL,
         `sale_item_id` bigint unsigned NOT NULL,
         `retailer_id` bigint unsigned DEFAULT NULL,
         `retailer_branch_id` bigint unsigned DEFAULT NULL,
         `retailer_branch_location_id` bigint unsigned NOT NULL,
         `sales_area_id` bigint unsigned DEFAULT NULL,
         `product_id` bigint unsigned DEFAULT NULL,
         `product_variation_id` bigint unsigned NOT NULL,
         `season_ids` json DEFAULT NULL,
         `category_ids` json DEFAULT NULL,
         `color_ids` json DEFAULT NULL,
         `size_ids` json DEFAULT NULL,
         `gender_ids` json DEFAULT NULL,
         `city` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
         `country_code` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
         `location_type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
         `edi_enabled` tinyint(1) DEFAULT NULL,
         `quantity` int DEFAULT NULL,
         `brand_normalized_purchase_price_net` int DEFAULT NULL,
         `brand_normalized_selling_price_gross` int DEFAULT NULL,
         `item_updated_at` datetime DEFAULT NULL,
         `sold_at` datetime DEFAULT NULL,
         `created_at` timestamp NULL DEFAULT NULL,
         `updated_at` timestamp NULL DEFAULT NULL,
         `tenant_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
         PRIMARY KEY (`id`),
         UNIQUE KEY `cached_sales_sale_item_id_unique` (`sale_item_id`),
         KEY `cached_sales_sale_id_foreign` (`sale_id`),
         KEY `cached_sales_retailer_id_foreign` (`retailer_id`),
         KEY `cached_sales_retailer_branch_id_foreign` (`retailer_branch_id`),
         KEY `cached_sales_retailer_branch_location_id_foreign` (`retailer_branch_location_id`),
         KEY `cached_sales_sales_area_id_foreign` (`sales_area_id`),
         KEY `cached_sales_product_id_foreign` (`product_id`),
         KEY `cached_sales_product_variation_id_foreign` (`product_variation_id`),
         KEY `cached_sales_city_index` (`city`),
         KEY `cached_sales_country_code_index` (`country_code`),
         KEY `cached_sales_location_type_index` (`location_type`),
         KEY `cached_sales_sold_at_index` (`sold_at`),
         KEY `cached_sales_season_ids_index` ((cast(json_extract(`season_ids`,_utf8mb4'$') as unsigned array))),
         KEY `cached_sales_category_ids_index` ((cast(json_extract(`category_ids`,_utf8mb4'$') as unsigned array))),
         KEY `cached_sales_color_ids_index` ((cast(json_extract(`color_ids`,_utf8mb4'$') as unsigned array))),
         KEY `cached_sales_size_ids_index` ((cast(json_extract(`size_ids`,_utf8mb4'$') as unsigned array))),
         KEY `cached_sales_gender_ids_index` ((cast(json_extract(`gender_ids`,_utf8mb4'$') as unsigned array))),
         CONSTRAINT `cached_sales_product_id_foreign` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`),
         CONSTRAINT `cached_sales_product_variation_id_foreign` FOREIGN KEY (`product_variation_id`) REFERENCES `product_variations` (`id`),
         CONSTRAINT `cached_sales_retailer_branch_id_foreign` FOREIGN KEY (`retailer_branch_id`) REFERENCES `retailer_branches` (`id`),
         CONSTRAINT `cached_sales_retailer_branch_location_id_foreign` FOREIGN KEY (`retailer_branch_location_id`) REFERENCES `retailer_branch_locations` (`id`),
         CONSTRAINT `cached_sales_retailer_id_foreign` FOREIGN KEY (`retailer_id`) REFERENCES `retailers` (`id`),
         CONSTRAINT `cached_sales_sale_id_foreign` FOREIGN KEY (`sale_id`) REFERENCES `sales` (`id`),
         CONSTRAINT `cached_sales_sale_item_id_foreign` FOREIGN KEY (`sale_item_id`) REFERENCES `sale_items` (`id`) ON DELETE CASCADE,
         CONSTRAINT `cached_sales_sales_area_id_foreign` FOREIGN KEY (`sales_area_id`) REFERENCES `sales_areas` (`id`)
       ) ENGINE=InnoDB AUTO_INCREMENT=13594436 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
      

      Create Connector:

      curl --location --request POST 'http://localhost:8083/connectors' \
      --header 'Content-Type: application/json' \
      --data-raw '{
          "name": "inventory-connector",
          "config": {
              "connector.class": "io.debezium.connector.mysql.MySqlConnector",
              "tasks.max": "1",
              "database.hostname": "127.0.0.1",
              "database.port": "3306",
              "database.user": "******",
              "database.password": "******",
              "database.server.id": "184054",
              "database.server.name": "dbserver",
              "database.include.list": "mydb",
              "table.include.list":"mydb.cached_sales",
              "database.history.kafka.bootstrap.servers": "confluentcloud:9092",
              "database.history.kafka.topic": "schema-changes-we.products",
              "database.history.producer.ssl.endpoint.identification.algorithm": "https",
              "database.history.producer.security.protocol": "SASL_SSL",
              "database.history.producer.sasl.mechanism": "PLAIN",
              "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='\''******'\'' password='\''******'\'';",
              "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
              "database.history.consumer.security.protocol": "SASL_SSL",
              "database.history.consumer.sasl.mechanism": "PLAIN",
              "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='\''*****'\'' password='\''********'\'';",
              "auto.create.topics.enable": true,
              "database.history.skip.unparseable.ddl": true,
              "event.deserialization.failure.handling.mode": "warn",
              "topic.creation.enabled": true,
              "request.timeout.ms": "20000",
              "producer.request.timeout.ms": "20000",
              "consumer.request.timeout.ms": "20000",
              "offset.flush.interval.ms": "10000",
              "offset.flush.timeout.ms":"50000",
              "producer.retry.backoff.ms": "500",
              "consumer.retry.backoff.ms": "500",
              "topic.creation.default.replication.factor": -1,
              "topic.creation.default.partitions": 1,
              "snapshot.locking.mode": "none",
              "event.processing.failure.handling.mode": "warn",
              "snapshot.mode": "initial",
              "producer.override.max.request.size": "7000000"
          }
      }'
      

        1. docker-compose.yml
          4 kB
          Payam Yousefi
        2. output.log
          23 kB
          Payam Yousefi

              Unassigned Unassigned
              payam_ysf Payam Yousefi (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: