Feature Request
Resolution: Done
Major
TransferQueueBundler has a few issues that affect latency:
- The MPSC (multi-producer, single-consumer) queue, to which sender threads add their messages and from which the single consumer thread drains them, is often a point of contention when many threads are sending messages. (Note to self: investigate using an MPSC queue from JCTools.)
- When there are 8 destinations and pending messages for all of them, the sender (consumer) thread sends the messages to all 8 destinations sequentially, one after another (including loopback to self). For every destination except the first, this delays messages that could have been sent in parallel.
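For reference, the single-consumer pattern described above can be modeled roughly as follows. This is a simplified model, not TransferQueueBundler's actual code: destinations are plain strings such as "D1", a LinkedBlockingQueue stands in for the shared MPSC queue, and transport is a hypothetical callback standing in for the network send.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

// Simplified model of a single-consumer bundler (NOT the actual JGroups code):
// all sender threads add to one shared queue, and one consumer drains it and
// sends one batch per destination, strictly one after another.
public class SingleConsumerDrainSketch {
    record Msg(String dest, byte[] payload) {}

    // Shared multi-producer, single-consumer queue: every sender contends here.
    private final BlockingQueue<Msg> queue = new LinkedBlockingQueue<>();

    public void send(String dest, byte[] payload) {
        queue.add(new Msg(dest, payload));
    }

    // One pass of the consumer: drain everything, group by destination (in
    // arrival order), then send the batches sequentially.
    public void drainOnce(BiConsumer<String, List<byte[]>> transport) {
        List<Msg> drained = new ArrayList<>();
        queue.drainTo(drained);
        Map<String, List<byte[]>> batches = new LinkedHashMap<>();
        for (Msg m : drained)
            batches.computeIfAbsent(m.dest(), d -> new ArrayList<>()).add(m.payload());
        for (Map.Entry<String, List<byte[]>> e : batches.entrySet())
            transport.accept(e.getKey(), e.getValue()); // one destination at a time
    }

    public static void main(String[] args) {
        SingleConsumerDrainSketch b = new SingleConsumerDrainSketch();
        for (int i = 1; i <= 8; i++)
            b.send("D" + i, new byte[10]);
        List<String> order = new ArrayList<>();
        b.drainOnce((dest, batch) -> order.add(dest));
        System.out.println(order); // [D1, D2, D3, D4, D5, D6, D7, D8]
    }
}
```

Because the batches are handed to transport one after another, the batch for D8 only goes out after the batches for D1 to D7 have been sent.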
The idea to overcome these deficiencies is (1) to remove the need for a common MPSC queue and (2) to send to different destinations in parallel.
Instead of keeping a single count of accumulated bytes for the whole hashmap, and sending all messages in the hashmap when that count exceeds bundler.max_size, we maintain a hashmap that maps destinations (with null as a special destination) to entries. Each entry maintains the byte count for its destination, plus a thread-counter.
When thread T sends a message to destination P, T looks up the entry for P and atomically increments the entry's thread-counter. Next, after acquiring a lock, it checks whether the byte count plus the size of the message is greater than max_size; if so, it sends all queued messages before adding the new one, otherwise it simply adds the message. Before exiting, T atomically decrements the thread-counter: if the counter drops to 0, T sends the queued messages, otherwise it simply returns.
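The lookup and send path described above can be sketched in Java as follows. This is a minimal sketch under several assumptions, not the JGroups implementation: destinations are plain Object keys rather than JGroups addresses, the lock is a ReentrantLock per entry, destination null is mapped to a sentinel key because ConcurrentHashMap rejects null keys, and transport is a hypothetical callback standing in for the actual batch send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Minimal sketch of the proposed per-destination bundler; names and types are
// hypothetical and this is not the JGroups implementation.
public class PerDestinationBundlerSketch {
    // ConcurrentHashMap rejects null keys, so destination null (all members)
    // is stored under this sentinel key.
    private static final Object ALL_MEMBERS = new Object();

    // Per-destination state: queued messages, their byte count and the thread-counter.
    static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicInteger threadCounter = new AtomicInteger();
        final List<byte[]> queue = new ArrayList<>(); // guarded by lock
        long byteCount;                               // guarded by lock
    }

    private final ConcurrentMap<Object, Entry> entries = new ConcurrentHashMap<>();
    private final long maxSize;                               // stands in for bundler.max_size
    private final BiConsumer<Object, List<byte[]>> transport; // hypothetical batch sender

    public PerDestinationBundlerSketch(long maxSize, BiConsumer<Object, List<byte[]>> transport) {
        this.maxSize = maxSize;
        this.transport = transport;
    }

    public void send(Object dest, byte[] msg) {
        Entry e = entries.computeIfAbsent(dest == null ? ALL_MEMBERS : dest, k -> new Entry());
        e.threadCounter.incrementAndGet(); // one more thread is now sending to dest
        try {
            e.lock.lock();
            try {
                if (e.byteCount + msg.length > maxSize)
                    flush(dest, e); // adding msg would exceed max_size: send what is queued first
                e.queue.add(msg);
                e.byteCount += msg.length;
            } finally {
                e.lock.unlock();
            }
        } finally {
            // The last thread to leave sends whatever is still queued for dest.
            if (e.threadCounter.decrementAndGet() == 0) {
                e.lock.lock();
                try {
                    flush(dest, e);
                } finally {
                    e.lock.unlock();
                }
            }
        }
    }

    // Caller must hold e.lock. Sends the queued messages as one batch and resets the entry.
    private void flush(Object dest, Entry e) {
        if (e.queue.isEmpty())
            return;
        List<byte[]> batch = new ArrayList<>(e.queue);
        e.queue.clear();
        e.byteCount = 0;
        transport.accept(dest, batch);
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        PerDestinationBundlerSketch b =
            new PerDestinationBundlerSketch(1000, (dest, batch) -> batchSizes.add(batch.size()));
        for (int i = 0; i < 3; i++)
            b.send("P", new byte[100]);
        System.out.println(batchSizes); // [1, 1, 1]
    }
}
```

With a single sender thread the counter drops to 0 after every call, so each message is sent as its own batch; with many concurrent senders to the same destination, batches grow up to max_size. In this sketch the batch is sent while holding the entry's lock, which keeps messages to one destination in order at the cost of briefly blocking other senders to that destination; senders to different destinations do not block each other, because each entry has its own lock.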
This is a hybrid of SenderSendsBundler and BatchBundler. Its advantages are:
- When multiple threads send to the same destination, a batch is only sent when the last thread exits or when max_size has been exceeded. This reduces the contention on the single queue of TransferQueueBundler, as there is one queue per destination rather than one queue for all destinations.
- When draining messages while every destination has pending messages, TransferQueueBundler sends to destination A, then to B, then to C, and so on. The send to a given destination is therefore delayed by (number of destinations before it) times (average send time per destination); with 8 destinations, the last one waits for 7 sends to complete.
- Now any thread can send a batch to a given destination P, and there is no longer a sender (consumer) thread or a common queue.
- Because of the thread-counter, when no other thread is sending messages to the same destination P, the queued messages are sent to P as soon as the current thread finishes, instead of waiting for a consumer thread.
- When many threads send to all destinations, the resulting message batches are bigger, because max_size applies per destination rather than to all destinations combined. On the other hand, when there are only a few sender threads, messages may be sent immediately (possibly after every message), reducing latency. Also, sending a message to P no longer depends on sending messages to a different destination Q (which could delay the message to P until max_size has been reached).
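To make the delay arithmetic for the sequential drain concrete: if a single consumer sends to n destinations in turn and each send takes t on average, destination k (counting from 1) waits (k - 1) times t before its batch goes out, so with 8 destinations the last one waits 7t and the average wait is (n - 1)t / 2 = 3.5t. The small helper below only evaluates these formulas; a uniform average send time is a simplifying assumption.

```java
// Illustrative arithmetic only: models a single consumer that sends to n
// destinations one after another, each send taking avgSendTime on average.
public class SequentialDrainDelay {
    // Time destination k (1-based) waits before its send starts:
    // (number of destinations before it) * (average send time per destination).
    public static double sequentialWait(int k, double avgSendTime) {
        return (k - 1) * avgSendTime;
    }

    // Average wait over all n destinations: (0 + 1 + ... + (n - 1)) * t / n = (n - 1) * t / 2.
    public static double averageSequentialWait(int n, double avgSendTime) {
        return (n - 1) * avgSendTime / 2.0;
    }

    public static void main(String[] args) {
        double t = 1.0; // average send time, in arbitrary units
        for (int k = 1; k <= 8; k++)
            System.out.println("destination " + k + " waits " + sequentialWait(k, t));
        System.out.println("average wait: " + averageSequentialWait(8, t)); // average wait: 3.5
    }
}
```

When batches to different destinations are sent by different threads in parallel, as proposed, none of them ideally has to wait for the sends to other destinations.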
Note that this bundler is not optimal for threads sending to destination null (i.e. to all members), as that case degenerates into what SenderSendsBundler does.