  Red Hat Fuse / ENTESB-20720

Multicast not returning aggregated


Details

    • Bug
    • Resolution: Not a Bug
    • Normal
    • fuse-7.12-GA
    • fuse-7.9-GA
    • Camel
    • None
    • False
    • None
    • False
    • Todo
    • Moderate
    • Very Likely

    Description

      ENVIRONMENT :
      ==============
      Fuse 7.9.0
      EAP 7.4
       
      ISSUE SUMMARY :
      ===============

        • One of our customers reports that multicast with an AggregationStrategy does not work as expected.

       

      – An AggregationStrategy is used to aggregate all reply messages.
      The customer's route is:

      from("jms:queue:{{Queue.Route.ID}}?exchangePattern=InOptionalOut"
                + "&replyToConcurrentConsumers={{jms.replyTo.concurrent.consumers:1}}"
                + "&concurrentConsumers={{jms.concurrent.consumers:1}}")
        .to("direct:run");

      from("direct:run")
        .to("direct:processRequest");

      from("direct:processRequest")
        .multicast(new MyAggregationStrategy())
          .parallelProcessing().timeout(500).to("direct:a", "direct:b", "direct:c")
        .end()
        .to("mock:result");
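
      To make the expected result concrete, here is a minimal plain-Java model of what the multicast above is supposed to do: run the branches in parallel, wait at most 500 ms, and fold the replies with a strategy. It is not Camel code and not the customer's MyAggregationStrategy, which is not shown in this report. The class MulticastModel and the join strategy are hypothetical names used for illustration only. The sketch folds replies in branch order for determinism, whereas Camel's parallel multicast aggregates replies as they arrive. The null first argument mirrors Camel's AggregationStrategy.aggregate(oldExchange, newExchange), where oldExchange is null on the first call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;

/** Plain-Java model of a parallel multicast with a timeout; not Camel code. */
public class MulticastModel {

    /**
     * Runs all branches in parallel, waits at most timeoutMillis, and folds
     * the replies that arrived in time using the given strategy.
     */
    public static String multicast(List<Callable<String>> branches,
                                   BinaryOperator<String> strategy,
                                   long timeoutMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(branches.size());
        try {
            // invokeAll cancels every task that has not finished when the timeout expires.
            List<Future<String>> replies =
                    pool.invokeAll(branches, timeoutMillis, TimeUnit.MILLISECONDS);
            String aggregated = null; // models oldExchange == null on the first call
            for (Future<String> reply : replies) {
                if (reply.isCancelled()) {
                    continue; // branch missed the timeout, so it is left out
                }
                try {
                    aggregated = strategy.apply(aggregated, reply.get());
                } catch (ExecutionException e) {
                    // branch threw an exception; this sketch simply skips it
                }
            }
            return aggregated;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Hypothetical strategy: join the reply bodies with a comma. */
    public static String join(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "," + newBody;
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> branches = new ArrayList<>();
        branches.add(() -> "a");
        branches.add(() -> "b");
        branches.add(() -> "c");
        // The value after the multicast should be the folded replies, not the input.
        System.out.println(multicast(branches, MulticastModel::join, 500));
    }
}
```

      In this model the value returned by multicast(...) corresponds to the body that the step after .end() should see, which is what the customer observes with direct:run and with InOut, but not with InOptionalOut.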

       

       

      – When tested from direct:run or direct:processRequest, the exchange body at mock:result is the aggregated data from direct:a, direct:b, and direct:c.

       

      – When tested from jms:queue:.. the exchange body at mock:result is the input data passed to the multicast processor, not the aggregate.

       

      – If the exchange pattern for jms:queue is changed to InOut, the exchange body at mock:result is the aggregated data from direct:a, direct:b, and direct:c.

      from("direct:start")
        .multicast(new MyAggregationStrategy())

       

      – At the end of the final run of the aggregator, the customer has the expected aggregation of the sub-routes.  

       

      – However, when the exchange reaches the Post Run Test processor below, the customer does not get the expected aggregated output for each run.

        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                // Log the body as JSON to see what the multicast returned.
                log.info("Post Run Test Point " + exchange.getIn().getHeader("JobName") + ": "
                        + new ObjectMapper().writeValueAsString(exchange.getIn().getBody()));
                log.info("break Point");
            }
        })

       

      – In summary: when .multicast is used in a route whose JMS queue endpoint is set to InOptionalOut, the multicast processor returns the input exchange and not the aggregated exchange for the last test.
      Attaching a reproducer application.

       

      – Steps to reproduce:

      • Install Fuse 7.9.0 on EAP 7.4.
      • Start the EAP 7.4 server.
      • Set up two queues:
          <jms-queue name="Sahara.IEME.Income" entries="queue/Sahara.IEME.Income"/>
          <jms-queue name="Sahara.IEME.Income.replyTo" entries="queue/Sahara.IEME.Income.replyTo"/>
      • Configure arquillian.xml to match your server:
          <property name="javaVmArguments">-Xmx512m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true</property>
          <property name="allowConnectingToRunningServer">true</property>
          <property name="managementAddress">127.0.0.1</property>
          <property name="managementPort">10000</property>
          <property name="username">admin</property>
          <property name="password">xxxxxxxxx</property>
         
        

      • Run the test using JUnit.
      • Check the EAP server logs for the output of the Post Run Test processor shown above.

       
      Sahara.IEME.Income.v2.zip

          People

            ldemasi Luigi De Masi
            rhn-support-slakade Shivam Lakade