JBoss A-MQ / ENTMQ-1943

Unable to acknowledge messages from a composite destination


    • Type: Bug
    • Resolution: Obsolete
    • Priority: Major
    • None
    • JBoss A-MQ 6.3
    • broker

      I have a test that opens a transacted session, sends a message to the composite destination ActiveMQQueue("test25Queue1, test25Queue2, test25Queue3"), receives from that destination, and commits. If I then run the receive step again, I receive two messages left over from the previous run.

      I would have expected all of the messages to be acknowledged by that point, so none of them should be delivered again.

          public void doScenario() throws JMSException, InterruptedException {
              MessageProducer producer = session.createProducer(destination);
      
              producer.setDisableMessageID(disableMessageId);
              producer.setDisableMessageTimestamp(disableMessageTimestamp);
              producer.setPriority(priority);
      
              if (message == null) {
                  message = session.createMessage();
                  message.setStringProperty(stringPropertyName, stringPropertyValue);
              }
      
              MessageConsumer consumer = session.createConsumer(destination);
      
              if(FIRST) {
                  producer.send(message);
                  FIRST = false;
              }
              producer.close();
      
              if (session.getTransacted()) {
                  session.commit();
              }
      
              ConsumerThread consumerThread = new ConsumerThread(consumer);
              consumerThread.start();
              consumerThread.join();
              consumer.close();
      
              // Later tests may use composite destinations such as "topic, queue, queue, ...", so the number of
              // received messages should equal the number of component destinations.
              int destCnt = 0;
              if (!useExpectedMsgCnt) {
                  destCnt = StringUtils.countMatches(destination.toString(), ",") + 1;
              } else {
                  destCnt = expectedMsgCnt;
              }
      
      //        if (session.getTransacted()) {
      //            session.commit();
      //        }
              for(Message message: consumerThread.messages) {
                  message.acknowledge();
                  if(session.getTransacted()) {
                      session.commit();
                  }
              }
      
              assertThat(consumerThread.messages).hasSize(destCnt);
              compareMessages(consumerThread.messages, message, disableMessageId);
      
              System.out.println("Passed once");
      //        this.closeSession();
      //        this.startSession();
          }
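
      The test above expects one received message per component of the composite destination, which it derives by counting commas in destination.toString(). As a minimal sketch of that expectation without any JMS dependency, the following splits a composite name into its components and counts them. The class and method names (CompositeDestinationCount, componentNames, expectedMessageCount) are hypothetical and not part of the original test. Unlike the comma count in the test, this version skips empty segments, so the two can disagree on malformed names such as "a,,b".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original test or of ActiveMQ.
public class CompositeDestinationCount {

    /** Splits a composite name such as "a, b, c" into trimmed, non-empty component names. */
    public static List<String> componentNames(String compositeName) {
        List<String> names = new ArrayList<>();
        for (String part : compositeName.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    /** One copy of the sent message is expected per component destination. */
    public static int expectedMessageCount(String compositeName) {
        return componentNames(compositeName).size();
    }

    public static void main(String[] args) {
        // Composite name taken from the report.
        String composite = "test25Queue1, test25Queue2, test25Queue3";
        System.out.println(componentNames(composite));
        System.out.println(expectedMessageCount(composite)); // prints 3
    }
}
```

      For the destination in this report the expected count is 3, which is the number of messages the first receive should return after a single send.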
      

              Assignee: Unassigned
              Reporter: Jiri Daněk (jdanek@redhat.com)