-
Task
-
Resolution: Not a Bug
-
Major
-
None
-
2.1.4.Final
-
None
-
False
-
None
-
False
I'm using version 2.1 of the Debezium MySQL connector and I'm trying to implement read-only ad-hoc snapshots. I was able to get ad-hoc snapshots working using the signal table, but I can't get the topic consumer properly configured for the read-only version.
I have the following configuration to connect the signal consumer to my kafka cluster using SASL:
signal.consumer.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kc-test" password="${secrets:strimzi-connect-secrets:kc-test-password}"; signal.consumer.security.protocol: SASL_SSL signal.consumer.security.sasl.mechanism: SCRAM-SHA-512 signal.data.collection: inventory.debezium_signal signal.kafka.bootstrap.servers: strimzi-kafka-bootstrap.staging.example.com:9096 signal.kafka.topic: debezium.inventory.debezium_signal
These are the same settings that are working for my `schema.history.internal.consumer` configuration.
If I set `read.only = true` on the connector I get the following exception on startup:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:53)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626)\n\tat io.debezium.connector.mysql.signal.KafkaSignalThread.<init>(KafkaSignalThread.java:116)\n\tat io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.<init>(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:99)\n\tat io.debezium.connector.mysql.MySqlChangeEventSourceFactory.getIncrementalSnapshotChangeEventSource(MySqlChangeEventSourceFactory.java:91)\n\tat io.debezium.connector.mysql.MySqlChangeEventSourceFactory.getIncrementalSnapshotChangeEventSource(MySqlChangeEventSourceFactory.java:29)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:185)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)\n\t... 5 more\nCaused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)\n\tat org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)\n\tat org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)\n\tat org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)\n\t... 16 more\nCaused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:303)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)\n\t... 20 more\n