-
Bug
-
Resolution: Done
-
Major
-
2.1.2.Final, 2.2.0.Alpha1
-
None
Bug report
Getting the below exception when checking connector status:
java.lang.NullPointerException at io.debezium.connector.base.ChangeEventQueue.drainRecords(ChangeEventQueue.java:299) at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:258) at io.debezium.connector.mysql.MySqlConnectorTask.doPoll(MySqlConnectorTask.java:201) at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:170) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
What Debezium connector do you use and what version?
Official docker image for 2.1.2.Final with confluent libraries added, MySQL connector
What is the connector configuration?
The exception only happens when max.queue.size.in.bytes is set to non-zero value. After I removed this parameter the exception has gone. Below is configuration with max.queue.size.in.bytes set (failing configuration).
{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.allowPublicKeyRetrieval": "true", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "database.history.consumer.sasl.mechanism": "SCRAM-SHA-512", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.ssl.truststore.location": "/certs/custom.jks", "database.history.consumer.ssl.truststore.password": "truststore-pass", "database.history.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091", "database.history.kafka.recovery.poll.interval.ms": "30000", "database.history.kafka.topic": "database_name_stream.change-history", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "database.history.producer.sasl.mechanism": "SCRAM-SHA-512", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.ssl.truststore.location": "/certs/custom.jks", "database.history.producer.ssl.truststore.password": "truststore-pass", "database.history.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "database.hostname": "mysql-server", "database.include.list": "database_name", "database.password": "db-password", "database.port": "3306", "database.server.id": "184200", "database.ssl.mode": "required", "database.server.name": "some-server-name", "database.user": "mysql-user", "include.schema.changes": "true", "inconsistent.schema.handling.mode": "fail", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.basic.auth.credentials.source": "USER_INFO", "key.converter.basic.auth.user.info": "mysql-user:mysql-password", "key.converter.schema.registry.ssl.truststore.location": "/certs/custom.jks", "key.converter.schema.registry.ssl.truststore.password": "truststore-pass", "key.converter.schema.registry.url": "https://schema-registry:443/", "key.converter.schemas.enable": "true", "max.batch.size": "8192", "max.queue.size": "32768", "max.queue.size.in.bytes": "268435456", "read.only": "true", "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.ssl.truststore.location": "/certs/custom.jks", "schema.history.internal.consumer.ssl.truststore.password": "truststore-pass", "schema.history.internal.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091", "schema.history.internal.kafka.topic": "main-stream.debezium-change-history", "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.ssl.truststore.location": "/certs/custom.jks", "schema.history.internal.producer.ssl.truststore.password": "truststore-pass", "schema.name.adjustment.mode": "avro", "signal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mysql-user\" password=\"mysql-password\";", "signal.consumer.sasl.mechanism": "SCRAM-SHA-512", "signal.consumer.security.protocol": "SASL_SSL", "signal.consumer.ssl.truststore.location": "/certs/custom.jks", "signal.consumer.ssl.truststore.password": "truststore-pass", "signal.kafka.bootstrap.servers": "kafka-broker:9091,another-broker:9091", "signal.kafka.topic": "main-stream.debezium-signals", "snapshot.locking.mode": "none", "snapshot.mode": "schema_only", "table.exclude.list": "database_name.excluded_table", "tasks.max": "1", "tombstones.on.delete": "false", "topic.creation.default.partitions": "6", "topic.creation.default.replication.factor": "3", "topic.creation.enable": "true", "topic.prefix": "main-stream", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.basic.auth.credentials.source": "USER_INFO", "value.converter.basic.auth.user.info": "mysql-user:mysql-password", "value.converter.schema.registry.ssl.truststore.location": "/certs/custom.jks", "value.converter.schema.registry.ssl.truststore.password": "truststore-pass", "value.converter.schema.registry.url": "https://schema-registry:443/", "value.converter.schemas.enable": "true" }
What is the captured database version and mode of depoyment?
Managed MySQL 5.7 in cloud
What behaviour do you expect?
Connector working
What behaviour do you see?
java.lang.NullPointerException
Do you see the same behaviour using the latest relesead Debezium version?
Tried to build custom image (same confluent libraries being added) with 2.2.0.Alpha1 - exception happens.
Do you have the connector logs, ideally from start till finish?
Nothing suspicious in logs
How to reproduce the issue using our tutorial deployment?
Run connector with configuration provided, pay attention to max.queue.size.in.bytes option.