Uploaded image for project: 'JGroups'
  1. JGroups
  2. JGRP-218

Memgers linger around after connection is closed

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Won't Do
    • Icon: Major Major
    • 2.3
    • 2.2.9.1
    • None

      After JChannel.close() is called and exit the JVM, the membership continue to linger in the channel. The View from the viewAccepted() if flooded with the Addresses from the x-members.

      Steps to reproduce using the enclose program.
      1. copy the enclose program to a directory and name it the JGroupsMember.java.
      2. copy commons-logging.jar, jgroups-all.jar, concurrent.jar and log4j-1.2.9.jar to this directory.
      3. copy the JGroups example conf/total-token.xml to this directory.
      4. create a log4j.xml in this directory.
      5. to compile (on NT)
      javac -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember.java
      6. open a DOS window and start a server instance
      java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember server total-token.xml server 1
      7. open another DOS window and start a client instance
      java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember client total-token.xml client 1
      Watch that the viewAccepted() views 2 members (this is OK)
      8. let step 7 to complete, then repeat it the second time from the same DOS window. The viewAccepted() views 2 members (it is still OK)
      9. let step 8 to complete, then repeat it the third time from the same DOS window. The viewAccepted() now views 3 members! (there are only 2 actually.)
      10. watch the DOS window where the server instance is run. You would also observe CoordGmsImpl.handleLeave(): - mbr [<IP>:<Port>] is not a member !
      11. to stop the server instance, goto the DOS window where the client instance was run, and start a client instance
      java -classpath .;commons-logging.jar;jgroups-all.jar;concurrent.jar;log4j-1.2.9.jar JGroupsMember client total-token.xml stop 1

      Observation:
      Steps 9 and 10 exposed lingering members. If continue to repeate step 9, the number of lingering members increases each time.

      Source Code...

      import java.io.IOException;
      import java.io.File;
      import java.io.FileInputStream;
      import java.io.FileNotFoundException;
      import java.util.Vector;
      import java.util.Iterator;

      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;

      import org.jgroups.Address;
      import org.jgroups.Channel;
      import org.jgroups.JChannel;
      import org.jgroups.JChannelFactory;
      import org.jgroups.MembershipListener;
      import org.jgroups.Message;
      import org.jgroups.MessageListener;
      import org.jgroups.View;
      import org.jgroups.ChannelException;
      import org.jgroups.ChannelClosedException;
      import org.jgroups.ChannelNotConnectedException;
      import org.jgroups.TimeoutException;
      import org.jgroups.SuspectedException;

      import org.jgroups.conf.XmlConfigurator;

      import org.jgroups.blocks.GroupRequest;
      import org.jgroups.blocks.MessageDispatcher;
      import org.jgroups.blocks.RequestHandler;

      import org.jgroups.util.Util;

      /**

      • Usage: JGroupsMember <role> <protocolConfigXmlFileName> <message> <numberOfRepeats>
      • role = client or server. A client will connect and close, and a server will connect and
      • stay on the channel forever.
      • protocolConfigXmlFileName = the path name to the JGroups protocol stack configuration
      • XML file.
      • message = any message, use quote if blank spaces are embeded. A special message "stop"
      • from a client role would shutdown the server.
      • numberOfRepeats = number of time to repeat the sending of the message. Applicable only
      • if role is client.
        */
        public class JGroupsMember implements RequestHandler, MembershipListener, MessageListener
        {
        private Log logger;
        private String role;
        private String protocolConfigXmlFile;
        private String messageContents = null;
        private int numOfRepeats = 1;
        private Channel channel = null;
        private String groupName = "win2kschouProto";
        private MessageDispatcher disp = null;
        private long jtimeout = 30000; // miliseconds
        private long startTime;
        private Address serverAddress = null;
        private Object serverAddressMutex = new Object();
        private Object serverEnds = new Object();

      public JGroupsMember(String role, String protocolConfigXmlFile, String message,
      int numOfRepeats)
      {
      this.role = role;
      this.protocolConfigXmlFile = protocolConfigXmlFile;
      this.messageContents = message;
      this.numOfRepeats = (numOfRepeats > 0 ? numOfRepeats : 1);
      this.logger = LogFactory.getLog(getClass());
      Runtime.getRuntime().addShutdownHook(new Thread("Shutdown")
      {
      public void run()
      {
      if (null != channel)
      {
      logger.info("["getLocalAddress()
      "] shutdown and close channel...");
      channel.close();
      }
      else
      logger.info("["getLocalAddress()
      "] shutdown");
      }
      });

      }

      public boolean init()
      {
      boolean initComplete = false;
      String addr = null;

      try
      {
      // create a channel
      logger.debug("Creating a channel...");
      JChannelFactory factory = new JChannelFactory(new File(protocolConfigXmlFile));
      channel = factory.createChannel();
      }
      catch (NullPointerException nx)
      {
      logger.error("Missing required JGroups protocol stack configuration XML file.");
      return initComplete = false;
      }
      catch (ChannelException cx)
      {
      logger.error("Failed to init JGroups protocal stack when creating Channel."+
      " Error: ", cx);
      return initComplete = false;
      }

      try
      {
      logger.debug("Creating a MessageDispatcher using the channel...");
      // MessageDispatcher(Channel, MessageListener, MembershipListener, RequestHandler)
      disp = new MessageDispatcher(channel, this, this, this);

      try
      {
      // connect the channel to the group
      logger.debug("Connecting the channel to the group \""groupName"\"");
      channel.connect(groupName);
      }
      catch (ChannelClosedException csx)
      {
      logger.error("Attempt to connect using a closed channel.", csx);
      return initComplete = false;
      }
      catch (ChannelException cex)
      {
      logger.error("Failed to start JGroups protocal stack. Error: ", cex);
      return initComplete = false;
      }

      if (!channel.isConnected())
      {
      logger.error("Failed to connect to the group.");
      return initComplete = false;
      }

      addr = getLocalAddress();
      logger.debug("The channel is connected using the address "+addr);

      try
      {
      // request for the state of the group, i.e. the server address
      while (null == serverAddress)
      {
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] Requesting for group State from the"+
      " coordinator using timeout "jtimeout"ms...");
      startTime = System.currentTimeMillis();
      if (!channel.getState(null, jtimeout))
      {
      if ((System.currentTimeMillis() - startTime) < jtimeout)
      {
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] unable to get group state, may be"+
      " because this is the first member joining the"+
      " group.");
      }
      else
      {
      if (logger.isWarnEnabled())
      logger.warn("["+addr+"] unable to get group state after "+
      jtimeout+"ms timeout.");
      }
      }
      if (role.equals("server"))
      break;
      }
      }
      catch (ChannelNotConnectedException cnex)
      {
      if (logger.isErrorEnabled())
      logger.error("["+addr+"] failed to get group state,"+
      " ChannelNotConnectedException: "+cnex.getMessage());
      return initComplete = false;
      }
      catch (ChannelClosedException ccex)
      {
      if (logger.isErrorEnabled())
      logger.error("["+addr+"] failed to get group state, the channel is"+
      " already closed and cannot be reused -"+
      " ChannelClosedException: "+ ccex.getMessage());
      return initComplete = false;
      }

      Message msg = new Message(null, null, role);
      logger.debug("["+addr+"] casting my role ("role") to the group...");
      disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // timeout 0 has no effect
      // because of GET_NONE
      return initComplete = true;
      }
      finally
      {
      if (null != channel && !initComplete)
      {
      channel.close();
      channel = null;
      }
      }
      }

      public void run()
      {
      String addr = getLocalAddress();

      if (role.equals("server"))
      runServer();
      else
      runClient(messageContents, numOfRepeats);

      if (null != channel)
      {
      logger.info("["+addr+"] closing channel...");
      channel.close();
      channel = null;
      logger.info("["+addr+"] exits___ __ _ .. .");
      }
      }

      public void runServer()
      {
      String addr = getLocalAddress();

      synchronized (serverEnds)
      {
      try
      {
      logger.debug("["+addr+"] server main thread goes into wait...");
      serverEnds.wait(0);
      }
      catch(InterruptedException ix)
      {
      logger.info("["+addr+"] server main thread is waken up.");
      }
      }
      logger.info("["+addr+"] runServer() stops.");
      }

      public void runClient(String clientMessage, int repeats)
      {
      String addr = getLocalAddress();
      for (int i = 0; i < repeats; i++)
      {
      Message msg = new Message(serverAddress, null, clientMessage);
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] sending Message \""clientMessage"\" to server at"+
      serverAddress);
      try
      {
      Object reply = disp.sendMessage(msg, GroupRequest.GET_FIRST, 3000);
      if (null != reply)
      {
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] received reply \""reply.toString()"\"");
      }
      else
      logger.debug("["+addr+"] received null reply");
      }
      catch(TimeoutException tex)
      {
      logger.error("["+addr+"] timeout exception when sending Message \""+
      clientMessage+"\"; Exception: "+tex.getMessage());
      break;
      }
      catch(SuspectedException sex)
      {
      logger.error("["+addr+"] suspected exception when sending Message \""+
      clientMessage+"\"; Exception: "+sex.getMessage());
      break;
      }
      }
      }

      /**

      • A MembershipListener callback. It is called by the JGroups to notify this member to
      • stop sending messages to the group.
        */
        public void block()
        {
        if (logger.isDebugEnabled())
        logger.debug("["+getLocalAddress()+"] MembershipListener.block() is called"+
        " notifying this member to stop sending messages...");
        }

      /**

      • A MembershipListener callback. It is called when a member is suspected being crashed,
      • but has not yet been excluded from the group.
      • @param suspectedMbr is the Address of the suspected crashed member.
        */
        public void suspect(Address suspectedMbr)
        {
        if (logger.isDebugEnabled())
        logger.debug("["+getLocalAddress()+"] MembershipListener.suspect(Address="+
        suspectedMbr+") is called notifying a suspected crashed member...");
        }

      /**

      • A MembershipListener callback. It is called when a change in group membership has
      • occurred, either new member joins, existing member leaves or has been suspected crashed.
      • @param mbrView is a View containing Address of all members in the group.
        */
        public void viewAccepted(View mbrView)
        {
        String addr = getLocalAddress();
        logger.info("["+addr+"] MembershipListener.viewAccepted(View) is called...");

      if (null == mbrView)
      {
      logger.warn("["+addr+"] a null View is received.");
      return;
      }

      Vector mbrs = mbrView.getMembers();
      if (logger.isDebugEnabled())
      {
      logger.debug("["+addr+"] View has " + mbrs.size() + " members.");
      Address mbr = null;
      for (Iterator it = mbrs.iterator(); it.hasNext(); )
      {
      mbr = (Address)it.next();
      logger.debug("["+addr+"] View a member address: " + mbr);
      }
      }
      }

      /**

      • A MessageListener callback. This method is called when another member invokes its
      • Channel.getState() requesting for group state, and this member is chosen to respond,
      • which would indicate that this member is a group coordinator.
      • @return It returns a byte[] containing a serialized Address of the server known
      • by this member. It returns null when failed to serialize the Address.
        */
        public byte[] getState()
        {
        String addr = getLocalAddress();
        logger.info("["+addr+"] is responding to a Channel.getState() inquiry from another"+
        " member...");

      try
      {
      byte[] result = null;
      logger.debug("["+addr+"] waiting to lock serverAddress to read...");
      synchronized (serverAddressMutex)
      {
      result = Util.objectToByteBuffer(serverAddress);
      logger.debug("["+addr+"] has read serverAddress;"+
      " releasing serverAddress lock...");
      }
      logger.info("["+addr+"] replying Channel.getState() inquiry...");
      return result;
      }
      catch (Exception ex)
      {
      logger.error("["+addr+"] failed to serialize reply to the Channel.getState()"+
      " inquiry; Exception: ", ex);
      logger.error("["+addr+"] replying null to Channel.getState() inquiry...");
      return null;
      }
      }

      /**

      • A MessageListener callback. This method is called to receive a group state after this
      • member has invoked a Channel.getState() call. The state received is a serialized
      • Address of server known by the current group coordinator.
      • @param state is a byte[] containing an Address of server known by the current group
      • coordinator. It could be null if the group coordinator failed to
      • serialize the server address, or the coordinator does not have the
      • server address when responding to this member's inquiry.
        */
        public void setState(byte[] state)
        {
        String addr = getLocalAddress();
        logger.info("["+addr+"] is receiving a group state...");

      try
      {
      if (null != state)
      {
      logger.debug("["+addr+"] waiting to lock serverAddress to write...");
      synchronized (serverAddressMutex)
      {
      serverAddress = (Address)Util.objectFromByteBuffer(state);
      logger.debug("["+addr+"] has written to serverAddress;"+
      " releasing serverAddress lock...");
      }
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] Got a state from a group coordinator that "+
      "the server address is "+serverAddress);
      }
      else
      logger.error("["+addr+"] Got null state.");
      }
      catch (Exception ex)
      {
      logger.error("["+addr+"] failed to de-serialize the state; Exception: ", ex);
      }
      }

      /**

      • A MessageListener callback. It is called to receive a Message sent from another member.
      • @param message the Message received.
        */
        public void receive(Message message)
        {
        String addr = getLocalAddress();
        if (logger.isDebugEnabled())
        logger.debug("["+addr+"] is receiving a Message via MessageListener...");

      if (null == message)
      {
      logger.error("["+addr+"] got a null Message.");
      return;
      }

      Address src = message.getSrc();
      if (message.getObject() instanceof String)
      {
      String contents = (String)message.getObject();
      if (logger.isDebugEnabled())
      {
      logger.debug("["+addr+"] got a Message \""contents"\" from member at "+
      src.toString());
      }
      }
      else
      {
      if (logger.isDebugEnabled())
      {
      logger.debug("["+addr+"] got a Message from member at "+src.toString());
      }
      }
      }

      /**

      • A RequestHandler callback. It is called to receive a Message multi-casted by another
      • member in the group, e.g. by a MessageDispatcher.castMessage(Message).
      • @param message the Message received.
      • @return If it is a server, it returns a String "ACK". If it is a client,
      • it returns null.
        */
        public Object handle(Message message)
        {
        String addr = getLocalAddress();
        if (logger.isDebugEnabled())
        logger.debug("["+addr+"] is receiving a Message via RequestHandler...");

      if (null == message)
      {
      logger.error("["+addr+"] got a null Message.");
      return null;
      }

      Address src = message.getSrc();
      if (message.getObject() instanceof String)
      {
      String contents = (String)message.getObject();
      if (logger.isDebugEnabled())
      {
      logger.debug("["+addr+"] got a Message \""contents"\" from member at "+
      src.toString());
      }
      if (contents.equals("server"))
      {
      logger.debug("["+addr+"] waiting to lock serverAddress to write...");
      synchronized (serverAddressMutex)
      {
      serverAddress = src;
      if (logger.isDebugEnabled())
      logger.debug("["+addr+"] has written to serverAddress = "src
      "; releasing serverAddress lock..");
      }
      }
      else if (contents.equals("stop"))
      {
      logger.debug("["+addr+"] received a \"stop\" message;"+
      " waiting to lock serverEnds...");
      synchronized (serverEnds)
      {
      logger.debug("["+addr+"] waking up server main thread...");
      serverEnds.notifyAll();
      }
      }
      }
      else
      {
      if (logger.isDebugEnabled())
      {
      logger.debug("["+addr+"] got a Message from member at "+src.toString());
      }
      }

      if (role.equals("server"))
      {
      logger.debug("["+addr+"] replying \"ACK\"...");
      return new String("ACK");
      }
      else
      {
      logger.debug("["+addr+"] replying null...");
      return null;
      }
      }

      /*

      • Returns a String of the Address this member is connected to the JGroups. It returns
      • null if the channel is not connected.
        */
        private String getLocalAddress()
        {
        if (null == channel)
        return null;
        else
        {
        Address addr = channel.getLocalAddress();
        if (null == addr) // the channel is in closed state
        return null;
        else
        return addr.toString();
        }
        }

      public static void main(String args[])
      {
      String role, propFile, message;
      int numOfRepeats;
      JGroupsMember member = null;

      if (4 != args.length)
      {
      System.out.println(
      "Usage: JGroupsMember <role> <configXmlFile> <message> <repeats>");
      System.out.println(" role = client or server.");
      System.out.println(" configXmlFile = the JGroups protocol stack config file.");
      System.out.println(" For example: config/total-token.xml");
      System.out.println(" message = the message to multi-cast to the group.");
      System.out.println(" A message \"stop\" from a client will stop");
      System.out.println(" all servers.");
      System.out.println(" repeats = if the role is \"client\", then how many");
      System.out.println(" times to repeat the sending of the message.");
      return;
      }
      else
      {
      role = args[0];
      propFile = args[1];
      message = args[2];
      numOfRepeats = Integer.parseInt(args[3]);
      }

      member = new JGroupsMember(role, propFile, message, numOfRepeats);
      member.init();
      member.run();
      }
      }

              rhn-engineering-bban Bela Ban
              simon_chou Simon Chou (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

                Created:
                Updated:
                Resolved: