I have a test that opens a transacted session, sends a message to composite destination ActiveMQQueue("test25Queue1, test25Queue2, test25Queue3") and then receives it from this destination and commits. If I then do the receiving again, I end up receiving two messages from the previous run.
I would've expected that all the messages should be acknowledged by now and should not be delivered again.
public void doScenario() throws JMSException, InterruptedException { MessageProducer producer = session.createProducer(destination); producer.setDisableMessageID(disableMessageId); producer.setDisableMessageTimestamp(disableMessageTimestamp); producer.setPriority(priority); if (message == null) { message = session.createMessage(); message.setStringProperty(stringPropertyName, stringPropertyValue); } MessageConsumer consumer = session.createConsumer(destination); if(FIRST) { producer.send(message); FIRST = false; } producer.close(); if (session.getTransacted()) { session.commit(); } ConsumerThread consumerThread = new ConsumerThread(consumer); consumerThread.start(); consumerThread.join(); consumer.close(); // we could use in later tests composite destinations, like "topic, queue, queue. ..." so the number of // received messages should be same as number of destinations int destCnt = 0; if (!useExpectedMsgCnt) { destCnt = StringUtils.countMatches(destination.toString(), ",") + 1; } else { destCnt = expectedMsgCnt; } // if (session.getTransacted()) { // session.commit(); // } for(Message message: consumerThread.messages) { message.acknowledge(); if(session.getTransacted()) { session.commit(); } } assertThat(consumerThread.messages).hasSize(destCnt); compareMessages(consumerThread.messages, message, disableMessageId); System.out.println("Passed once"); // this.closeSession(); // this.startSession(); }