Uploaded image for project: 'Red Hat Fuse'
  1. Red Hat Fuse
  2. ENTESB-17759

Knative broker to AWS SQS Sink Number of message attributes exceeds the allowed maximum

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • camel-k-1.10
    • camel-k-1.6-GA
    • Camel-K
    • None
    • False
    • False
    • % %
    • +
    • Automated

      I have two integrations with a knative broker in between: timer -> broker event and broker event -> aws-sqs-sink. Sending the message to sqs fails with:

      2021-11-04 09:51:53,686 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-0) HTTP Request to /events/custom-event failed, error id: ddd35cdf-fbcc-49c0-8c1e-207606b91503-5: software.amazon.awssdk.services.sqs.model.SqsException: Number of message attributes [11] exceeds the allowed maximum [10]. (Service: Sqs, Status Code: 400, Request ID: 3b14a830-5730-5da4-9d54-f1cdf8495c4b, Extended Request ID: null)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
      	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
      	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:135)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:161)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:114)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:169)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:95)
      	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
      	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
      	at software.amazon.awssdk.services.sqs.DefaultSqsClient.sendMessage(DefaultSqsClient.java:1528)
      	at org.apache.camel.component.aws2.sqs.Sqs2Producer.processSingleMessage(Sqs2Producer.java:103)
      	at org.apache.camel.component.aws2.sqs.Sqs2Producer.process(Sqs2Producer.java:73)
      	at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
      	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
      	at org.apache.camel.component.knative.http.KnativeHttpConsumer.lambda$handleRequest$2(KnativeHttpConsumer.java:211)
      	at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159)
      	at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
      	at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157)
      	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$13.runWith(VertxCoreRecorder.java:543)
      	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
      	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
      	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
      	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      
      

      integration timer -> broker:

            // camel-k: language=java
            package com.test;
      
            import org.apache.camel.builder.RouteBuilder;
      
            public class MyRouteBuilder extends RouteBuilder {
                private final String body = "Hello SQS Sink";
      
                @Override
                public void configure() throws Exception {
                    from("timer:x?repeatCount=5").setBody(constant(body)).to("knative:event/custom-event?apiVersion=eventing.knative.dev/v1&kind=Broker");
                }
            }
      
      

      kameletbinding broker -> sqs sink:

      spec:
        sink:
          properties:
            accessKey: <key>
            autoCreateQueue: true
            queueNameOrArn: <queue>
            region: us-west-1
            secretKey: <secret>
          ref:
            apiVersion: camel.apache.org/v1alpha1
            kind: Kamelet
            name: aws-sqs-sink
            namespace: avano-tnb-test-namespace
        source:
          properties:
            type: custom-event
          ref:
            apiVersion: eventing.knative.dev/v1
            kind: Broker
            name: default
        steps: []
      

      The exchange looks like this (used log-sink instead aws-sqs-sink):

      Exchange[ExchangePattern: InOnly, Headers: {Accept-Encoding=gzip, CamelCloudEventID=D9474440744ADEF-0000000000000004, CamelCloudEventSource=route1, CamelCloudEventTime=2021-11-04T10:19:37.877Z, CamelCloudEventType=custom-event, CamelCloudEventVersion=1.0, CamelHttpMethod=POST, CamelHttpPath=, CamelHttpQuery=null, CamelHttpUri=/events/custom-event, Ce-Knativearrivaltime=2021-11-04T10:19:37.879758Z, content-length=14, Forwarded=for=10.128.2.158;proto=http, Host=log-sink-kbbroker-broker-to-kamelet.avano-tnb-test-namespace.svc.cluster.local, K-Proxy-Request=activator, Traceparent=00-06b71f8df2a155805b2bade66488777f-568d280b5536e10d-00, User-Agent=Go-http-client/1.1, X-Forwarded-For=10.128.2.158, 10.129.2.197, X-Forwarded-Proto=http, X-Request-Id=0cf1d0a6-6465-4971-aa92-5c697a17bedc}, BodyType: byte[], Body: Hello SQS Sink]
      

      I dont know how it is mapped to the message attributes, as there seems to be more headers than 11 as stated in the error message

              acosenti Andrea Cosentino
              avano@redhat.com Andrej Vano
              Lucie Krejcirova Lucie Krejcirova
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: