Feature Request
Resolution: Done
Major
Build a replacement for ArrayBlockingQueue for the multiple-producers-single-consumer use case, e.g. as used in TransferQueueBundler, PerDestinationBundler etc.
The disadvantage of ArrayBlockingQueue is that every add and remove acquires a lock, leading to contention among producer and consumer threads.
ConcurrentLinkedQueue is lock-less, but not blocking: it is unbounded, and polling an empty queue returns null immediately instead of waiting.
Create ConcurrentLinkedBlockingQueue (replacing the existing impl in JGroups), which extends ConcurrentLinkedQueue and adds the blocking operations by implementing BlockingQueue.
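To illustrate the gap described above, here is a small sketch using only the two JDK classes. The class and method names (QueueBehaviourDemo, pollEmptyLinkedQueue etc.) are made up for illustration and are not part of JGroups.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: hypothetical helper names, standard JDK queue classes.
final class QueueBehaviourDemo {

    // ConcurrentLinkedQueue never waits: poll() on an empty queue returns null at once.
    static Integer pollEmptyLinkedQueue() {
        return new ConcurrentLinkedQueue<Integer>().poll();
    }

    // ArrayBlockingQueue can wait: the timed poll() blocks for up to 'millis'
    // before giving up and returning null.
    static Integer pollEmptyArrayQueue(long millis) throws InterruptedException {
        return new ArrayBlockingQueue<Integer>(1).poll(millis, TimeUnit.MILLISECONDS);
    }

    // ArrayBlockingQueue is bounded: offer() on a full queue returns false
    // (put() would block instead).
    static boolean offerToFullArrayQueue() {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        q.offer(1);
        return q.offer(2);
    }
}
```

The blocking and bounding behaviour of ArrayBlockingQueue is what the bundlers rely on; the proposal is to get the same behaviour without taking a lock on every operation.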
Design
- size: AtomicInteger, keeps track of the number of elements in the queue
- capacity: constant, defines the max number of elements in the queue
- lock: lock used to wait on / signal conditions
- notFull, notEmpty: conditions
Producer
- Increments size
- If size == capacity: block on notFull
- If previous value was 0: signal() notEmpty (wakes up consumer)
Consumer
- Decrements size
- If previous value == capacity: signalAll() notFull (wakes up producers)
- If size == 0: block on notEmpty
The advantage of this design is that additions and removals are lock-less, except when size is either 0 or capacity. In the ideal scenario, where producers add at a given rate and the consumer removes at roughly the same rate, there would be no lock contention among producers (adding to the queue) or between producers and the consumer (adding to and removing from the queue).
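The design above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the actual JGroups implementation: the class name MpscBlockingQueueSketch and the methods put(), take() and size() are hypothetical, the queue is wrapped rather than extended, and BlockingQueue is not implemented. Two details are also assumptions of the sketch rather than part of the description: a producer reserves its slot with a compare-and-set on size before adding, so that size never exceeds capacity, and the consumer briefly yields if a slot has been reserved but its element is not yet linked into the queue.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: bounded queue for many producers and ONE consumer.
final class MpscBlockingQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger(); // number of reserved slots
    private final int capacity;                             // max number of elements
    private final ReentrantLock lock = new ReentrantLock(); // only used to wait / signal
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    MpscBlockingQueueSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
        this.capacity = capacity;
    }

    int size() { return size.get(); }

    // Producer side: may be called from any number of threads.
    void put(T el) throws InterruptedException {
        Objects.requireNonNull(el);
        for (;;) {
            int s = size.get();
            if (s >= capacity) {               // full: block on notFull
                lock.lock();
                try {
                    while (size.get() >= capacity) notFull.await();
                } finally {
                    lock.unlock();
                }
                continue;                      // retry the reservation
            }
            if (size.compareAndSet(s, s + 1)) { // slot reserved without locking
                queue.add(el);
                if (s == 0) {                  // previous value was 0: wake up the consumer
                    lock.lock();
                    try { notEmpty.signal(); } finally { lock.unlock(); }
                }
                return;
            }
        }
    }

    // Consumer side: must only ever be called from a single thread.
    T take() throws InterruptedException {
        if (size.get() == 0) {                 // empty: block on notEmpty
            lock.lock();
            try {
                while (size.get() == 0) notEmpty.await();
            } finally {
                lock.unlock();
            }
        }
        T el;
        // size > 0 means a slot is reserved; its element may still be in flight.
        while ((el = queue.poll()) == null) Thread.yield();
        if (size.getAndDecrement() == capacity) { // previous value was capacity: wake up producers
            lock.lock();
            try { notFull.signalAll(); } finally { lock.unlock(); }
        }
        return el;
    }
}
```

In this sketch the lock is only acquired when size is 0 or capacity, matching the argument above. Both waiters re-check size while holding the lock, and the signalling side acquires the same lock before signalling, so a wake-up cannot slip in between the check and the await.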