Debezium / DBZ-9052

Missing configuration properties for signal channel readers can lead to NullPointerException


      When the following configuration is supplied for a KafkaSignalChannel:

      "signal.enabled.channels": "source,kafka",
      "signal.kafka.topic": "_web_app_debezium_signal",
      "signal.data.collection": "public.debezium_signal",
      "signal.kafka.consumer.value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
      "signal.kafka.consumer.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      

      The connector will throw a NullPointerException:

      ERROR [web-app-source-connector|task-0] WorkerSourceTask{id=web-app-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
      java.lang.NullPointerException
          at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
          at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
          at java.base/java.util.Properties.put(Properties.java:1301)
          at java.base/java.util.Properties.setProperty(Properties.java:229)
          at io.debezium.config.Configuration$Builder.withDefault(Configuration.java:697)
          at io.debezium.pipeline.signal.channels.KafkaSignalChannel.buildKafkaConfiguration(KafkaSignalChannel.java:161)
          at io.debezium.pipeline.signal.channels.KafkaSignalChannel.init(KafkaSignalChannel.java:152)
          at io.debezium.pipeline.signal.SignalProcessor.lambda$new$0(SignalProcessor.java:84)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
          at io.debezium.pipeline.signal.SignalProcessor.<init>(SignalProcessor.java:84)
          at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:186)
          at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:255)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:280)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:840)
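
The innermost frames of the trace are in java.util.Properties, which does not accept null. The following is a minimal sketch of that JDK behavior on its own, outside Debezium. The class name and the key are illustrative assumptions; the issue does not state which property ended up null.

```java
import java.util.Properties;

public class NullPropertyDemo {

    // Returns true if Properties.setProperty rejects a null value
    // with a NullPointerException, as seen at the top of the trace.
    static boolean rejectsNullValue(String key) {
        Properties props = new Properties();
        try {
            props.setProperty(key, null);
            return false;
        }
        catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // "example.key" is a placeholder, not a real Debezium property.
        System.out.println(rejectsNullValue("example.key")); // prints "true"
    }
}
```

Any code path that copies an unset configuration value into a Properties object without a null check will fail the same way.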
      

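      This is because we do not apply any validation to SignalChannelReader instances, so a missing property is not reported at startup and instead surfaces as a null passed to Configuration.Builder.withDefault.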
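
As a rough illustration of the kind of check that is missing, the sketch below validates a plain map of properties before anything is built from it. It does not use Debezium's own Configuration or Field validation API. The class, the method names, and the list of required keys (including treating `signal.kafka.bootstrap.servers` as required) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SignalChannelConfigCheck {

    // Hypothetical helper: returns the required keys that are absent or blank.
    static List<String> missingKeys(Map<String, String> config, List<String> requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Fails fast with a descriptive message instead of letting a null
    // value reach Properties.setProperty later on.
    static void validate(Map<String, String> config, List<String> requiredKeys) {
        List<String> missing = missingKeys(config, requiredKeys);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required signal channel properties: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "signal.kafka.topic", "_web_app_debezium_signal");
        // Assumed list of required keys, for illustration only.
        List<String> required = List.of(
                "signal.kafka.topic", "signal.kafka.bootstrap.servers");
        try {
            validate(config, required);
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this, the task would stop with a message naming the missing property rather than an unexplained NullPointerException.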

              ccranfor@redhat.com Chris Cranford