From 4ca0f7e41c8a04d5940200cf7aaab741d4686cfc Mon Sep 17 00:00:00 2001 From: Martin Remmelzwaal Date: Thu, 26 Mar 2009 10:58:23 +0100 Subject: [PATCH 2/3] Forward port of max_retransmit_time feature to JGROUPS 2.6.8.GA. #22103 Patches on JGROUPS 2.6.8.GA source code in order to introduce the "max_retransmit_time" feature from JGroups v2.4SP4-CUSTOMISED in the UNICAST protcol handler (workaround for endless retransmit problem). --- .../java/org/jgroups/protocols/UNICAST.java | 56 +++++++++++++++---- .../java/org/jgroups/stack/AckSenderWindow.java | 49 +++++++++++++++-- 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/ttnlib/sources/java/org/jgroups/protocols/UNICAST.java b/ttnlib/sources/java/org/jgroups/protocols/UNICAST.java index ea8bbab..4814cf0 100644 --- a/ttnlib/sources/java/org/jgroups/protocols/UNICAST.java +++ b/ttnlib/sources/java/org/jgroups/protocols/UNICAST.java @@ -1,7 +1,24 @@ package org.jgroups.protocols; -import org.jgroups.*; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.jgroups.Address; +import org.jgroups.Event; +import org.jgroups.Global; +import org.jgroups.Header; +import org.jgroups.Message; +import org.jgroups.View; import org.jgroups.stack.AckReceiverWindow; import org.jgroups.stack.AckSenderWindow; import org.jgroups.stack.Protocol; @@ -11,11 +28,6 @@ import org.jgroups.util.Streamable; import org.jgroups.util.TimeScheduler; import org.jgroups.util.Util; -import java.io.*; -import java.util.*; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicInteger; - /** * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission @@ -34,12 +46,13 @@ import java.util.concurrent.atomic.AtomicInteger; * whenever a message is received: the new message is added and then we try to remove as many messages as * possible (until we stop at a gap, or there are no more messages). * @author Bela Ban - * @version $Id: UNICAST.java,v 1.91.2.14 2008/06/17 08:21:26 belaban Exp $ + * @version UNICAST.java,v 1.91.2.14 (JGROUPS 2.6.8.GA) + CUSTOMISED patch for "max_retransmit_time" feature in UNICAST protocol */ public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand { private final Vector
members=new Vector
(11); private final HashMap connections=new HashMap(11); private long[] timeouts={400,800,1600,3200}; // for AckSenderWindow: max time to wait for missing acks + private long maxRetransmitTime = 15*60*1000; // maximum retransmit time private Address local_addr=null; private TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow) private Map locks; @@ -214,7 +227,13 @@ public class UNICAST extends Protocol implements AckSenderWindow.RetransmitComma use_gms=Boolean.valueOf(str).booleanValue(); props.remove("use_gms"); } - + + str=props.getProperty("max_retransmit_time"); + if(str != null) { + maxRetransmitTime=Long.valueOf(str).longValue(); + props.remove("max_retransmit_time"); + } + str=props.getProperty("immediate_ack"); if(str != null) { immediate_ack=Boolean.valueOf(str).booleanValue(); @@ -349,7 +368,7 @@ public class UNICAST extends Protocol implements AckSenderWindow.RetransmitComma seqno=entry.sent_msgs_seqno; UnicastHeader hdr=new UnicastHeader(UnicastHeader.DATA, seqno); if(entry.sent_msgs == null) { // first msg to peer 'dst' - entry.sent_msgs=new AckSenderWindow(this, new StaticInterval(timeouts), timer, this.local_addr); // use the global timer + entry.sent_msgs=new AckSenderWindow(this, new StaticInterval(timeouts), timer, this.local_addr, maxRetransmitTime); // use the global timer } msg.putHeader(name, hdr); if(log.isTraceEnabled()) @@ -525,13 +544,26 @@ public class UNICAST extends Protocol implements AckSenderWindow.RetransmitComma } + /** + * Called by AckSenderWindow indicating the retransmission failed (because of max retransmittime exceeded). + */ + public void retransmitFailed(long seqno, Message msg) { + Address dst = msg.getDest(); + if(log.isWarnEnabled()) log.warn("Retransmit failed, seqno=" + seqno + ": dest " + dst + "; removing entry !"); + + // remove connection + synchronized (connections) { + removeConnection(dst); + } + return; + } /** - * Check whether the hashtable contains an entry e for sender (create if not). If - * e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and + * Check whether the hashtable contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and * add message. Set e.received_msgs to the new window. Else just add the message. + * * @return boolean True if we can send an ack, false otherwise */ private boolean handleDataReceived(Address sender, long seqno, Message msg) { diff --git a/ttnlib/sources/java/org/jgroups/stack/AckSenderWindow.java b/ttnlib/sources/java/org/jgroups/stack/AckSenderWindow.java index 36c0978..2ad5fae 100644 --- a/ttnlib/sources/java/org/jgroups/stack/AckSenderWindow.java +++ b/ttnlib/sources/java/org/jgroups/stack/AckSenderWindow.java @@ -3,6 +3,11 @@ package org.jgroups.stack; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jgroups.Address; @@ -10,10 +15,6 @@ import org.jgroups.Message; import org.jgroups.util.TimeScheduler; import org.jgroups.util.Util; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - /** * ACK-based sliding window for a sender. Messages are added to the window keyed by seqno @@ -23,17 +24,22 @@ import java.util.concurrent.ConcurrentMap; * table left, the thread terminates. It will be re-activated when a new entry is added to the * retransmission table. * @author Bela Ban + * @version JGROUPS 2.6.8.GA + CUSTOMISED patch for "max_retransmit_time" feature in UNICAST protocol */ public class AckSenderWindow implements Retransmitter.RetransmitCommand { RetransmitCommand retransmit_command = null; // called to request XMIT of msg final ConcurrentMap msgs=new ConcurrentHashMap(); // keys: seqnos (Long), values: Messages + /** keys: seqnos (Long), values: #retransmit timestamp of message */ + final Map msgs_retransmitStopTimes=new ConcurrentHashMap(); Interval interval=new StaticInterval(400,800,1200,1600); final Retransmitter retransmitter; + long maxRetransmitTime = 15*60*1000; // 15 minutes per default static final Log log=LogFactory.getLog(AckSenderWindow.class); public interface RetransmitCommand { void retransmit(long seqno, Message msg); + void retransmitFailed(long seqno, Message msg); } @@ -73,10 +79,18 @@ public class AckSenderWindow implements Retransmitter.RetransmitCommand { retransmitter.setRetransmitTimeouts(interval); } + public AckSenderWindow(RetransmitCommand com, Interval interval, TimeScheduler sched, Address sender, long maxRetransmitTime) { + retransmit_command = com; + this.interval = interval; + retransmitter = new Retransmitter(sender, this, sched); + retransmitter.setRetransmitTimeouts(interval); + this.maxRetransmitTime = maxRetransmitTime; + } public void reset() { msgs.clear(); + msgs_retransmitStopTimes.clear(); // moved out of sync scope: Retransmitter.reset()/add()/remove() are sync'ed anyway // Bela Jan 15 2003 @@ -92,7 +106,16 @@ public class AckSenderWindow implements Retransmitter.RetransmitCommand { * threshold (min_threshold) */ public void add(long seqno, Message msg) { - msgs.putIfAbsent(seqno, msg); + final Long seqNoAsLong = Long.valueOf(seqno); + + synchronized (msgs_retransmitStopTimes) { // update atomically + msgs.putIfAbsent(seqno, msg); + if(!msgs_retransmitStopTimes.containsKey(seqNoAsLong)) { + msgs_retransmitStopTimes.put(seqNoAsLong, + (Long)(System.currentTimeMillis() + maxRetransmitTime)); // set max retransmit time + } + } + retransmitter.add(seqno, seqno); } @@ -105,6 +128,7 @@ public class AckSenderWindow implements Retransmitter.RetransmitCommand { */ public void ack(long seqno) { msgs.remove(new Long(seqno)); + msgs_retransmitStopTimes.remove(new Long(seqno)); retransmitter.remove(seqno); } @@ -141,7 +165,18 @@ public class AckSenderWindow implements Retransmitter.RetransmitCommand { append(" - ").append(last_seqno).append(" from ").append(sender)); for(long i = first_seqno; i <= last_seqno; i++) { if((msg=msgs.get(i)) != null) { // find the message to retransmit - retransmit_command.retransmit(i, msg); + // check if the max number of retransmits is exceeded + Long retransmitStopTime = null; + if ((retransmitStopTime = (Long)msgs_retransmitStopTimes.get(new Long(i))) != null) { + if (retransmitStopTime > System.currentTimeMillis()) + retransmit_command.retransmit(i, msg); + else + { + ack(i); // stops retransmitting this message + retransmit_command.retransmitFailed(i, msg); + } + } + } } } @@ -180,6 +215,8 @@ public class AckSenderWindow implements Retransmitter.RetransmitCommand { if(log.isDebugEnabled()) log.debug("seqno=" + seqno); curr_time = System.currentTimeMillis(); } + + public void retransmitFailed(long seqno, Message msg) {} } -- 1.6.2.1217.gd7bc3