Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6104

java.lang.NullPointerException in MySQL connector with max.queue.size.in.bytes

XMLWordPrintable

    • False
    • None
    • False
    • Important

      Bug report

      Getting the below exception when checking connector status:

       

      java.lang.NullPointerException
          at io.debezium.connector.base.ChangeEventQueue.drainRecords(ChangeEventQueue.java:299)
          at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:258)
          at io.debezium.connector.mysql.MySqlConnectorTask.doPoll(MySqlConnectorTask.java:201)
          at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:170)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      

      What Debezium connector do you use and what version?

      Official docker image for 2.1.2.Final with confluent libraries added, MySQL connector

      What is the connector configuration?

      The exception only happens when max.queue.size.in.bytes is set to non-zero value. After I removed this parameter the exception has gone. Below is configuration with max.queue.size.in.bytes set (failing configuration).

       

      {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.allowPublicKeyRetrieval": "true",
        "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"mysql-user\"   password=\"mysql-password\";",
        "database.history.consumer.sasl.mechanism": "SCRAM-SHA-512",
        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.ssl.truststore.location": "/certs/custom.jks",
        "database.history.consumer.ssl.truststore.password": "truststore-pass",
        "database.history.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091",
        "database.history.kafka.recovery.poll.interval.ms": "30000",
        "database.history.kafka.topic": "database_name_stream.change-history",
        "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"mysql-user\"   password=\"mysql-password\";",
        "database.history.producer.sasl.mechanism": "SCRAM-SHA-512",
        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.ssl.truststore.location": "/certs/custom.jks",
        "database.history.producer.ssl.truststore.password": "truststore-pass",
        "database.history.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"mysql-user\" password=\"mysql-password\";",
        "database.hostname": "mysql-server",
        "database.include.list": "database_name",
        "database.password": "db-password",
        "database.port": "3306",
        "database.server.id": "184200",
        "database.ssl.mode": "required",
        "database.server.name": "some-server-name",
        "database.user": "mysql-user",
        "include.schema.changes": "true",
        "inconsistent.schema.handling.mode": "fail",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "key.converter.basic.auth.user.info": "mysql-user:mysql-password",
        "key.converter.schema.registry.ssl.truststore.location": "/certs/custom.jks",
        "key.converter.schema.registry.ssl.truststore.password": "truststore-pass",
        "key.converter.schema.registry.url": "https://schema-registry:443/",
        "key.converter.schemas.enable": "true",
        "max.batch.size": "8192",
        "max.queue.size": "32768",
        "max.queue.size.in.bytes": "268435456",
        "read.only": "true",
        "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"mysql-user\"   password=\"mysql-password\";",
        "schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512",
        "schema.history.internal.consumer.security.protocol": "SASL_SSL",
        "schema.history.internal.consumer.ssl.truststore.location": "/certs/custom.jks",
        "schema.history.internal.consumer.ssl.truststore.password": "truststore-pass",
        "schema.history.internal.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091",
        "schema.history.internal.kafka.topic": "main-stream.debezium-change-history",
        "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"mysql-user\"   password=\"mysql-password\";",
        "schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512",
        "schema.history.internal.producer.security.protocol": "SASL_SSL",
        "schema.history.internal.producer.ssl.truststore.location": "/certs/custom.jks",
        "schema.history.internal.producer.ssl.truststore.password": "truststore-pass",
        "schema.name.adjustment.mode": "avro",
        "signal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"mysql-user\"   password=\"mysql-password\";",
        "signal.consumer.sasl.mechanism": "SCRAM-SHA-512",
        "signal.consumer.security.protocol": "SASL_SSL",
        "signal.consumer.ssl.truststore.location": "/certs/custom.jks",
        "signal.consumer.ssl.truststore.password": "truststore-pass",
        "signal.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091",
        "signal.kafka.topic": "main-stream.debezium-signals",
        "snapshot.locking.mode": "none",
        "snapshot.mode": "schema_only",
        "table.exclude.list": "database_name.excluded_table",
        "tasks.max": "1",
        "tombstones.on.delete": "false",
        "topic.creation.default.partitions": "6",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.enable": "true",
        "topic.prefix": "main-stream",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "mysql-user:mysql-password",
        "value.converter.schema.registry.ssl.truststore.location": "/certs/custom.jks",
        "value.converter.schema.registry.ssl.truststore.password": "truststore-pass",
        "value.converter.schema.registry.url": "https://schema-registry:443/",
        "value.converter.schemas.enable": "true"
      }

       

      What is the captured database version and mode of depoyment?

      Managed MySQL 5.7 in cloud

      What behaviour do you expect?

      Connector working

      What behaviour do you see?

      java.lang.NullPointerException

      Do you see the same behaviour using the latest relesead Debezium version?

      Tried to build custom image (same confluent libraries being added) with 2.2.0.Alpha1 - exception happens.

      Do you have the connector logs, ideally from start till finish?

      Nothing suspicious in logs

      How to reproduce the issue using our tutorial deployment?

      Run connector with configuration provided, pay attention to max.queue.size.in.bytes option.

              Unassigned Unassigned
              kirill.popov Kirill Popov (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: