Uploaded image for project: 'AMQ Clients'
  1. AMQ Clients
  2. ENTMQCL-2335

[dotnet] Memory leak when closing ReceiverLink

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Won't Do
    • Icon: Major Major
    • None
    • 2.6.0.GA
    • amqp-dotnet-client
    • None
    • False
    • False
    • Undefined
    • Hide

      Workaround 1: Keep using the same ReceiverLink instead of closing it.
      Workaround 2: Periodically recreate the Session, which also releases the leaked memory.

      Show
      Workaround 1: Keep using the same ReceiverLink instead of closing it. Workaround 2: Periodically recreate the Session, which also releases the leaked memory.
      1. Run the reproducer program (1).
      2. ==> Many messages remain in the Session's incomingList field, and a large amount of memory stays in use.

      • Memory leak when closing a ReceiverLink (C# amqpnetlite)
      • The cause appears to be that, when the receiver link is closed, the messages that were delivered against the link's credit and held in the Session's incomingList field are never released.
      • If you repeatedly create and close ReceiverLinks, the unreleased messages keep accumulating.
      • The larger the link credit or the message size, the more memory is leaked.

      (1) reproducer program

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Reflection;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;
      using Amqp;
      // using Amqp;
      using Amqp.Framing;
      using Amqp.Types;
      
      
      namespace Custom
      {
          /// <summary>
          /// Diagnostic subclass of <see cref="Amqp.Session"/> whose <c>ToString()</c> dumps the
          /// contents of the base session's private <c>incomingList</c> field via reflection.
          /// </summary>
          /// <remarks>
          /// The field and member names looked up here ("incomingList", "head", "Next", "DeliveryId",
          /// "Message") are internals of the Amqp library; a missing member surfaces as a
          /// <see cref="NullReferenceException"/>. NOTE(review): confirm the names against the
          /// library version in use.
          /// </remarks>
          internal class Session : Amqp.Session
          {
              // Both reflected fields are looked up as non-public instance members.
              private const BindingFlags PrivateInstance = BindingFlags.NonPublic | BindingFlags.Instance;

              /// <summary>Creates a session on <paramref name="connection"/>.</summary>
              /// <param name="connection">The connection passed to the base session constructor.</param>
              public Session(Amqp.Connection connection) : base(connection) { }

              /// <summary>
              /// Returns the base description plus <c>IsClosed</c>, <c>SessionState</c> and one
              /// "DeliveryId:Message.Body" entry per node currently linked from <c>incomingList</c>.
              /// </summary>
              /// <returns>A single-line diagnostic string.</returns>
              public override String ToString()
              {
                  // enhancing this ToString() method to show incomingList contents("DeliveryId:Message.Body").

                  Type sessionType = typeof(Amqp.Session);

                  Object incomingList = sessionType.GetField("incomingList", PrivateInstance).GetValue(this);
                  Type linkedListType = incomingList.GetType();
                  Object incomingListHead = linkedListType.GetField("head", PrivateInstance).GetValue(incomingList);

                  // Null when the list is empty; it is only dereferenced while walking existing nodes.
                  Type deliveryType = incomingListHead?.GetType();

                  // Walk the linked list from the head, following each node's "Next" property.
                  List<Object> nodeList = new List<Object>();
                  Object currentNode = incomingListHead;
                  while (currentNode != null)
                  {
                      nodeList.Add(currentNode);
                      currentNode = deliveryType.GetProperty("Next").GetValue(currentNode);
                  }

                  List<string> deliveryBodyList = nodeList.Select(node =>
                  {
                      // The message or its body may be absent; render an empty body instead of throwing.
                      Message message = (Message)deliveryType.GetProperty("Message").GetValue(node);
                      return deliveryType.GetField("DeliveryId").GetValue(node).ToString()
                          + ":" + message?.Body?.ToString();
                  }).ToList();

                  return String.Format("tostring:{0}, IsClosed:{1}, SessionState:{2}, incomingList.Length:{3}, incomingList:[{4}]", base.ToString(), this.IsClosed, this.SessionState, deliveryBodyList.Count, string.Join(", ", deliveryBodyList));
              }
          }
      
          /// <summary>
          /// Reproducer: on a single session, repeatedly creates a receiver link, grants it credit,
          /// accepts at most one message and closes the link, printing the session's
          /// <c>ToString()</c> dump along the way.
          /// </summary>
          class Custom
          {
              // Number of messages sent to the target address before the receive loop starts.
              private const int MessageCount = 50;

              // Number of create/receive/close receiver iterations performed by Main.
              private const int ReceiverCount = 10;

              private Amqp.Connection connection;
              private Session session;
              private string target;

              /// <summary>
              /// Entry point: prepares the session and target contents, runs the receive loop
              /// <c>ReceiverCount</c> times, then closes the session.
              /// </summary>
              /// <param name="args">Command-line arguments (unused).</param>
              public static void Main(string[] args)
              {
                  Custom c = new Custom();

                  // prepare
                  c.createSession();
                  c.clearMessages();
                  c.putInitialMessages();

                  // reproducer
                  for (int i = 0; i < ReceiverCount; i++)
                  {
                      c.receiveMessage(i);
                  }

                  // post-processing
                  c.closeSession();
              }

              /// <summary>
              /// Opens the connection and session and sets the target address to "test".
              /// </summary>
              /// <remarks>
              /// A failure is logged to standard error and rethrown: every later step needs the
              /// session, so continuing would only fail later with a less useful error.
              /// </remarks>
              private void createSession()
              {
                  try
                  {
                      string url = "amqp://admin:admin@localhost:5672";

                      var address = new Amqp.Address(url);
                      this.connection = new Amqp.Connection(address);
                      this.session = new Session(this.connection);
                      this.target = "test";
                  }
                  catch (Exception ex)
                  {
                      Console.Error.WriteLine("createSession failed ### " + ex);
                      throw;
                  }
              }

              /// <summary>
              /// Builds a receiver link named "1" on the current session, sourced from the target address.
              /// </summary>
              /// <returns>The newly created receiver link; the caller is responsible for closing it.</returns>
              private ReceiverLink createReceiver()
              {
                  return new ReceiverLink(this.session, "1",
                                      new Source()
                                      {
                                          Address = target,
                                      }, null);
              }

              /// <summary>
              /// Receives and accepts messages from the target until a 3-second receive returns null,
              /// then closes the receiver, dumping the session before and after the close.
              /// </summary>
              private void clearMessages()
              {
                  TimeSpan timeSpan = new TimeSpan(0, 0, 3);
                  ReceiverLink receiver = createReceiver();

                  try
                  {
                      while (true)
                      {
                          Amqp.Message message = receiver.Receive(timeSpan);
                          if (message == null)
                          {
                              break;
                          }

                          receiver.Accept(message);
                      }

                      Console.WriteLine("clearMessages before closing receiver ### session " + this.session.ToString());
                  }
                  finally
                  {
                      // Close the link even if receiving or accepting throws.
                      receiver.Close();
                  }

                  Console.WriteLine("clearMessages after closing receiver ### session " + this.session.ToString());
              }

              /// <summary>
              /// Sends <c>MessageCount</c> messages, whose bodies are the integers 0..MessageCount-1,
              /// to the target address.
              /// </summary>
              private void putInitialMessages()
              {
                  var sender = new Amqp.SenderLink(session, "sender", target);

                  try
                  {
                      for (int i = 0; i < MessageCount; i++)
                      {
                          // var message = new Amqp.Message(new byte[100000]); // when message size is increased, more memory leaks.
                          var message = new Amqp.Message(i);

                          sender.Send(message);
                      }
                  }
                  finally
                  {
                      // Close the link even if a send throws.
                      sender.Close();
                  }
              }

              /// <summary>
              /// One reproducer iteration: creates a receiver, sets its credit to 20, receives and
              /// accepts at most one message (1-second timeout), then closes the receiver.
              /// </summary>
              /// <param name="receivercount">Zero-based iteration index; used only in the failure log.</param>
              private void receiveMessage(int receivercount)
              {
                  TimeSpan timeSpan = new TimeSpan(0, 0, 1);

                  try
                  {
                      ReceiverLink receiver = createReceiver();

                      try
                      {
                          receiver.SetCredit(20); // The message count that remains in Session's incomingList field depends on this credit.

                          Amqp.Message message = receiver.Receive(timeSpan);
                          if (message != null)
                          {
                              Console.WriteLine("receiveMessage before accepting message ### message " + message.Body.ToString());
                              Console.WriteLine("receiveMessage before accepting message ### session " + this.session.ToString());
                              receiver.Accept(message);
                              Console.WriteLine("receiveMessage after accepting message ### session " + this.session.ToString());
                              Thread.Sleep(100);
                          }
                      }
                      finally
                      {
                          // Close the link on every path so each iteration ends with a closed receiver.
                          receiver.Close();
                      }
                  }
                  catch (Exception ex)
                  {
                      // Best effort: report the failure and let the remaining iterations run.
                      Console.Error.WriteLine("receiveMessage #" + receivercount + " failed ### " + ex);
                  }
              }

              /// <summary>
              /// Dumps the session, closes it, dumps it again after a pause, then closes the connection.
              /// </summary>
              private void closeSession()
              {
                  // NOTE(review): the 3-second pauses presumably let pending protocol traffic settle
                  // before each dump; confirm the intended purpose.
                  Thread.Sleep(3000);

                  Console.WriteLine("closeSession start ### " + this.session.ToString());

                  this.session.Close();
                  Thread.Sleep(3000);
                  Console.WriteLine("closeSession end ### " + this.session.ToString());

                  this.session = null;

                  // The connection opened in createSession was never released; close it last so
                  // the dumps above are unaffected.
                  if (this.connection != null)
                  {
                      this.connection.Close();
                      this.connection = null;
                  }
              }
          }
      }
      

              rhn-support-tbish Tim Bish
              rhn-support-tyamashi Tomonari Yamashita
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: