-
Bug
-
Resolution: Won't Do
-
Major
-
2.2.9.1
-
None
After JChannel.close() is called and the JVM exits, the departed member's membership continues to linger in the channel. The View passed to viewAccepted() is flooded with the Addresses of ex-members.
Steps to reproduce using the enclosed program:
1. Copy the enclosed program to a directory and name it JGroupsMember.java.
2. copy commons-logging.jar, jgroups-all.jar, concurrent.jar and log4j-1.2.9.jar to this directory.
3. copy the JGroups example conf/total-token.xml to this directory.
4. create a log4j.xml in this directory.
5. to compile (on NT)
javac -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember.java
6. open a DOS window and start a server instance
java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember server total-token.xml server 1
7. open another DOS window and start a client instance
java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember client total-token.xml client 1
Watch that the viewAccepted() views 2 members (this is OK)
8. Let step 7 complete, then repeat it a second time from the same DOS window. viewAccepted() reports 2 members (this is still OK).
9. Let step 8 complete, then repeat it a third time from the same DOS window. viewAccepted() now reports 3 members! (There are actually only 2.)
10. Watch the DOS window where the server instance is running. You will also observe: CoordGmsImpl.handleLeave(): - mbr [<IP>:<Port>] is not a member !
11. To stop the server instance, go to the DOS window where the client instance was run and start a client instance with the message "stop":
java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember client total-token.xml stop 1
Observation:
Steps 9 and 10 expose lingering members. If step 9 is repeated, the number of lingering members increases each time.
Source Code...
import java.io.IOException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Vector;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.JChannelFactory;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.View;
import org.jgroups.ChannelException;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.TimeoutException;
import org.jgroups.SuspectedException;
import org.jgroups.conf.XmlConfigurator;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.util.Util;
/**
- Usage: JGroupsMember <role> <protocolConfigXmlFileName> <message> <numberOfRepeats>
- role = client or server. A client will connect and close, and a server will connect and
- stay on the channel forever.
- protocolConfigXmlFileName = the path name to the JGroups protocol stack configuration
- XML file.
- message = any message, use quote if blank spaces are embeded. A special message "stop"
- from a client role would shutdown the server.
- numberOfRepeats = number of time to repeat the sending of the message. Applicable only
- if role is client.
*/
public class JGroupsMember implements RequestHandler, MembershipListener, MessageListener
{
private Log logger;
private String role;
private String protocolConfigXmlFile;
private String messageContents = null;
private int numOfRepeats = 1;
private Channel channel = null;
private String groupName = "win2kschouProto";
private MessageDispatcher disp = null;
private long jtimeout = 30000; // miliseconds
private long startTime;
private Address serverAddress = null;
private Object serverAddressMutex = new Object();
private Object serverEnds = new Object();
public JGroupsMember(String role, String protocolConfigXmlFile, String message,
int numOfRepeats)
{
this.role = role;
this.protocolConfigXmlFile = protocolConfigXmlFile;
this.messageContents = message;
this.numOfRepeats = (numOfRepeats > 0 ? numOfRepeats : 1);
this.logger = LogFactory.getLog(getClass());
Runtime.getRuntime().addShutdownHook(new Thread("Shutdown")
{
public void run()
{
if (null != channel)
{
logger.info("["getLocalAddress()
"] shutdown and close channel...");
channel.close();
}
else
logger.info("["getLocalAddress()
"] shutdown");
}
});
}
public boolean init()
{
boolean initComplete = false;
String addr = null;
try
{
// create a channel
logger.debug("Creating a channel...");
JChannelFactory factory = new JChannelFactory(new File(protocolConfigXmlFile));
channel = factory.createChannel();
}
catch (NullPointerException nx)
{
logger.error("Missing required JGroups protocol stack configuration XML file.");
return initComplete = false;
}
catch (ChannelException cx)
{
logger.error("Failed to init JGroups protocal stack when creating Channel."+
" Error: ", cx);
return initComplete = false;
}
try
{
logger.debug("Creating a MessageDispatcher using the channel...");
// MessageDispatcher(Channel, MessageListener, MembershipListener, RequestHandler)
disp = new MessageDispatcher(channel, this, this, this);
try
{
// connect the channel to the group
logger.debug("Connecting the channel to the group \""groupName"\"");
channel.connect(groupName);
}
catch (ChannelClosedException csx)
{
logger.error("Attempt to connect using a closed channel.", csx);
return initComplete = false;
}
catch (ChannelException cex)
{
logger.error("Failed to start JGroups protocal stack. Error: ", cex);
return initComplete = false;
}
if (!channel.isConnected())
{
logger.error("Failed to connect to the group.");
return initComplete = false;
}
addr = getLocalAddress();
logger.debug("The channel is connected using the address "+addr);
try
{
// request for the state of the group, i.e. the server address
while (null == serverAddress)
{
if (logger.isDebugEnabled())
logger.debug("["+addr+"] Requesting for group State from the"+
" coordinator using timeout "jtimeout"ms...");
startTime = System.currentTimeMillis();
if (!channel.getState(null, jtimeout))
{
if ((System.currentTimeMillis() - startTime) < jtimeout)
{
if (logger.isDebugEnabled())
logger.debug("["+addr+"] unable to get group state, may be"+
" because this is the first member joining the"+
" group.");
}
else
{
if (logger.isWarnEnabled())
logger.warn("["+addr+"] unable to get group state after "+
jtimeout+"ms timeout.");
}
}
if (role.equals("server"))
break;
}
}
catch (ChannelNotConnectedException cnex)
{
if (logger.isErrorEnabled())
logger.error("["+addr+"] failed to get group state,"+
" ChannelNotConnectedException: "+cnex.getMessage());
return initComplete = false;
}
catch (ChannelClosedException ccex)
{
if (logger.isErrorEnabled())
logger.error("["+addr+"] failed to get group state, the channel is"+
" already closed and cannot be reused -"+
" ChannelClosedException: "+ ccex.getMessage());
return initComplete = false;
}
Message msg = new Message(null, null, role);
logger.debug("["+addr+"] casting my role ("role") to the group...");
disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // timeout 0 has no effect
// because of GET_NONE
return initComplete = true;
}
finally
{
if (null != channel && !initComplete)
{
channel.close();
channel = null;
}
}
}
public void run()
{
String addr = getLocalAddress();
if (role.equals("server"))
runServer();
else
runClient(messageContents, numOfRepeats);
if (null != channel)
{
logger.info("["+addr+"] closing channel...");
channel.close();
channel = null;
logger.info("["+addr+"] exits___ __ _ .. .");
}
}
public void runServer()
{
String addr = getLocalAddress();
synchronized (serverEnds)
{
try
{
logger.debug("["+addr+"] server main thread goes into wait...");
serverEnds.wait(0);
}
catch(InterruptedException ix)
{
logger.info("["+addr+"] server main thread is waken up.");
}
}
logger.info("["+addr+"] runServer() stops.");
}
public void runClient(String clientMessage, int repeats)
{
String addr = getLocalAddress();
for (int i = 0; i < repeats; i++)
{
Message msg = new Message(serverAddress, null, clientMessage);
if (logger.isDebugEnabled())
logger.debug("["+addr+"] sending Message \""clientMessage"\" to server at"+
serverAddress);
try
{
Object reply = disp.sendMessage(msg, GroupRequest.GET_FIRST, 3000);
if (null != reply)
{
if (logger.isDebugEnabled())
logger.debug("["+addr+"] received reply \""reply.toString()"\"");
}
else
logger.debug("["+addr+"] received null reply");
}
catch(TimeoutException tex)
{
logger.error("["+addr+"] timeout exception when sending Message \""+
clientMessage+"\"; Exception: "+tex.getMessage());
break;
}
catch(SuspectedException sex)
{
logger.error("["+addr+"] suspected exception when sending Message \""+
clientMessage+"\"; Exception: "+sex.getMessage());
break;
}
}
}
/**
- A MembershipListener callback. It is called by the JGroups to notify this member to
- stop sending messages to the group.
*/
public void block()
{
if (logger.isDebugEnabled())
logger.debug("["+getLocalAddress()+"] MembershipListener.block() is called"+
" notifying this member to stop sending messages...");
}
/**
- A MembershipListener callback. It is called when a member is suspected being crashed,
- but has not yet been excluded from the group.
- @param suspectedMbr is the Address of the suspected crashed member.
*/
public void suspect(Address suspectedMbr)
{
if (logger.isDebugEnabled())
logger.debug("["+getLocalAddress()+"] MembershipListener.suspect(Address="+
suspectedMbr+") is called notifying a suspected crashed member...");
}
/**
- A MembershipListener callback. It is called when a change in group membership has
- occurred, either new member joins, existing member leaves or has been suspected crashed.
- @param mbrView is a View containing Address of all members in the group.
*/
public void viewAccepted(View mbrView)
{
String addr = getLocalAddress();
logger.info("["+addr+"] MembershipListener.viewAccepted(View) is called...");
if (null == mbrView)
{
logger.warn("["+addr+"] a null View is received.");
return;
}
Vector mbrs = mbrView.getMembers();
if (logger.isDebugEnabled())
{
logger.debug("["+addr+"] View has " + mbrs.size() + " members.");
Address mbr = null;
for (Iterator it = mbrs.iterator(); it.hasNext(); )
{
mbr = (Address)it.next();
logger.debug("["+addr+"] View a member address: " + mbr);
}
}
}
/**
- A MessageListener callback. This method is called when another member invokes its
- Channel.getState() requesting for group state, and this member is chosen to respond,
- which would indicate that this member is a group coordinator.
- @return It returns a byte[] containing a serialized Address of the server known
- by this member. It returns null when failed to serialize the Address.
*/
public byte[] getState()
{
String addr = getLocalAddress();
logger.info("["+addr+"] is responding to a Channel.getState() inquiry from another"+
" member...");
try
{
byte[] result = null;
logger.debug("["+addr+"] waiting to lock serverAddress to read...");
synchronized (serverAddressMutex)
{
result = Util.objectToByteBuffer(serverAddress);
logger.debug("["+addr+"] has read serverAddress;"+
" releasing serverAddress lock...");
}
logger.info("["+addr+"] replying Channel.getState() inquiry...");
return result;
}
catch (Exception ex)
{
logger.error("["+addr+"] failed to serialize reply to the Channel.getState()"+
" inquiry; Exception: ", ex);
logger.error("["+addr+"] replying null to Channel.getState() inquiry...");
return null;
}
}
/**
- A MessageListener callback. This method is called to receive a group state after this
- member has invoked a Channel.getState() call. The state received is a serialized
- Address of server known by the current group coordinator.
- @param state is a byte[] containing an Address of server known by the current group
- coordinator. It could be null if the group coordinator failed to
- serialize the server address, or the coordinator does not have the
- server address when responding to this member's inquiry.
*/
public void setState(byte[] state)
{
String addr = getLocalAddress();
logger.info("["+addr+"] is receiving a group state...");
try
{
if (null != state)
{
logger.debug("["+addr+"] waiting to lock serverAddress to write...");
synchronized (serverAddressMutex)
{
serverAddress = (Address)Util.objectFromByteBuffer(state);
logger.debug("["+addr+"] has written to serverAddress;"+
" releasing serverAddress lock...");
}
if (logger.isDebugEnabled())
logger.debug("["+addr+"] Got a state from a group coordinator that "+
"the server address is "+serverAddress);
}
else
logger.error("["+addr+"] Got null state.");
}
catch (Exception ex)
{
logger.error("["+addr+"] failed to de-serialize the state; Exception: ", ex);
}
}
/**
- A MessageListener callback. It is called to receive a Message sent from another member.
- @param message the Message received.
*/
public void receive(Message message)
{
String addr = getLocalAddress();
if (logger.isDebugEnabled())
logger.debug("["+addr+"] is receiving a Message via MessageListener...");
if (null == message)
{
logger.error("["+addr+"] got a null Message.");
return;
}
Address src = message.getSrc();
if (message.getObject() instanceof String)
{
String contents = (String)message.getObject();
if (logger.isDebugEnabled())
{
logger.debug("["+addr+"] got a Message \""contents"\" from member at "+
src.toString());
}
}
else
{
if (logger.isDebugEnabled())
{
logger.debug("["+addr+"] got a Message from member at "+src.toString());
}
}
}
/**
- A RequestHandler callback. It is called to receive a Message multi-casted by another
- member in the group, e.g. by a MessageDispatcher.castMessage(Message).
- @param message the Message received.
- @return If it is a server, it returns a String "ACK". If it is a client,
- it returns null.
*/
public Object handle(Message message)
{
String addr = getLocalAddress();
if (logger.isDebugEnabled())
logger.debug("["+addr+"] is receiving a Message via RequestHandler...");
if (null == message)
{
logger.error("["+addr+"] got a null Message.");
return null;
}
Address src = message.getSrc();
if (message.getObject() instanceof String)
{
String contents = (String)message.getObject();
if (logger.isDebugEnabled())
{
logger.debug("["+addr+"] got a Message \""contents"\" from member at "+
src.toString());
}
if (contents.equals("server"))
{
logger.debug("["+addr+"] waiting to lock serverAddress to write...");
synchronized (serverAddressMutex)
{
serverAddress = src;
if (logger.isDebugEnabled())
logger.debug("["+addr+"] has written to serverAddress = "src
"; releasing serverAddress lock..");
}
}
else if (contents.equals("stop"))
{
logger.debug("["+addr+"] received a \"stop\" message;"+
" waiting to lock serverEnds...");
synchronized (serverEnds)
{
logger.debug("["+addr+"] waking up server main thread...");
serverEnds.notifyAll();
}
}
}
else
{
if (logger.isDebugEnabled())
{
logger.debug("["+addr+"] got a Message from member at "+src.toString());
}
}
if (role.equals("server"))
{
logger.debug("["+addr+"] replying \"ACK\"...");
return new String("ACK");
}
else
{
logger.debug("["+addr+"] replying null...");
return null;
}
}
/*
- Returns a String of the Address this member is connected to the JGroups. It returns
- null if the channel is not connected.
*/
private String getLocalAddress()
{
if (null == channel)
return null;
else
{
Address addr = channel.getLocalAddress();
if (null == addr) // the channel is in closed state
return null;
else
return addr.toString();
}
}
public static void main(String args[])
{
String role, propFile, message;
int numOfRepeats;
JGroupsMember member = null;
if (4 != args.length)
{
System.out.println(
"Usage: JGroupsMember <role> <configXmlFile> <message> <repeats>");
System.out.println(" role = client or server.");
System.out.println(" configXmlFile = the JGroups protocol stack config file.");
System.out.println(" For example: config/total-token.xml");
System.out.println(" message = the message to multi-cast to the group.");
System.out.println(" A message \"stop\" from a client will stop");
System.out.println(" all servers.");
System.out.println(" repeats = if the role is \"client\", then how many");
System.out.println(" times to repeat the sending of the message.");
return;
}
else
{
role = args[0];
propFile = args[1];
message = args[2];
numOfRepeats = Integer.parseInt(args[3]);
}
member = new JGroupsMember(role, propFile, message, numOfRepeats);
member.init();
member.run();
}
}