-
Bug
-
Resolution: Done
-
Major
-
3.1.1.Final, 3.2.0.Alpha1
-
None
-
False
-
-
False
-
-
When supplying the following configuration for a KafkaSignalChannel.
"signal.enabled.channels": "source,kafka", "signal.kafka.topic": "_web_app_debezium_signal", "signal.data.collection": "public.debezium_signal", "signal.kafka.consumer.value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer", "signal.kafka.consumer.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
The connector will throw a NullPointerException:
ERROR [web-app-source-connector|task-0] WorkerSourceTask{id=web-app-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212) java.lang.NullPointerException at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006) at java.base/java.util.Properties.put(Properties.java:1301) at java.base/java.util.Properties.setProperty(Properties.java:229) at io.debezium.config.Configuration$Builder.withDefault(Configuration.java:697) at io.debezium.pipeline.signal.channels.KafkaSignalChannel.buildKafkaConfiguration(KafkaSignalChannel.java:161) at io.debezium.pipeline.signal.channels.KafkaSignalChannel.init(KafkaSignalChannel.java:152) at io.debezium.pipeline.signal.SignalProcessor.lambda$new$0(SignalProcessor.java:84) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at io.debezium.pipeline.signal.SignalProcessor.<init>(SignalProcessor.java:84) at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:186) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:255) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:280) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840)
This is because we do not apply any validation to SignalChannelReader instances.