package org.jgroups.stack;
import org.jgroups.Message;
import org.jgroups.util.Util;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Counterpart of AckSenderWindow. Simple FIFO buffer. Every message received is ACK'ed (even
* duplicates) and added to a hashmap keyed by seqno. The next seqno to be received is stored in
* {@code next_to_remove}. When a message with a seqno less than {@code next_to_remove} is
* received, it will be discarded. The {@code remove()} method removes and returns a message whose
* seqno is equal to {@code next_to_remove}, or null if not found.
* Change May 28 2002 (bela): replaced TreeSet with HashMap. Keys do not need to be sorted, and
* adding a key to a sorted set incurs overhead.
*
* @author Bela Ban
* @version $Id: AckReceiverWindow.java,v 1.40 2010/01/20 06:59:15 belaban Exp $
*/
public class AckReceiverWindow {
    /**
     * Number of seqnos directly below the current {@code next_to_remove} whose tombstones are
     * left in place when {@link #moveWatermarkTo(long)} purges older ones.
     */
    private final static int TOMBSTONE_WINDOW_SIZE = 2;

    /**
     * Sentinel stored in {@link #msgs} in place of a message that has been removed. An add for a
     * seqno that still holds this sentinel is rejected with -1 (see {@link #add2(long, Message)}).
     * Always compared by identity.
     */
    private final static Message TOMBSTONE = new Message();

    /** Seqno of the next message to be handed out by the remove methods. */
    private final AtomicLong next_to_remove = new AtomicLong(1);

    /** Lowest seqno that may still hold a tombstone; everything below has been purged. */
    private final AtomicLong low_watermark = new AtomicLong(1);

    /** Received messages (or {@link #TOMBSTONE} for removed ones), keyed by seqno. */
    private final ConcurrentMap<Long, Message> msgs = new ConcurrentHashMap<Long, Message>();

    /**
     * Flag handed out through {@link #getProcessing()}. This class never reads or writes it
     * itself; presumably callers use it to coordinate who drains the window (confirm at call
     * sites).
     */
    private final AtomicBoolean processing = new AtomicBoolean(false);

    /**
     * Creates a window whose first expected seqno is {@code initial_seqno}.
     *
     * @param initial_seqno the seqno of the first message that can be removed
     */
    public AckReceiverWindow(long initial_seqno) {
        next_to_remove.set(initial_seqno);
        low_watermark.set(initial_seqno);
    }

    /** @return the shared processing flag (the live instance, not a copy) */
    public AtomicBoolean getProcessing() {
        return processing;
    }

    /**
     * Returns whatever is currently stored under {@code seqno}.
     *
     * @return the stored message, null if nothing is stored. Note that for a recently removed
     *         seqno this is the internal tombstone placeholder, not the original message.
     */
    public Message get(long seqno) {
        return msgs.get(seqno);
    }

    /**
     * Adds a new message. Message cannot be null
     *
     * @return True if the message was added, false if not (e.g. duplicate, message was already
     *         present)
     * @throws IllegalArgumentException if {@code msg} is null
     */
    public boolean add(long seqno, Message msg) {
        return add2(seqno, msg) == 1;
    }

    /**
     * Adds a message if not yet received
     *
     * @param seqno the sequence number of the message
     * @param msg   the message, must not be null
     * @return -1 if not added because seqno &lt; next_to_remove (or the seqno was already
     *         removed), 0 if not added because already present, 1 if added successfully
     * @throws IllegalArgumentException if {@code msg} is null
     */
    public byte add2(long seqno, Message msg) {
        if (msg == null)
            throw new IllegalArgumentException("msg must be non-null");
        return insert(seqno, msg, next_to_remove.get());
    }

    /**
     * Same as {@link #add2(long, Message)}, but calls {@code Util.sleep(timeout)} between reading
     * {@code next_to_remove} and inserting the message. Presumably intended for tests that want
     * to widen the window for a concurrent removal (confirm against callers).
     *
     * @param seqno   the sequence number of the message
     * @param msg     the message, must not be null
     * @param timeout value passed to {@code Util.sleep}
     * @return -1, 0 or 1, with the same meaning as in {@link #add2(long, Message)}
     * @throws IllegalArgumentException if {@code msg} is null
     */
    public byte add2(long seqno, Message msg, long timeout) {
        if (msg == null)
            throw new IllegalArgumentException("msg must be non-null");
        // The snapshot is deliberately taken BEFORE sleeping, exactly as in the original code.
        long next = next_to_remove.get();
        Util.sleep(timeout);
        return insert(seqno, msg, next);
    }

    /**
     * Shared insertion logic of both add2() variants.
     *
     * @param next the value of next_to_remove as observed by the caller
     */
    private byte insert(long seqno, Message msg, long next) {
        if (seqno < next)
            return -1;
        Message existing = msgs.putIfAbsent(seqno, msg);
        if (existing == null) {
            moveWatermarkTo(next);
            return 1;
        }
        // A tombstone means the seqno was removed after we read next_to_remove: treat as too old.
        if (existing == TOMBSTONE)
            return -1;
        return 0;
    }

    /**
     * Purges tombstones, starting at low_watermark, for all seqnos below
     * {@code seqno - TOMBSTONE_WINDOW_SIZE}. Stops at the first seqno that does not hold a
     * tombstone.
     */
    private void moveWatermarkTo(long seqno) {
        long until = seqno - TOMBSTONE_WINDOW_SIZE;
        long start_from = low_watermark.get();
        // remove(key, value) only succeeds if the entry is (still) the tombstone
        while (start_from < until && msgs.remove(start_from++, TOMBSTONE)) {
            low_watermark.set(start_from);
        }
    }

    /**
     * Removes the message stored under next_to_remove, if any.
     *
     * @param oob if true, the message is only removed when it carries the OOB flag
     * @return the removed message, or null if nothing could be removed
     */
    private Message removeHelper(boolean oob) {
        long seqno = next_to_remove.get();
        Message retval = msgs.get(seqno);
        // A tombstone here means another thread has already taken this message but has not yet
        // advanced next_to_remove; it must never be handed out as if it were a real message.
        if (retval == null || retval == TOMBSTONE)
            return null;
        if (oob && !retval.isFlagSet(Message.OOB))
            return null;
        if (!msgs.replace(seqno, retval, TOMBSTONE))
            return null; // lost the race against a concurrent remover
        next_to_remove.compareAndSet(seqno, seqno + 1);
        return retval;
    }

    /**
     * Removes a message whose seqno is equal to {@code next_to_remove}, increments the latter.
     * Returns message that was removed, or null, if no message can be removed. Messages are thus
     * removed in order.
     */
    public Message remove() {
        return removeHelper(false);
    }

    /**
     * Removes as many messages as possible (in sequence, without gaps)
     *
     * @return the removed messages in seqno order; empty (never null) if none could be removed
     */
    public List<Message> removeMany() {
        List<Message> retval = new LinkedList<Message>(); // we remove msgs.size() messages *max*
        long seqno = next_to_remove.get();
        Message msg;
        while ((msg = msgs.get(seqno)) != null && msg != TOMBSTONE) {
            if (!msgs.replace(seqno, msg, TOMBSTONE))
                break; // somebody else removed it concurrently
            retval.add(msg);
            if (!next_to_remove.compareAndSet(seqno, seqno + 1))
                break;
            seqno++;
        }
        return retval;
    }

    /**
     * Removes the message at {@code next_to_remove} if it has the OOB flag set.
     *
     * @return the removed message, or null if there is none or it is not flagged OOB
     */
    public Message removeOOBMessage() {
        return removeHelper(true);
    }

    /**
     * Removes as many OOB messages as possible and return the highest seqno
     *
     * @return the highest seqno or -1 if no OOB message was found
     */
    public long removeOOBMessages() {
        long highest = -1;
        long seqno = next_to_remove.get();
        Message msg;
        while ((msg = msgs.get(seqno)) != null && msg != TOMBSTONE && msg.isFlagSet(Message.OOB)) {
            if (!msgs.replace(seqno, msg, TOMBSTONE))
                break;
            highest = seqno;
            if (!next_to_remove.compareAndSet(seqno, seqno + 1))
                break;
            seqno++;
        }
        return highest;
    }

    /**
     * @return true if a real message (not a tombstone) is stored under {@code next_to_remove}
     */
    public boolean hasMessagesToRemove() {
        Message head = msgs.get(next_to_remove.get());
        return head != null && head != TOMBSTONE;
    }

    /**
     * Discards all stored messages and tombstones. {@code next_to_remove} is left unchanged.
     */
    public void reset() {
        msgs.clear();
        // All tombstones are gone, so the purge must restart at next_to_remove. Otherwise
        // moveWatermarkTo() would fail on the first (now missing) entry forever and tombstones
        // created after the reset would never be purged.
        low_watermark.set(next_to_remove.get());
    }

    /**
     * @return the number of entries in the underlying map; this includes tombstones that have not
     *         been purged yet
     */
    public int size() {
        return msgs.size();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")");
        TreeSet<Long> s = new TreeSet<Long>(msgs.keySet());
        if (!s.isEmpty()) {
            sb.append(" [").append(s.first()).append(" - ").append(s.last()).append("]");
            sb.append(": ").append(s);
        }
        return sb.toString();
    }

    /** @return the entry count, next_to_remove and the sorted set of stored seqnos */
    public String printDetails() {
        StringBuilder sb = new StringBuilder();
        sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")")
                .append(", msgs=").append(new TreeSet<Long>(msgs.keySet()));
        return sb.toString();
    }
}