Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-8290

Federation Flow Stops with Large Messages

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Undefined Undefined
    • None
    • AMQ 7.11.1.GA
    • broker-core
    • 3
    • False
    • Hide

      None

      Show
      None
    • False
    • Documentation (Ref Guide, User Guide, etc.)
    • Hide

      1. Configure 2 brokers for federation (I had an HA config, ,but ha was not part of the test):

            <connectors>
               <connector name="b2b">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector>
               <connector name="ms">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector>
               <connector name="upstream-primary">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false</connector>
               <connector name="upstream-secondary">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false</connector>
            </connectors>
      ...
            <federations xmlns="urn:activemq:core">
               <federation name="lh-federation" user="admin" password="admin">
      
                  <upstream name="lh-upstream">
                     <ha>true</ha>
                     <circuit-breaker-timeout>5000</circuit-breaker-timeout>
                     <reconnect-attempts>3</reconnect-attempts>
                     <call-failover-timeout>-1</call-failover-timeout>
                     <static-connectors>
                        <connector-ref>upstream-primary</connector-ref>
                        <connector-ref>upstream-secondary</connector-ref>
                     </static-connectors>
                     <policy ref="lh-queue-policy"/>
                  </upstream>
      
                  <queue-policy name="lh-queue-policy">
                     <include queue-match="TEST.Q.0" address-match="TEST.Q.0"/>
                     <include queue-match="TEST.Q.1" address-match="TEST.Q.1"/>
                     <include queue-match="TEST.Q.2" address-match="TEST.Q.2"/>
                     <include queue-match="TEST.Q.3" address-match="TEST.Q.3"/>
                     <include queue-match="TEST.Q.4" address-match="TEST.Q.4"/>
                     <include queue-match="TEST.Q.5" address-match="TEST.Q.5"/>
                     <include queue-match="TEST.Q.6" address-match="TEST.Q.6"/>
                     <include queue-match="TEST.Q.7" address-match="TEST.Q.7"/>
                     <include queue-match="TEST.Q.8" address-match="TEST.Q.8"/>
                     <include queue-match="TEST.Q.9" address-match="TEST.Q.9"/>
                  </queue-policy>
      
               </federation>
            </federations>
      
            <connectors>
               <connector name="b2b">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false</connector>
               <connector name="ms">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false</connector>
               <connector name="upstream-primary">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector>
               <connector name="upstream-secondary">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector>
            </connectors>
      ...
            <federations xmlns="urn:activemq:core">
               <federation name="rh-federation" user="admin" password="admin">
      
                  <upstream name="rh-upstream">
                     <ha>true</ha>
                     <circuit-breaker-timeout>5000</circuit-breaker-timeout>
                     <reconnect-attempts>3</reconnect-attempts>
                     <call-failover-timeout>-1</call-failover-timeout>
                     <static-connectors>
                        <connector-ref>upstream-primary</connector-ref>
                        <connector-ref>upstream-secondary</connector-ref>
                     </static-connectors>
                     <policy ref="rh-queue-policy"/>
                  </upstream>
      
                  <queue-policy name="rh-queue-policy">
                     <include queue-match="TEST.Q.0" address-match="TEST.Q.0"/>
                     <include queue-match="TEST.Q.1" address-match="TEST.Q.1"/>
                     <include queue-match="TEST.Q.2" address-match="TEST.Q.2"/>
                     <include queue-match="TEST.Q.3" address-match="TEST.Q.3"/>
                     <include queue-match="TEST.Q.4" address-match="TEST.Q.4"/>
                     <include queue-match="TEST.Q.5" address-match="TEST.Q.5"/>
                     <include queue-match="TEST.Q.6" address-match="TEST.Q.6"/>
                     <include queue-match="TEST.Q.7" address-match="TEST.Q.7"/>
                     <include queue-match="TEST.Q.8" address-match="TEST.Q.8"/>
                     <include queue-match="TEST.Q.9" address-match="TEST.Q.9"/>
                  </queue-policy>
      
               </federation>
            </federations>
      

      2. Start a consumer on the downstream node:

      ./artemis consumer --user admin --password admin --url tcp://localhost:61605 --destination queue://TEST.Q.0 --verbose
      

      3. Start a producer on the upstream node with a large message size:

      ./artemis producer --user admin --password admin  --url tcp://localhost:61605 --message-size 2048000 --message-count 500 --destination queue://TEST.Q.0
      

      4. Wait for some time. No messages are logged by the consumer and the upstream node still shows a count of 500 messages

      5. If the unlimited window sizes are set on the connectors and the tests are retried, messages can be seen moving between the nodes and received by the consumer:

      Example:

            <connectors>
               <connector name="b2b">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector>
               <connector name="ms">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector>
               <connector name="upstream-primary">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false;ackBatchSize=0;consumerWindowSize=-1;producerWindowSize=-1</connector>
               <connector name="upstream-secondary">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false;ackBatchSize=0;consumerWindowSize=-1;producerWindowSize=-1</connector>
            </connectors>
      
      Show
      1. Configure 2 brokers for federation (I had an HA config, ,but ha was not part of the test): <connectors> <connector name="b2b">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector> <connector name="ms">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector> <connector name="upstream-primary">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false</connector> <connector name="upstream-secondary">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false</connector> </connectors> ... <federations xmlns="urn:activemq:core"> <federation name="lh-federation" user="admin" password="admin"> <upstream name="lh-upstream"> <ha>true</ha> <circuit-breaker-timeout>5000</circuit-breaker-timeout> <reconnect-attempts>3</reconnect-attempts> <call-failover-timeout>-1</call-failover-timeout> <static-connectors> <connector-ref>upstream-primary</connector-ref> <connector-ref>upstream-secondary</connector-ref> </static-connectors> <policy ref="lh-queue-policy"/> </upstream> <queue-policy name="lh-queue-policy"> <include queue-match="TEST.Q.0" address-match="TEST.Q.0"/> <include queue-match="TEST.Q.1" address-match="TEST.Q.1"/> <include queue-match="TEST.Q.2" address-match="TEST.Q.2"/> <include queue-match="TEST.Q.3" address-match="TEST.Q.3"/> <include queue-match="TEST.Q.4" address-match="TEST.Q.4"/> <include queue-match="TEST.Q.5" address-match="TEST.Q.5"/> <include queue-match="TEST.Q.6" address-match="TEST.Q.6"/> <include queue-match="TEST.Q.7" address-match="TEST.Q.7"/> <include queue-match="TEST.Q.8" address-match="TEST.Q.8"/> <include queue-match="TEST.Q.9" address-match="TEST.Q.9"/> </queue-policy> </federation> </federations> <connectors> <connector name="b2b">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false</connector> <connector name="ms">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false</connector> <connector name="upstream-primary">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector> <connector name="upstream-secondary">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector> </connectors> ... <federations xmlns="urn:activemq:core"> <federation name="rh-federation" user="admin" password="admin"> <upstream name="rh-upstream"> <ha>true</ha> <circuit-breaker-timeout>5000</circuit-breaker-timeout> <reconnect-attempts>3</reconnect-attempts> <call-failover-timeout>-1</call-failover-timeout> <static-connectors> <connector-ref>upstream-primary</connector-ref> <connector-ref>upstream-secondary</connector-ref> </static-connectors> <policy ref="rh-queue-policy"/> </upstream> <queue-policy name="rh-queue-policy"> <include queue-match="TEST.Q.0" address-match="TEST.Q.0"/> <include queue-match="TEST.Q.1" address-match="TEST.Q.1"/> <include queue-match="TEST.Q.2" address-match="TEST.Q.2"/> <include queue-match="TEST.Q.3" address-match="TEST.Q.3"/> <include queue-match="TEST.Q.4" address-match="TEST.Q.4"/> <include queue-match="TEST.Q.5" address-match="TEST.Q.5"/> <include queue-match="TEST.Q.6" address-match="TEST.Q.6"/> <include queue-match="TEST.Q.7" address-match="TEST.Q.7"/> <include queue-match="TEST.Q.8" address-match="TEST.Q.8"/> <include queue-match="TEST.Q.9" address-match="TEST.Q.9"/> </queue-policy> </federation> </federations> 2. Start a consumer on the downstream node: ./artemis consumer --user admin --password admin --url tcp://localhost:61605 --destination queue://TEST.Q.0 --verbose 3. Start a producer on the upstream node with a large message size: ./artemis producer --user admin --password admin --url tcp://localhost:61605 --message-size 2048000 --message-count 500 --destination queue://TEST.Q.0 4. Wait for some time. No messages are logged by the consumer and the upstream node still shows a count of 500 messages 5. If the unlimited window sizes are set on the connectors and the tests are retried, messages can be seen moving between the nodes and received by the consumer: Example: <connectors> <connector name="b2b">tcp://node1.dhawkins.redhat.com:61602?sslEnabled=false</connector> <connector name="ms">tcp://node1.dhawkins.redhat.com:61702?sslEnabled=false</connector> <connector name="upstream-primary">tcp://node4.dhawkins.redhat.com:61602?sslEnabled=false;ackBatchSize=0;consumerWindowSize=-1;producerWindowSize=-1</connector> <connector name="upstream-secondary">tcp://node4.dhawkins.redhat.com:61702?sslEnabled=false;ackBatchSize=0;consumerWindowSize=-1;producerWindowSize=-1</connector> </connectors>
    • Important

      With AMQ 7.11.1 message flow between federated brokers stops when large messages are encountered. Setting properties consumerWindowSize=-1;producerWindowSize=-1;ackBatchSize=0 on the connectors used for federation seems to resolve the blockage.

              jcliffor@redhat.com John Clifford
              rhn-support-dhawkins Duane Hawkins (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: