Details
-
Bug
-
Resolution: Not a Bug
-
Normal
-
fuse-7.9-GA
-
None
Description
ENVIRONMENT :
==============
Fuse 7.9.0
EAP 7.4
ISSUE SUMMARY :
===============
-
- One of our customers is reporting that multicast with aggregate not working as expected.
– An AggregationStrategy is used for aggregating all reply messages
The customer's route is
from("jms:queue:{{Queue.Route.ID}}?exchangePattern=InOptionalOut" + "&replyToConcurrentConsumers={{jms.replyTo.concurrent.consumers:1}}" + "&concurrentConsumers={{jms.concurrent.consumers:1}} to("direct:run") ; from("direct:run") to("direct:processRequest") ; from("direct:procesRequest") .multicast(new MyAggregationStrategy()) .parallelProcessing().timeout(500).to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
– When tested from direct:run, or direct:processRequest the exchange body mock:result is the aggregate data from direct:a, direct:b direct:c
– When tested from "jms:queue:.. the exchange body at mock:result is the input data passed to the multicast processor.
– If the exchange. pattern for jms: queue is changed to InOut the exchange body at mock: result is the aggregate data from direct:a direct:b direct:c
~ 1. from("direct:start")~
~ 2. .multicast(new MyAggregationStrategy())~
– At the end of the final run of the aggregator, the customer has the expected aggregation of the sub-routes.
– However, when reaching the below Post Run Test processor customer is not getting the expected output i.e aggregated output in the Post Run Test processor for each run.
– .process(new Processor() {
public void process(Exchange exchange) throws Exception {
log.info("Post Run Test Point " + exchange.getIn().getHeader("JobName") + ": " + new ObjectMapper().writeValueAsString(exchange.getIn().getBody()));
log.info("break Point");
– The issue is when using .multicast in a route where the JMS queue is set InOptionalOut the .multicast processor returns the input exchange and not the aggregate exchange for the last test.
Attaching a reproducer application.
– Steps to reproduce:
- Create a setup of Fuse 7.9.0 on EAP 7.4
- Start EAP 7.4 server
- You will need to setup 2 queues.
<jms-queue name="Sahara.IEME.Income" entries="queue/Sahara.IEME.Income"/> <jms-queue name="Sahara.IEME.Income.replyTo" entries="queue/Sahara.IEME.Income.replyTo"/>
- You will need to configure the arqullian.xml in match your server.
<property name="javaVmArguments">-Xmx512m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true</property> <property name="allowConnectingToRunningServer">true</property> <property name="managementAddress">127.0.0.1</property> <property name="managementPort">10000</property> <property name="username">admin</property> <property name="password">xxxxxxxxx</property>
— Run using JUnit.
— Check the EAP server logs and check the occurrence of the above
— Post Run processor.