Uploaded image for project: 'Red Hat Fuse'
  1. Red Hat Fuse
  2. ENTESB-17759

Knative broker to AWS SQS Sink Number of message attributes exceeds the allowed maximum

    XMLWordPrintable

Details

    • Bug
    • Resolution: Done
    • Major
    • camel-k-1.10
    • camel-k-1.6-GA
    • Camel-K
    • None
    • False
    • False
    • % %
    • +
    • Automated

    Description

      I have two integrations with a knative broker in between: timer -> broker event and broker event -> aws-sqs-sink. Sending the message to sqs fails with:

      2021-11-04 09:51:53,686 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-0) HTTP Request to /events/custom-event failed, error id: ddd35cdf-fbcc-49c0-8c1e-207606b91503-5: software.amazon.awssdk.services.sqs.model.SqsException: Number of message attributes [11] exceeds the allowed maximum [10]. (Service: Sqs, Status Code: 400, Request ID: 3b14a830-5730-5da4-9d54-f1cdf8495c4b, Extended Request ID: null)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
      	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
      	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
      	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
      	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:135)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:161)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:114)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:169)
      	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:95)
      	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
      	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
      	at software.amazon.awssdk.services.sqs.DefaultSqsClient.sendMessage(DefaultSqsClient.java:1528)
      	at org.apache.camel.component.aws2.sqs.Sqs2Producer.processSingleMessage(Sqs2Producer.java:103)
      	at org.apache.camel.component.aws2.sqs.Sqs2Producer.process(Sqs2Producer.java:73)
      	at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
      	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
      	at org.apache.camel.component.knative.http.KnativeHttpConsumer.lambda$handleRequest$2(KnativeHttpConsumer.java:211)
      	at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159)
      	at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
      	at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157)
      	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$13.runWith(VertxCoreRecorder.java:543)
      	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
      	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
      	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
      	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      
      

      integration timer -> broker:

            // camel-k: language=java
            package com.test;
      
            import org.apache.camel.builder.RouteBuilder;
      
            public class MyRouteBuilder extends RouteBuilder {
                private final String body = "Hello SQS Sink";
      
                @Override
                public void configure() throws Exception {
                    from("timer:x?repeatCount=5").setBody(constant(body)).to("knative:event/custom-event?apiVersion=eventing.knative.dev/v1&kind=Broker");
                }
            }
      
      

      kameletbinding broker -> sqs sink:

      spec:
        sink:
          properties:
            accessKey: <key>
            autoCreateQueue: true
            queueNameOrArn: <queue>
            region: us-west-1
            secretKey: <secret>
          ref:
            apiVersion: camel.apache.org/v1alpha1
            kind: Kamelet
            name: aws-sqs-sink
            namespace: avano-tnb-test-namespace
        source:
          properties:
            type: custom-event
          ref:
            apiVersion: eventing.knative.dev/v1
            kind: Broker
            name: default
        steps: []
      

      The exchange looks like this (used log-sink instead aws-sqs-sink):

      Exchange[ExchangePattern: InOnly, Headers: {Accept-Encoding=gzip, CamelCloudEventID=D9474440744ADEF-0000000000000004, CamelCloudEventSource=route1, CamelCloudEventTime=2021-11-04T10:19:37.877Z, CamelCloudEventType=custom-event, CamelCloudEventVersion=1.0, CamelHttpMethod=POST, CamelHttpPath=, CamelHttpQuery=null, CamelHttpUri=/events/custom-event, Ce-Knativearrivaltime=2021-11-04T10:19:37.879758Z, content-length=14, Forwarded=for=10.128.2.158;proto=http, Host=log-sink-kbbroker-broker-to-kamelet.avano-tnb-test-namespace.svc.cluster.local, K-Proxy-Request=activator, Traceparent=00-06b71f8df2a155805b2bade66488777f-568d280b5536e10d-00, User-Agent=Go-http-client/1.1, X-Forwarded-For=10.128.2.158, 10.129.2.197, X-Forwarded-Proto=http, X-Request-Id=0cf1d0a6-6465-4971-aa92-5c697a17bedc}, BodyType: byte[], Body: Hello SQS Sink]
      

      I dont know how it is mapped to the message attributes, as there seems to be more headers than 11 as stated in the error message

      Attachments

        Activity

          People

            acosenti Andrea Cosentino
            avano@redhat.com Andrej Vano
            Lucie Krejcirova Lucie Krejcirova
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: