Uploaded image for project: 'JBoss A-MQ'
  1. JBoss A-MQ
  2. ENTMQ-1734

Advisory Messages Filtered for Temp Queue in NoB

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • None
    • JBoss A-MQ 6.1
    • broker, networks
    • None
    • Hide

      Do not specify dynamicallyIncludedDestinations
      or set destinationFilter

      Show
      Do not specify dynamicallyIncludedDestinations or set destinationFilter
    • Hide

      Create a 3 server NoB. Test broker configuration:

      <?xml version="1.0" encoding="UTF-8"?>
      <!--
          Test broker configuration for the 3-server network of brokers (NoB)
          used in this reproduction. Every ${...} value is a placeholder that is
          filled in through the PropertyPlaceholderConfigurer bean declared below.
      -->
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:amq="http://activemq.apache.org/schema/core"
             xmlns:p="http://www.springframework.org/schema/p"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
                 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

          <!-- Placeholder values are supplied by the fabric ConfigurationProperties bean. -->
          <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
              <property name="properties">
                  <bean class="org.fusesource.mq.fabric.ConfigurationProperties"/>
              </property>
          </bean>

          <!-- Elements inside <broker> are unprefixed: the default namespace here is the ActiveMQ core namespace. -->
          <broker xmlns="http://activemq.apache.org/schema/core"
                  brokerName="${broker.name}"
                  dataDirectory="${data}"
                  dedicatedTaskRunner="true"
                  useJmx="true"
                  persistent="false">

              <destinationPolicy>
                  <policyMap>
                      <policyEntries>
                          <!-- Applies to every topic (">" wildcard). -->
                          <policyEntry topic=">"
                                       producerFlowControl="true"
                                       cursorMemoryHighWaterMark="100"
                                       memoryLimit="10mb">
                              <pendingMessageLimitStrategy>
                                  <constantPendingMessageLimitStrategy limit="10"/>
                              </pendingMessageLimitStrategy>
                          </policyEntry>
                          <!-- Applies to every queue (">" wildcard). -->
                          <policyEntry queue=">"
                                       producerFlowControl="true"
                                       cursorMemoryHighWaterMark="100"
                                       memoryLimit="10mb">
                              <pendingQueuePolicy>
                                  <vmQueueCursor/>
                              </pendingQueuePolicy>
                              <networkBridgeFilterFactory>
                                  <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                              </networkBridgeFilterFactory>
                          </policyEntry>
                      </policyEntries>
                  </policyMap>
              </destinationPolicy>

              <managementContext>
                  <managementContext createConnector="false"/>
              </managementContext>

              <!--
                  Two network bridges, one per placeholder pair (*.1 and *.2).
                  Both list only queues matching "com.redhat.>" under
                  dynamicallyIncludedDestinations; this is the setting the report
                  identifies as preventing consumer advisories for the Camel
                  temporary reply queue from reaching the other brokers.
              -->
              <networkConnectors>
                  <networkConnector name="${broker.network.name.1}"
                                    uri="${broker.network.uri.1}"
                                    userName="${broker.username}"
                                    password="${broker.password}"
                                    conduitSubscriptions="false"
                                    decreaseNetworkConsumerPriority="false"
                                    networkTTL="1">
                      <dynamicallyIncludedDestinations>
                          <queue physicalName="com.redhat.>"/>
                      </dynamicallyIncludedDestinations>
                  </networkConnector>
                  <networkConnector name="${broker.network.name.2}"
                                    uri="${broker.network.uri.2}"
                                    userName="${broker.username}"
                                    password="${broker.password}"
                                    conduitSubscriptions="false"
                                    decreaseNetworkConsumerPriority="false"
                                    networkTTL="1">
                      <dynamicallyIncludedDestinations>
                          <queue physicalName="com.redhat.>"/>
                      </dynamicallyIncludedDestinations>
                  </networkConnector>
              </networkConnectors>

              <!-- NOTE(review): a LevelDB adapter is configured although the broker sets persistent="false"; kept as reported. -->
              <persistenceAdapter>
                  <levelDB directory="${broker.data.dir}"/>
              </persistenceAdapter>

              <plugins>
                  <jaasAuthenticationPlugin configuration="karaf"/>
              </plugins>

              <systemUsage>
                  <systemUsage sendFailIfNoSpace="true">
                      <memoryUsage>
                          <memoryUsage limit="64 mb"/>
                      </memoryUsage>
                      <storeUsage>
                          <storeUsage limit="1 gb"/>
                      </storeUsage>
                      <tempUsage>
                          <tempUsage limit="500 mb"/>
                      </tempUsage>
                  </systemUsage>
              </systemUsage>

              <!-- Separate transports for clients and for the network-of-brokers links (per the placeholder names). -->
              <transportConnectors>
                  <transportConnector name="${broker.client.transport.name}"
                                      uri="${broker.client.transport.uri}"/>
                  <transportConnector name="${broker.nob.transport.name}"
                                      uri="${broker.nob.transport.uri}"/>
              </transportConnectors>
          </broker>
      </beans>
      

      Deploy camel request/reply route using temp destinations:

      <!--
          Camel request/reply reproduction routes. {{question}} and {{name}} are
          placeholders and myProducer / myConsumer are beans; all four are
          defined outside this snippet.
      -->
      <camelContext xmlns="http://camel.apache.org/schema/blueprint" trace="false">

          <!-- Timer-driven producer: builds the question body and sends it to com.redhat.incoming1. -->
          <route id="produce-incoming1">
              <from uri="timer://productionTimer?fixedRate=true&amp;period=5000"/>
              <transform>
                  <simple>{{question}}</simple>
              </transform>
              <bean ref="myProducer" method="produce"/>
              <log message="${body}"/>
              <to uri="amq:queue:com.redhat.incoming1"/>
          </route>

          <!-- Forwards each incoming message as an InOut request to com.redhat.service1, asking for a Temporary reply destination. -->
          <route id="initial-consume-reply1">
              <from uri="amq:queue:com.redhat.incoming1"/>
              <to uri="amq:queue:com.redhat.service1?replyToType=Temporary&amp;requestTimeout=35000" pattern="InOut"/>
          </route>

          <!-- Service side: sets the reply body depending on whether the request contains the expected question. -->
          <route id="consume-process-reply1">
              <from uri="amq:queue:com.redhat.service1"/>
              <choice>
                  <when>
                      <simple>${body} contains 'What is your name?'</simple>
                      <transform>
                          <simple>{{name}}!</simple>
                      </transform>
                      <bean ref="myConsumer" method="modify"/>
                  </when>
                  <otherwise>
                      <transform>
                          <simple>If you don't ask me my name, I'm not going to tell you!</simple>
                      </transform>
                  </otherwise>
              </choice>
              <log message="${body}"/>
          </route>
      </camelContext>
      
      Show
      Create a 3 server NoB. Test broker configuration: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="properties"> <bean class="org.fusesource.mq.fabric.ConfigurationProperties" /> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker.name}" dataDirectory="${data}" dedicatedTaskRunner="true" useJmx="true" persistent="false"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" cursorMemoryHighWaterMark="100" memoryLimit="10mb"> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10" /> </pendingMessageLimitStrategy> </policyEntry> <policyEntry queue=">" producerFlowControl="true" cursorMemoryHighWaterMark="100" memoryLimit="10mb"> <pendingQueuePolicy> <vmQueueCursor /> </pendingQueuePolicy> <amq:networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" /> </amq:networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false" /> </managementContext> <networkConnectors> <networkConnector name="${broker.network.name.1}" uri="${broker.network.uri.1}" userName="${broker.username}" password="${broker.password}" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false"
networkTTL="1"> <dynamicallyIncludedDestinations> <queue physicalName="com.redhat.>" /> </dynamicallyIncludedDestinations> </networkConnector> <networkConnector name="${broker.network.name.2}" uri="${broker.network.uri.2}" userName="${broker.username}" password="${broker.password}" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="1"> <dynamicallyIncludedDestinations> <queue physicalName="com.redhat.>" /> </dynamicallyIncludedDestinations> </networkConnector> </networkConnectors> <amq:persistenceAdapter> <amq:levelDB directory="${broker.data.dir}" /> </amq:persistenceAdapter> <plugins> <jaasAuthenticationPlugin configuration="karaf" /> </plugins> <systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="1 gb" /> </storeUsage> <tempUsage> <tempUsage limit="500 mb" /> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector name="${broker.client.transport.name}" uri="${broker.client.transport.uri}" /> <transportConnector name="${broker.nob.transport.name}" uri="${broker.nob.transport.uri}" /> </transportConnectors> </broker> </beans> Deploy camel request/reply route using temp destinations: <camelContext trace="false" xmlns="http://camel.apache.org/schema/blueprint"> <route id="produce-incoming1"> <from uri="timer://productionTimer?fixedRate=true&amp;period=5000" /> <transform> <simple>{{question}}</simple> </transform> <bean ref="myProducer" method="produce" /> <log message="${body}" /> <to uri="amq:queue:com.redhat.incoming1" /> </route> <route id="initial-consume-reply1"> <from uri="amq:queue:com.redhat.incoming1" /> <to uri="amq:queue:com.redhat.service1?replyToType=Temporary&amp;requestTimeout=35000" pattern="InOut" /> </route> <route id="consume-process-reply1"> <from uri="amq:queue:com.redhat.service1" /> <choice> <when> <simple>${body} contains 'What is your name?'</simple>
<transform> <simple>{{name}}!</simple> </transform> <bean ref="myConsumer" method="modify" /> </when> <otherwise> <transform> <simple>If you don't ask me my name, I'm not going to tell you!</simple> </transform> </otherwise> </choice> <log message="${body}" /> </route> </camelContext>

      Setting dynamicallyIncludedDestinations on a networkConnector prevents consumer info advisory messages from being sent to the other nodes in a NoB when using Camel-JMS request/reply with a temporary reply queue.

      Camel-JMS is creating a temporary queue:

      https://github.com/apache/camel/blob/camel-2.11.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java#L205

      2016-05-31 11:48:01,796 | DEBUG | edhat.incoming1] | JmsProducer                      | .camel.component.jms.JmsProducer  106 | 2741 - org.apache.camel.camel-jms - 2.12.0.redhat-611423 | Using JmsReplyManager: org.apache.camel.component.jms.reply.TemporaryQueueReplyManager@3822ab41 to process replies from temporary queue
      2016-05-31 11:48:01,802 | TRACE | edhat.incoming1] | JmsProducer                      | .camel.component.jms.JmsProducer  395 | 2741 - org.apache.camel.camel-jms - 2.12.0.redhat-611423 | Using inOut jms template
      2016-05-31 11:48:01,804 | DEBUG | edhat.incoming1] | msConfiguration$CamelJmsTemplate | ngframework.jms.core.JmsTemplate  464 | 2754 - org.apache.servicemix.bundles.spring-jms - 3.2.12.RELEASE_1 | Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:2,started=true} }
      2016-05-31 11:48:01,826 | TRACE | redhat.service1] | TemporaryQueueReplyManager       | nt.jms.reply.ReplyManagerSupport   71 | 2741 - org.apache.camel.camel-jms - 2.12.0.redhat-611423 | ReplyTo destination: temp-queue://ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1
      2016-05-31 11:48:01,827 | DEBUG | redhat.service1] | TemporaryQueueReplyManager       | aryReplyQueueDestinationResolver  208 | 2741 - org.apache.camel.camel-jms - 2.12.0.redhat-611423 | Refreshed Temporary ReplyTo Queue. New queue: ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1
      

      Looking at the broker after removing dynamicallyIncludedDestinations, you can see that the consumer advisory topic for the temporary queue is created as 'ActiveMQ.Advisory.Consumer.Queue.TMPQUEUE' (where TMPQUEUE stands for the temporary queue's ID), and not under 'ActiveMQ.Advisory.TempQueue'.

      NoB 1:

      Fabric8:admin@cigna-nob1> dstat 
      Name                                                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #    Memory %
      ActiveMQ.Advisory.Connection                                 0           0           0           5           0           0
      ActiveMQ.Advisory.Consumer.Queue.ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           2           1           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.incoming1           0           0           2           5           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.service1           0           0           2           5           0           0
      ActiveMQ.Advisory.MasterBroker                               0           0           0           1           0           0
      ActiveMQ.Advisory.NetworkBridge                              0           0           0           2           0           0
      ActiveMQ.Advisory.Queue                                      0           0           0           6           0           0
      ActiveMQ.Advisory.TempQueue                                  0           0           3           5           0           0
      ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           1        1564        1564           0
      com.redhat.incoming1                                         0           0           1           0           0           0
      com.redhat.service1                                          0           0           1        1564        1564           0
      

      NoB 2:

      Fabric8:admin@cigna-nob2> dstat
      Name                                                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #    Memory %
      ActiveMQ.Advisory.Connection                                 0           0           0           5           0           0
      ActiveMQ.Advisory.Consumer.Queue.ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           2           1           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.incoming1           0           0           2           5           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.service1           0           0           2           5           0           0
      ActiveMQ.Advisory.MasterBroker                               0           0           0           1           0           0
      ActiveMQ.Advisory.NetworkBridge                              0           0           0           2           0           0
      ActiveMQ.Advisory.Queue                                      0           0           0           6           0           0
      ActiveMQ.Advisory.TempQueue                                  0           0           3           5           0           0
      ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           1        1613        1613           0
      com.redhat.incoming1                                         0           0           1           0           0           0
      com.redhat.service1                                          0           0           1        1613        1613           0
      

      NoB 3:

      Fabric8:admin@cigna-nob3> dstat
      Name                                                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #    Memory %
      ActiveMQ.Advisory.Connection                                 0           0           0          11           0           0
      ActiveMQ.Advisory.Consumer.Queue.ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           2           1           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.incoming1           0           0           2           5           0           0
      ActiveMQ.Advisory.Consumer.Queue.com.redhat.service1           0           0           2           5           0           0
      ActiveMQ.Advisory.MasterBroker                               0           0           0           1           0           0
      ActiveMQ.Advisory.NetworkBridge                              0           0           0           2           0           0
      ActiveMQ.Advisory.Queue                                      0           0           0           6           0           0
      ActiveMQ.Advisory.TempQueue                                  0           0           3           7           0           0
      ID:fusefabric4.gsslab.rdu2.redhat.com-38624-1464709680594-5:1:1           0           0           1           0           0           0
      com.redhat.incoming1                                         0           0           1        1844        1844           0
      com.redhat.service1                                          0           0           1         216         216           0
      

              gtully@redhat.com Gary Tully
              rhn-support-mrobson Matt Robson
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: