Bug
Resolution: Done
Major
camel-k-1.6-GA
Automated
I have two integrations with a Knative broker in between: timer -> broker event and broker event -> aws-sqs-sink. Sending the message to SQS fails with:
2021-11-04 09:51:53,686 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-0) HTTP Request to /events/custom-event failed, error id: ddd35cdf-fbcc-49c0-8c1e-207606b91503-5: software.amazon.awssdk.services.sqs.model.SqsException: Number of message attributes [11] exceeds the allowed maximum [10]. (Service: Sqs, Status Code: 400, Request ID: 3b14a830-5730-5da4-9d54-f1cdf8495c4b, Extended Request ID: null)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:135)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:161)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:114)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:169)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:95)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
    at software.amazon.awssdk.services.sqs.DefaultSqsClient.sendMessage(DefaultSqsClient.java:1528)
    at org.apache.camel.component.aws2.sqs.Sqs2Producer.processSingleMessage(Sqs2Producer.java:103)
    at org.apache.camel.component.aws2.sqs.Sqs2Producer.process(Sqs2Producer.java:73)
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.component.knative.http.KnativeHttpConsumer.lambda$handleRequest$2(KnativeHttpConsumer.java:211)
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157)
    at io.quarkus.vertx.core.runtime.VertxCoreRecorder$13.runWith(VertxCoreRecorder.java:543)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
integration timer -> broker:
// camel-k: language=java
package com.test;

import org.apache.camel.builder.RouteBuilder;

public class MyRouteBuilder extends RouteBuilder {
    private final String body = "Hello SQS Sink";

    @Override
    public void configure() throws Exception {
        from("timer:x?repeatCount=5")
            .setBody(constant(body))
            .to("knative:event/custom-event?apiVersion=eventing.knative.dev/v1&kind=Broker");
    }
}
kameletbinding broker -> sqs sink:
spec:
  sink:
    properties:
      accessKey: <key>
      autoCreateQueue: true
      queueNameOrArn: <queue>
      region: us-west-1
      secretKey: <secret>
    ref:
      apiVersion: camel.apache.org/v1alpha1
      kind: Kamelet
      name: aws-sqs-sink
      namespace: avano-tnb-test-namespace
  source:
    properties:
      type: custom-event
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
  steps: []
The exchange looks like this (captured with log-sink in place of aws-sqs-sink):
Exchange[ExchangePattern: InOnly, Headers: {Accept-Encoding=gzip, CamelCloudEventID=D9474440744ADEF-0000000000000004, CamelCloudEventSource=route1, CamelCloudEventTime=2021-11-04T10:19:37.877Z, CamelCloudEventType=custom-event, CamelCloudEventVersion=1.0, CamelHttpMethod=POST, CamelHttpPath=, CamelHttpQuery=null, CamelHttpUri=/events/custom-event, Ce-Knativearrivaltime=2021-11-04T10:19:37.879758Z, content-length=14, Forwarded=for=10.128.2.158;proto=http, Host=log-sink-kbbroker-broker-to-kamelet.avano-tnb-test-namespace.svc.cluster.local, K-Proxy-Request=activator, Traceparent=00-06b71f8df2a155805b2bade66488777f-568d280b5536e10d-00, User-Agent=Go-http-client/1.1, X-Forwarded-For=10.128.2.158, 10.129.2.197, X-Forwarded-Proto=http, X-Request-Id=0cf1d0a6-6465-4971-aa92-5c697a17bedc}, BodyType: byte[], Body: Hello SQS Sink]
I don't know how the headers are mapped to SQS message attributes, as the exchange seems to carry more headers than the 11 stated in the error message.
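One possible explanation for the count, offered only as an assumption and not confirmed anywhere in this report, is that the SQS producer skips headers whose names start with `Camel` and turns each remaining header into one message attribute. The sketch below is plain Java with no Camel dependency. It copies the header names from the exchange dump above and counts those that would survive such a filter. The class name `HeaderCount` and the prefix rule are hypothetical, chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HeaderCount {
    // SQS limit quoted in the error message above.
    static final int SQS_MAX_ATTRIBUTES = 10;

    // Headers copied from the logged exchange (log-sink run).
    static Map<String, String> sampleHeaders() {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("Accept-Encoding", "gzip");
        h.put("CamelCloudEventID", "D9474440744ADEF-0000000000000004");
        h.put("CamelCloudEventSource", "route1");
        h.put("CamelCloudEventTime", "2021-11-04T10:19:37.877Z");
        h.put("CamelCloudEventType", "custom-event");
        h.put("CamelCloudEventVersion", "1.0");
        h.put("CamelHttpMethod", "POST");
        h.put("CamelHttpPath", "");
        h.put("CamelHttpQuery", null);
        h.put("CamelHttpUri", "/events/custom-event");
        h.put("Ce-Knativearrivaltime", "2021-11-04T10:19:37.879758Z");
        h.put("content-length", "14");
        h.put("Forwarded", "for=10.128.2.158;proto=http");
        h.put("Host", "log-sink-kbbroker-broker-to-kamelet.avano-tnb-test-namespace.svc.cluster.local");
        h.put("K-Proxy-Request", "activator");
        h.put("Traceparent", "00-06b71f8df2a155805b2bade66488777f-568d280b5536e10d-00");
        h.put("User-Agent", "Go-http-client/1.1");
        h.put("X-Forwarded-For", "10.128.2.158, 10.129.2.197");
        h.put("X-Forwarded-Proto", "http");
        h.put("X-Request-Id", "0cf1d0a6-6465-4971-aa92-5c697a17bedc");
        return h;
    }

    // ASSUMPTION: headers starting with "Camel" are not sent as attributes.
    static List<String> nonCamelHeaders(Map<String, String> headers) {
        return headers.keySet().stream()
                .filter(name -> !name.startsWith("Camel"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> candidates = nonCamelHeaders(sampleHeaders());
        System.out.println(candidates.size() + " candidate attributes, SQS limit is "
                + SQS_MAX_ATTRIBUTES);
        candidates.forEach(System.out::println);
    }
}
```

Under that assumption the exchange above yields 11 candidate attributes, which matches the number in the error. If this is what happens, the HTTP and Knative transport headers (`Host`, `User-Agent`, `X-Forwarded-For` and so on) are what push the message over the limit, and removing them before the sink would be one way to stay under it.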