AMQ Broker / ENTMQBR-8021

Duplicate Messages Observed Upon Redistribution with Multicast Addresses

    • Type: Bug
    • Resolution: Done
    • Priority: Undefined
    • AMQ 7.11.0.GA, AMQ 7.10.2.GA

      Reproducer Attached.

      To run, extract and correct paths as needed in A/bin/artemis, A/etc/artemis.profile, A/etc/bootstrap.xml, B/bin/artemis, B/etc/artemis.profile, and B/etc/bootstrap.xml.

      Run script reproducer.sh to execute the workflow below.

      The basic workflow is as follows:

      Set up a two-broker cluster. No special settings are needed; the defaults will generally do. I used a redistribution-delay of 5000 ms, but I doubt the actual value matters as long as it is greater than 0. I also set the following property on the OpenWire acceptor: virtualTopicConsumerWildcards=Consumer.*.VirtualTopic.%3E%3B2%3BselectorAware=true
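The acceptor property value above is percent-encoded because it is embedded in the acceptor URL. As a small aid to reading it, the sketch below decodes the value using only the Java standard library. The class name WildcardParamDecoder is hypothetical and is not part of the attached reproducer.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the reproducer: shows what the
// percent-encoded acceptor parameter value looks like once decoded.
class WildcardParamDecoder {

    /** Percent-decodes a URL parameter value (%3E becomes '>', %3B becomes ';'). */
    public static String decode(String encoded) {
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = "Consumer.*.VirtualTopic.%3E%3B2%3BselectorAware=true";
        // prints: Consumer.*.VirtualTopic.>;2;selectorAware=true
        System.out.println(decode(encoded));
    }
}
```

Decoded, the value is three semicolon-separated parts: the wildcard pattern Consumer.*.VirtualTopic.>, the number 2, and the selectorAware=true flag that the steps below rely on to turn a consumer selector into a queue filter.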

      Auto-create a few queues using the virtualTopic naming scheme. In this test we have these two on each broker:

      Consumer.RH_001.VirtualTopic.Test.Topic - this one has a selector/filter of MyProp='10001'. The acceptor properties should map the consumer selector to a queue filter automatically.
      Consumer.RH_002.VirtualTopic.Test.Topic - this one has no selector/filter

      Connect one consumer to each broker on Consumer.RH_001.VirtualTopic.Test.Topic (the queue with the selector applied).

      Don't connect any consumers yet to Consumer.RH_002.VirtualTopic.Test.Topic. This is to simulate the offline client in our real-world issue.

      Publish messages to 'VirtualTopic.Test.Topic' with a header like message.setStringProperty("MyProp", "10001"). You will see that these messages all load up on one broker for Consumer.RH_002.VirtualTopic.Test.Topic where there is no consumer, and the messages are evenly distributed and consumed for Consumer.RH_001.VirtualTopic.Test.Topic.

      Getting queue stats from A
      Connection brokerURL = tcp://localhost:62661
      |NAME                     |ADDRESS                  |CONSUMER_COUNT|MESSAGE_COUNT|MESSAGES_ADDED|DELIVERING_COUNT|MESSAGES_ACKED|SCHEDULED_COUNT|ROUTING_TYPE|
      |$.artemis.internal.sf....|$.artemis.internal.sf....|1             |0            |500           |0               |500           |0              |MULTICAST   |
      |Consumer.RH_001.Virtua...|VirtualTopic.Test.Topic  |1             |0            |500           |0               |500           |0              |MULTICAST   |
      |Consumer.RH_002.Virtua...|VirtualTopic.Test.Topic  |0             |1000         |1000          |0               |0             |0              |MULTICAST   |
      |DLQ                      |DLQ                      |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |ExpiryQueue              |ExpiryQueue              |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |activemq.management.47...|activemq.management.47...|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |notif.1fb038b2-ec29-11...|activemq.notifications   |1             |0            |27            |0               |27            |0              |MULTICAST   |
      Sleeping 10 seconds...
      
      Getting queue stats from B
      Connection brokerURL = tcp://localhost:62681
      |NAME                     |ADDRESS                  |CONSUMER_COUNT|MESSAGE_COUNT|MESSAGES_ADDED|DELIVERING_COUNT|MESSAGES_ACKED|SCHEDULED_COUNT|ROUTING_TYPE|
      |$.artemis.internal.sf....|$.artemis.internal.sf....|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |Consumer.RH_001.Virtua...|VirtualTopic.Test.Topic  |1             |0            |500           |0               |500           |0              |MULTICAST   |
      |Consumer.RH_002.Virtua...|VirtualTopic.Test.Topic  |0             |0            |0             |0               |0             |0              |MULTICAST   |
      |DLQ                      |DLQ                      |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |ExpiryQueue              |ExpiryQueue              |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |activemq.management.02...|activemq.management.02...|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |notif.1fac1937-ec29-11...|activemq.notifications   |1             |0            |24            |0               |24            |0              |MULTICAST   |
      

      Now connect a consumer to the Consumer.RH_002.VirtualTopic.Test.Topic queue on the broker opposite the one where its messages are piled up. For example, if all the messages for Consumer.RH_002.VirtualTopic.Test.Topic are on broker A, connect the consumer to broker B so that everything gets redistributed.

      If you watch the counts on the Consumer.RH_001.VirtualTopic.Test.Topic queue, you will see that the messages redistributed for Consumer.RH_002.VirtualTopic.Test.Topic were also routed to Consumer.RH_001.VirtualTopic.Test.Topic, as if they were being fanned out again, and the added count goes up for both queues.

      Getting final queue stats from A
      Connection brokerURL = tcp://localhost:62661
      |NAME                     |ADDRESS                  |CONSUMER_COUNT|MESSAGE_COUNT|MESSAGES_ADDED|DELIVERING_COUNT|MESSAGES_ACKED|SCHEDULED_COUNT|ROUTING_TYPE|
      |$.artemis.internal.sf....|$.artemis.internal.sf....|1             |0            |1500          |0               |1500          |0              |MULTICAST   |
      |Consumer.RH_001.Virtua...|VirtualTopic.Test.Topic  |1             |0            |500           |0               |500           |0              |MULTICAST   |
      |Consumer.RH_002.Virtua...|VirtualTopic.Test.Topic  |0             |0            |1000          |0               |1000          |0              |MULTICAST   |
      |DLQ                      |DLQ                      |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |ExpiryQueue              |ExpiryQueue              |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |activemq.management.b8...|activemq.management.b8...|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |notif.1fb038b2-ec29-11...|activemq.notifications   |1             |0            |29            |0               |29            |0              |MULTICAST   |
      Sleeping 10 seconds...
      
      Getting final queue stats from B
      Connection brokerURL = tcp://localhost:62681
      |NAME                     |ADDRESS                  |CONSUMER_COUNT|MESSAGE_COUNT|MESSAGES_ADDED|DELIVERING_COUNT|MESSAGES_ACKED|SCHEDULED_COUNT|ROUTING_TYPE|
      |$.artemis.internal.sf....|$.artemis.internal.sf....|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |Consumer.RH_001.Virtua...|VirtualTopic.Test.Topic  |1             |0            |1000          |0               |500           |0              |MULTICAST   |
      |Consumer.RH_002.Virtua...|VirtualTopic.Test.Topic  |1             |0            |1000          |0               |1000          |0              |MULTICAST   |
      |DLQ                      |DLQ                      |0             |500          |500           |0               |0             |0              |ANYCAST     |
      |DLQ.VirtualTopic.Test....|DLQ                      |0             |500          |500           |0               |0             |0              |MULTICAST   |
      |ExpiryQueue              |ExpiryQueue              |0             |0            |0             |0               |0             |0              |ANYCAST     |
      |activemq.management.7f...|activemq.management.7f...|1             |0            |0             |0               |0             |0              |MULTICAST   |
      |notif.1fac1937-ec29-11...|activemq.notifications   |1             |0            |31            |0               |31            |0              |MULTICAST   |
      

      The extras end up in the DLQ as duplicates.
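The size of the duplication can be read straight off the stats. The sketch below is only arithmetic on the MESSAGES_ADDED and MESSAGES_ACKED values printed for broker B and for the store-and-forward queue on broker A; the class and method names are hypothetical and nothing here talks to a broker.

```java
// Hypothetical helper: plain arithmetic on the queue stats printed above.
class RedistributionDelta {

    /** Messages added to a queue between two stat snapshots. */
    public static long addedDelta(long addedBefore, long addedAfter) {
        return addedAfter - addedBefore;
    }

    /** Messages that were added to a queue but never acknowledged. */
    public static long unacked(long added, long acked) {
        return added - acked;
    }

    public static void main(String[] args) {
        // Consumer.RH_001 on broker B: MESSAGES_ADDED went from 500 to 1000,
        // while MESSAGES_ACKED stayed at 500.
        System.out.println("extra added on RH_001 (B): " + addedDelta(500, 1000));
        System.out.println("unacked on RH_001 (B): " + unacked(1000, 500));

        // Store-and-forward queue on broker A: MESSAGES_ADDED went from 500 to 1500,
        // which matches the 1000 messages moved for Consumer.RH_002.
        System.out.println("forwarded from A: " + addedDelta(500, 1500));
    }
}
```

The 500 extra, unacknowledged messages on Consumer.RH_001 match the MESSAGE_COUNT of 500 shown for the DLQ on broker B.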

    • Moderate

      In our setup we have two brokers clustered together with redistribution enabled. We have a multicast address with about a dozen subscriptions bound to it. What we observe is that if the consumer of one of those subscriptions goes offline, messages build up in its queue (as expected). When the consumer comes back online, the broker it attaches to receives the backlog redistributed from the other broker in the cluster. When this redistribution happens, the other multicast queues also receive some of the redistributed messages (although not all of them), even though they had already received them previously.

              rhn-support-jbertram Justin Bertram
              rhn-support-dhawkins Duane Hawkins
              Samuel Gajdos Samuel Gajdos