Uploaded image for project: 'AMQ Clients'
  1. AMQ Clients
  2. ENTMQCL-707

[dotnet] Credit is incremented on receipt of message instead of message acknowledgment

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 2.1.0.B2
    • 2.0.0.GA
    • amqp-dotnet-client
    • None
    • Release Notes
    • Hide

      Here is sample code reproducing the issue:

      using System;
      using System.Collections.Generic;
      using System.Threading;
      using Amqp.Framing;
      using Amqp.Extensions;
      using Amqp.Sasl;
      using Amqp.Types;
      using System.Threading.Tasks;

      namespace Amqp.Examples {
      class Program {
      static void Main(string[] args) {
      Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error |
      TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning;
      Trace.TraceListener = (f, o) => Console.WriteLine(
      DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o));

      Connection connection = new Connection(new Address("amqp://localhost:5672"));
      Session session = new Session(connection);
      ReceiverLink receiver = new ReceiverLink(session, "receiver", "orders");

      // sleep to make sure the queue is created before we send, we could
      // await the attach frame but this is just a demo.
      Thread.Sleep(1000);

      SenderLink sender = new SenderLink(session, "sender", "orders");
      Message message = new Message("a message!");
      message.Header = new Header();
      message.Header.Durable = true;

      for (var i = 0; i < 1000; i++)

      { sender.Send(message); }

      sender.Close();

      // never ack... just delay so we can see messages "in-flight" for 60 seconds.
      // we should only see 100 received... we see all 1000 instead.
      receiver.Start(100, async(r, m) =>

      { await Task.Delay(TimeSpan.FromSeconds(60)); }

      );

      Thread.Sleep(30000);
      }

      Show
      Here is sample code reproducing the issue: using System; using System.Collections.Generic; using System.Threading; using Amqp.Framing; using Amqp.Extensions; using Amqp.Sasl; using Amqp.Types; using System.Threading.Tasks; namespace Amqp.Examples { class Program { static void Main(string[] args) { Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error | TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning; Trace.TraceListener = (f, o) => Console.WriteLine( DateTime.Now.ToString(" [hh:mm:ss.fff] ") + " " + string.Format(f, o)); Connection connection = new Connection(new Address("amqp://localhost:5672")); Session session = new Session(connection); ReceiverLink receiver = new ReceiverLink(session, "receiver", "orders"); // sleep to make sure the queue is created before we send, we could // await the attach frame but this is just a demo. Thread.Sleep(1000); SenderLink sender = new SenderLink(session, "sender", "orders"); Message message = new Message("a message!"); message.Header = new Header(); message.Header.Durable = true; for (var i = 0; i < 1000; i++) { sender.Send(message); } sender.Close(); // never ack... just delay so we can see messages "in-flight" for 60 seconds. // we should only see 100 received... we see all 1000 instead. receiver.Start(100, async(r, m) => { await Task.Delay(TimeSpan.FromSeconds(60)); } ); Thread.Sleep(30000); }
    • July 2018

      According to the upstream doc (and consistent with my understanding of message prefetch
      limits in other clients) the link credit should only be incremented
      when a message's disposition has changed (accepted/rejected). From the
      AMQP .Net Lite docs:

      "The link credit is decremented when message arrives and incremented
      when application finishes processing it (by calling
      ReceiverLink.Accept(Message) or ReceiverLink.Reject(Message))" -
      https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md

      However, this doesn't appear to be what happens... the credit is
      incremented on receipt of a message, not when it's disposition is
      settled. Indeed, if I instantiate a receiverlink and call Start(100,
      callback); where callback is an async method that simply calls await
      Task.Delay(Timespan.FromSeconds(60)), assuming the remote durable
      source has 1000 messages available frame tracing indicates all 1000
      messages are received, and AMQ7 shows 1000 messages in delivery
      despite me never calling Accept() for any message. I'd expect to see
      only 100 received, 100 "in delivery" on the broker, and 900 still
      sitting in "count" on the queue.

              crolke@redhat.com Chuck Rolke (Inactive)
              rhn-gps-tbaert Todd Baert (Inactive)
              David Kornel David Kornel
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: