  1. Red Hat Fuse
  2. ENTESB-16950

KameletBinding with aws-kinesis-sink throws NPE if the insert-header-action kamelet is not used


    Details

    • Type: Bug
    • Status: Done
    • Priority: Critical
    • Resolution: Done
    • Affects Version/s: None
    • Fix Version/s: Camel-K-GA
    • Component/s: Camel-K
    • Labels:
      None
    • Steps to Reproduce:

      On OCP 4.6 with Serverless operator v1.15 and the Camel K operator (Camel K CK5 build):
      1. Create the InMemoryChannel:
      oc apply -f messages.yaml
      2. Create the KameletBinding from the InMemoryChannel to aws-kinesis-sink:
      oc apply -f inmem-uri-to-aws-k (the integration starts OK, phase Running)
      3. Run an integration from timer:x to the InMemoryChannel with a given body:
      kamel run timer-to-inmem-integ.groovy --dev
      4. Running kamel log inmem-uri-to-aws-k produces:

      [1] 2021-07-05 12:07:27,867 INFO  [io.quarkus] (main) Installed features: [camel-attachments, camel-aws2-commons, camel-aws2-kinesis, camel-bean, camel-core, camel-k-cloudevents, camel-k-core, camel-k-knative, camel-k-knative-consumer, camel-k-runtime, camel-kamelet, camel-platform-http, camel-support-common, camel-support-commons-logging, camel-yaml-dsl, cdi, mutiny, smallrye-context-propagation, vertx, vertx-web]
      [1] 2021-07-05 12:08:14,492 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (vert.x-worker-thread-0) Failed delivery for (MessageId: 540587FEA76EFAB-0000000000000000 on ExchangeId: 540587FEA76EFAB-0000000000000000). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException
      [1] 
      [1] Message History (complete message history is disabled)
      [1] ---------------------------------------------------------------------------------------------------------------------------------------
      [1] RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
      [1] [route1            ] [route1            ] [from[knative://channel/messages?apiVersion=messaging.knative.dev/v1&kind=InMem] [        29]
      [1]     ...
      [1] [sink              ] [to2               ] [aws2-kinesis:{{stream}}?accessKey=RAW({{accessKey}})&region={{region}}&secretK] [         0]
      [1] 
      [1] Stacktrace
      [1] ---------------------------------------------------------------------------------------------------------------------------------------
      [1] : java.lang.NullPointerException
      [1]     at org.apache.camel.component.aws2.kinesis.Kinesis2Producer.createRequest(Kinesis2Producer.java:56)
      [1]     at org.apache.camel.component.aws2.kinesis.Kinesis2Producer.process(Kinesis2Producer.java:41)
      [1]     at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
      [1]     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
      [1]     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
      [1]     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
      [1]     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
      [1]     at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
      [1]     at org.apache.camel.component.knative.http.KnativeHttpConsumer.lambda$handleRequest$2(KnativeHttpConsumer.java:211)
      [1]     at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
      [1]     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      [1]     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      [1]     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      [1]     at java.base/java.lang.Thread.run(Thread.java:829)
      [1] 
      [1] 2021-07-05 12:08:14,496 WARN  [org.apa.cam.com.kna.htt.KnativeHttpConsumer] (vert.x-eventloop-thread-0) Caused by: [java.lang.NullPointerException - null]: java.lang.NullPointerException
      [1]     at org.apache.camel.component.aws2.kinesis.Kinesis2Producer.createRequest(Kinesis2Producer.java:56)
      
    • Regression Test:
      Todo

      Description

      A KameletBinding from InMemoryChannel to aws-kinesis-sink throws java.lang.NullPointerException.

      If I add the insert-header-action kamelet with the header "partition" to the "steps" section (as described in https://camel.apache.org/camel-kamelets/latest/insert-header-action.html), the exception does not occur.

      However, as I understand it, the aws-kinesis-sink documentation describes this header as optional:

      The Kamelet expects the following header:
          partition / ce-partition: to set the Kinesis partition key
      If the header won’t be set the exchange ID will be used.
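
      For reference, the workaround described above could look roughly like the following KameletBinding. This is only a sketch, not the binding attached to this issue: the binding name and channel name are taken from the commands and log above, while the v1alpha1 API version, the stream name, credentials, region, and partition value are assumed placeholders.

      ```yaml
      # Sketch only: stream, credentials, region and the partition value are placeholders.
      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: inmem-uri-to-aws-k
      spec:
        source:
          ref:
            kind: InMemoryChannel
            apiVersion: messaging.knative.dev/v1
            name: messages
        steps:
          # Sets the "partition" header that the sink reads as the Kinesis partition key.
          - ref:
              kind: Kamelet
              apiVersion: camel.apache.org/v1alpha1
              name: insert-header-action
            properties:
              name: "partition"
              value: "my-partition-key"   # hypothetical value
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: aws-kinesis-sink
          properties:
            stream: "my-stream"           # hypothetical stream name
            accessKey: "<access-key>"
            secretKey: "<secret-key>"
            region: "eu-west-1"           # hypothetical region
      ```

      Removing the "steps" section from such a binding corresponds to the failing case reported here.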
      

            People

            Assignee:
            nibbio84 Nicola Ferraro
            Reporter:
            lfabriko Lucie Krejcirova