-
Bug
-
Resolution: Won't Do
-
Major
-
None
-
2.6.0.GA
-
None
-
False
-
False
-
-
Undefined
-
-
- Run the reproducer program (1) below.
- ==> You can see that many messages remain in the Session's incomingList field and that a large amount of memory is used.
-
- Memory leak when closing a ReceiverLink (C# amqpnetlite)
- The cause seems to be that, when the receiver link is closed, the messages that were held against its credit in the Session's incomingList field remain unreleased.
- If you repeatedly create and close a ReceiverLink, the unreleased messages keep accumulating.
- The larger the credit or the message size, the more memory is leaked.
(1) reproducer program
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

namespace Custom
{
    /// <summary>
    /// <see cref="Amqp.Session"/> subclass whose <see cref="ToString"/> additionally dumps the
    /// contents of the base class's non-public <c>incomingList</c> field, so the reproducer can
    /// print how many deliveries the session is still holding.
    /// </summary>
    internal class Session : Amqp.Session
    {
        public Session(Amqp.Connection connection)
            : base(connection)
        {
        }

        /// <summary>
        /// Returns the base description plus IsClosed, SessionState and the entries of
        /// <c>incomingList</c>, each rendered as "DeliveryId:Message.Body".
        /// </summary>
        /// <remarks>
        /// Everything is read through reflection by member name ("incomingList", "head", "Next",
        /// "DeliveryId", "Message"). If the library does not expose a member under one of these
        /// names, the lookup returns null and this method throws instead of reporting an empty list.
        /// </remarks>
        public override String ToString()
        {
            const BindingFlags PrivateInstance = BindingFlags.NonPublic | BindingFlags.Instance;

            Type sessionType = typeof(Amqp.Session);
            Object incomingList = sessionType.GetField("incomingList", PrivateInstance).GetValue(this);
            Type linkedListType = incomingList.GetType();
            Object incomingListHead = linkedListType.GetField("head", PrivateInstance).GetValue(incomingList);

            // Stays null when the list is empty; it is only dereferenced while visiting nodes.
            Type deliveryType = incomingListHead?.GetType();

            // Follow the "Next" links from the head to collect every node.
            List<object> nodeList = new List<object>();
            Object currentNode = incomingListHead;
            while (currentNode != null)
            {
                nodeList.Add(currentNode);
                currentNode = deliveryType.GetProperty("Next").GetValue(currentNode);
            }

            List<string> deliveryBodyList = nodeList
                .Select(node =>
                {
                    Message message = (Message)deliveryType.GetProperty("Message").GetValue(node);
                    // A delivery without a message/body is rendered with an empty body text.
                    return deliveryType.GetField("DeliveryId").GetValue(node).ToString() + ":" + message?.Body?.ToString();
                })
                .ToList();

            return String.Format(
                "tostring:{0}, IsClosed:{1}, SessionState:{2}, incomingList.Length:{3}, incomingList:[{4}]",
                base.ToString(),
                this.IsClosed,
                this.SessionState,
                deliveryBodyList.Count,
                string.Join(", ", deliveryBodyList));
        }
    }

    /// <summary>
    /// Reproducer: puts MESSAGE_COUNT messages on the target address, then RECEIVER_COUNT times
    /// opens a receiver link with a credit of 20, accepts a single message and closes the link,
    /// printing the session's incomingList around each step.
    /// </summary>
    class Custom
    {
        private static readonly int MESSAGE_COUNT = 50;
        private static readonly int RECEIVER_COUNT = 10;

        private Amqp.Connection connection;
        private Session session;
        private string target;

        public static void Main(string[] args)
        {
            Custom c = new Custom();

            // prepare
            c.createSession();
            c.clearMessages();
            c.putInitialMessages();

            // reproducer
            for (int i = 0; i < Custom.RECEIVER_COUNT; i++)
            {
                c.receiveMessage(i);
            }

            // post-processing
            c.closeSession();
        }

        /// <summary>Opens the connection and the instrumented session and sets the target address.</summary>
        private void createSession()
        {
            try
            {
                string url = "amqp://admin:admin@localhost:5672";
                var address = new Amqp.Address(url);
                this.connection = new Amqp.Connection(address);
                this.session = new Session(this.connection);
                this.target = "test";
            }
            catch (Exception ex)
            {
                // Report the failure instead of hiding it; control flow is unchanged.
                Console.Error.WriteLine("createSession failed: " + ex);
            }
        }

        /// <summary>
        /// Receives and accepts messages from the target until a 3-second receive returns null,
        /// then closes the receiver, dumping the session before and after the close.
        /// </summary>
        private void clearMessages()
        {
            TimeSpan timeSpan = new TimeSpan(0, 0, 3);
            ReceiverLink receiver = new ReceiverLink(this.session, "1", new Source() { Address = target, }, null);

            while (true)
            {
                Amqp.Message message = receiver.Receive(timeSpan);
                if (message == null)
                {
                    break;
                }

                receiver.Accept(message);
            }

            Console.WriteLine("clearMessages before closing receiver ### session " + this.session.ToString());
            receiver.Close();
            Console.WriteLine("clearMessages after closing receiver ### session " + this.session.ToString());
        }

        /// <summary>Sends MESSAGE_COUNT messages whose bodies are the integers 0..MESSAGE_COUNT-1.</summary>
        private void putInitialMessages()
        {
            var sender = new Amqp.SenderLink(session, "sender", target);
            for (int i = 0; i < MESSAGE_COUNT; i++)
            {
                // Alternative payload: new Amqp.Message(new byte[100000])
                // when message size is increased, more memory leaks.
                var message = new Amqp.Message(i);
                sender.Send(message);
            }

            sender.Close();
        }

        /// <summary>
        /// One reproducer iteration: opens a receiver with a credit of 20, accepts at most one
        /// message, then closes the receiver.
        /// </summary>
        /// <param name="receivercount">Index of this iteration; used only in the failure message.</param>
        private void receiveMessage(int receivercount)
        {
            TimeSpan timeSpan = new TimeSpan(0, 0, 1);
            try
            {
                ReceiverLink receiver = new ReceiverLink(this.session, "1", new Source() { Address = target, }, null);

                // The message count that remains in Session's incomingList field depends on this credit.
                receiver.SetCredit(20);

                Amqp.Message message = receiver.Receive(timeSpan);
                if (message != null)
                {
                    Console.WriteLine("receiveMessage before accepting message ### message " + message.Body.ToString());
                    Console.WriteLine("receiveMessage before accepting message ### session " + this.session.ToString());
                    receiver.Accept(message);
                    Console.WriteLine("receiveMessage after accepting message ### session " + this.session.ToString());
                    Thread.Sleep(100);
                }

                receiver.Close();
            }
            catch (Exception ex)
            {
                // Report the failure and continue with the next iteration, as before.
                Console.Error.WriteLine("receiveMessage(" + receivercount + ") failed: " + ex);
            }
        }

        /// <summary>Dumps the session, closes it, and dumps it again, pausing 3 seconds before each dump.</summary>
        private void closeSession()
        {
            Thread.Sleep(3000);
            Console.WriteLine("closeSession start ### " + this.session.ToString());
            this.session.Close();
            Thread.Sleep(3000);
            Console.WriteLine("closeSession end ### " + this.session.ToString());
            this.session = null;
        }
    }
}