I have a test that opens a transacted session, sends a message to composite destination ActiveMQQueue("test25Queue1, test25Queue2, test25Queue3") and then receives it from this destination and commits. If I then do the receiving again, I end up receiving two messages from the previous run.
I would've expected that all the messages should be acknowledged by now and should not be delivered again.
public void doScenario() throws JMSException, InterruptedException {
MessageProducer producer = session.createProducer(destination);
producer.setDisableMessageID(disableMessageId);
producer.setDisableMessageTimestamp(disableMessageTimestamp);
producer.setPriority(priority);
if (message == null) {
message = session.createMessage();
message.setStringProperty(stringPropertyName, stringPropertyValue);
}
MessageConsumer consumer = session.createConsumer(destination);
if(FIRST) {
producer.send(message);
FIRST = false;
}
producer.close();
if (session.getTransacted()) {
session.commit();
}
ConsumerThread consumerThread = new ConsumerThread(consumer);
consumerThread.start();
consumerThread.join();
consumer.close();
// we could use in later tests composite destinations, like "topic, queue, queue. ..." so the number of
// received messages should be same as number of destinations
int destCnt = 0;
if (!useExpectedMsgCnt) {
destCnt = StringUtils.countMatches(destination.toString(), ",") + 1;
} else {
destCnt = expectedMsgCnt;
}
// if (session.getTransacted()) {
// session.commit();
// }
for(Message message: consumerThread.messages) {
message.acknowledge();
if(session.getTransacted()) {
session.commit();
}
}
assertThat(consumerThread.messages).hasSize(destCnt);
compareMessages(consumerThread.messages, message, disableMessageId);
System.out.println("Passed once");
// this.closeSession();
// this.startSession();
}