using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Transactions;
using Amqp;
using Amqp.Framing;
namespace test
{
class Test
{
public static void Main(string[] args)
{
Address address = new Address("amqp:);
string queue = "jms.queue.test";
string testName = "TransactedPosting";
int nMsgs = 5;
Connection connection = new Connection(address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, queue);
using (var ts = new TransactionScope())
{
for (int i = 0; i < nMsgs; i++)
{
Message message = new Message("test");
message.Properties = new Properties() { MessageId = "commit" + i, GroupId = testName };
sender.Send(message);
}
ts.Complete();
}
using (var ts = new TransactionScope())
{
for (int i = nMsgs; i < nMsgs * 2; i++)
{
Message message = new Message("test");
message.Properties = new Properties() { MessageId = "rollback" + i, GroupId = testName };
sender.Send(message); }
}
using (var ts = new TransactionScope())
{
for (int i = 0; i < nMsgs; i++)
{
Message message = new Message("test");
message.Properties = new Properties() { MessageId = "commit" + i, GroupId = testName };
sender.Send(message);
}
ts.Complete();
}
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, queue);
for (int i = 0; i < nMsgs * 2; i++)
{
Message message = receiver.Receive();
Trace.WriteLine(TraceLevel.Information, "receive: {0}", message.Properties.MessageId);
receiver.Accept(message);
}
connection.Close();
}
}
}