package org.jgroups.stack; import org.jgroups.Message; import org.jgroups.util.Util; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * Counterpart of AckSenderWindow. Simple FIFO buffer. Every message received is ACK'ed (even * duplicates) and added to a hashmap keyed by seqno. The next seqno to be received is stored in * next_to_remove. When a message with a seqno less than next_to_remove is received, it * will be discarded. The remove() method removes and returns a message whose seqno is * equal to next_to_remove, or null if not found.
* Change May 28 2002 (bela): replaced TreeSet with HashMap. Keys do not need to be sorted, and * adding a key to a sorted set incurs overhead. * * @author Bela Ban * @version $Id: AckReceiverWindow.java,v 1.40 2010/01/20 06:59:15 belaban Exp $ */ public class AckReceiverWindow { private final static int TOMBSTONE_WINDOW_SIZE = 2; private final static Message TOMBSTONE = new Message(); private final AtomicLong next_to_remove = new AtomicLong(1); private final AtomicLong low_watermark = new AtomicLong(1); private final ConcurrentMap msgs = new ConcurrentHashMap(); private final AtomicBoolean processing = new AtomicBoolean(false); public AckReceiverWindow(long initial_seqno) { next_to_remove.set(initial_seqno); low_watermark.set(initial_seqno); } public AtomicBoolean getProcessing() { return processing; } public Message get(long seqno) { return msgs.get(seqno); } /** * Adds a new message. Message cannot be null * * @return True if the message was added, false if not (e.g. duplicate, message was already * present) */ public boolean add(long seqno, Message msg) { return add2(seqno, msg) == 1; } /** * Adds a message if not yet received * * @param seqno * @param msg * @return -1 if not added because seqno < next_to_remove, 0 if not added because already * present, 1 if added successfully */ public byte add2(long seqno, Message msg) { if (msg == null) throw new IllegalArgumentException("msg must be non-null"); long next = next_to_remove.get(); if (seqno >= next) { Message retval = msgs.putIfAbsent(seqno, msg); if (retval == null) { moveWatermarkTo(next); return 1; } else if (retval == TOMBSTONE) { return -1; } return 0; } return -1; } public byte add2(long seqno, Message msg, long timeout) { if (msg == null) throw new IllegalArgumentException("msg must be non-null"); long next = next_to_remove.get(); Util.sleep(timeout); if (seqno >= next) { Message retval = msgs.putIfAbsent(seqno, msg); if (retval == null) { moveWatermarkTo(next); return 1; } else if (retval == TOMBSTONE) { return -1; } return 0; } return -1; } private void moveWatermarkTo(long seqno) { long until = seqno - TOMBSTONE_WINDOW_SIZE; long start_from = low_watermark.get(); while (start_from < until && msgs.remove(start_from++, TOMBSTONE)) { low_watermark.set(start_from); } } private Message removeHelper(boolean oob) { long seqno = next_to_remove.get(); Message retval = msgs.get(seqno); if (retval != null) { if (oob && !retval.isFlagSet(Message.OOB)) return null; if (msgs.replace(seqno, retval, TOMBSTONE)) { next_to_remove.compareAndSet(seqno, seqno + 1); } else { retval = null; } } return retval; } /** * Removes a message whose seqno is equal to next_to_remove, increments the latter. * Returns message that was removed, or null, if no message can be removed. Messages are thus * removed in order. */ public Message remove() { return removeHelper(false); } /** * Removes as many messages as possible (in sequence, without gaps) * * @return */ public List removeMany() { List retval = new LinkedList(); // we remove msgs.size() messages *max* Message msg; long seqno = next_to_remove.get(); while ((msg = msgs.get(seqno)) != null) { if (msgs.replace(seqno, msg, TOMBSTONE)) { retval.add(msg); if (!next_to_remove.compareAndSet(seqno, ++seqno)) { break; } } else { break; } } return retval; } public Message removeOOBMessage() { return removeHelper(true); } /** * Removes as many OOB messages as possible and return the highest seqno * * @return the highest seqno or -1 if no OOB message was found */ public long removeOOBMessages() { boolean seenOOB = false; Message msg; long seqno = next_to_remove.get(); while ((msg = msgs.get(seqno)) != null && msg.isFlagSet(Message.OOB)) { if (msgs.replace(seqno, msg, TOMBSTONE)) { seenOOB = true; if (!next_to_remove.compareAndSet(seqno, ++seqno)) { break; } } else break; } if (seenOOB) return seqno - 1; else return -1; } public boolean hasMessagesToRemove() { return msgs.containsKey(next_to_remove.get()); } public void reset() { msgs.clear(); } public int size() { return msgs.size(); } public String toString() { StringBuilder sb = new StringBuilder(); sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")"); TreeSet s = new TreeSet(msgs.keySet()); if (!s.isEmpty()) { sb.append(" [").append(s.first()).append(" - ").append(s.last()).append("]"); sb.append(": ").append(s); } return sb.toString(); } public String printDetails() { StringBuilder sb = new StringBuilder(); sb.append(msgs.size()).append(" msgs (").append("next=").append(next_to_remove).append(")") .append(", msgs=").append(new TreeSet(msgs.keySet())); return sb.toString(); } }