### Eclipse Workspace Patch 1.0
#P JGroups-2.9.P2
Index: src/org/jgroups/stack/GossipRouter.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/stack/GossipRouter.java,v
retrieving revision 1.68.4.2
diff -u -r1.68.4.2 GossipRouter.java
--- src/org/jgroups/stack/GossipRouter.java	9 Mar 2010 17:04:16 -0000	1.68.4.2
+++ src/org/jgroups/stack/GossipRouter.java	15 Mar 2010 21:32:08 -0000
@@ -279,6 +279,9 @@
             for(ConnectionHandler ce: map.values())
                 ce.close();
         }
+        if(log.isTraceEnabled())
+            log.trace("stop()-> clearing the routing table.");
+
         routingTable.clear();
 
         if(log.isInfoEnabled())
@@ -383,6 +386,8 @@
                 if(sock_read_timeout > 0)
                     sock.setSoTimeout((int)sock_read_timeout);
 
+                log.info("mainLoop()-> Got socket connection[" + sock +"]");
+
                 ConnectionHandler ch=new ConnectionHandler(sock);
                 getDefaultThreadPoolThreadFactory().newThread(ch).start();
             }
@@ -439,17 +444,31 @@
             map=routingTable.get(group);
-            if(map != null && map.remove(addr) != null && map.isEmpty())
-                routingTable.remove(group);
+            if(map != null && map.remove(addr) != null && map.isEmpty()){
+                routingTable.remove(group);
+                if(log.isTraceEnabled())
+                    log.trace("removeEntry()-> removed group[" + group +"] from routing table");
+            }
         }
         else {
             for(Map.Entry<String, ConcurrentMap<Address, ConnectionHandler>> entry: routingTable.entrySet()) {
                 map=entry.getValue();
-                if(map != null && map.remove(addr) != null && map.isEmpty())
-                    routingTable.remove(entry.getKey());
+                if(map != null && map.remove(addr) != null && map.isEmpty()){
+                    routingTable.remove(entry.getKey());
+                    if(log.isTraceEnabled())
+                        log.trace("removeEntry()-> removed member[" + addr +"] from routing table");
+                }
             }
         }
 
-        address_mappings.remove(addr);
-
+        if(address_mappings.remove(addr) != null){
+            if(log.isTraceEnabled()){
+                log.trace("removeEntry()-> removed member[" + addr +"] from address_mappings, \nrouting table=\n" + dumpRoutingTable());
+            }else{
+                log.info("removeEntry() -> removed member[" + addr +"] from address_mappings");
+            }
+        }else{
+            log.info("removeEntry()-> No member["+ addr +"] exists in address_mappings");
+        }
+
         if(addr instanceof UUID)
             UUID.remove((UUID)addr);
     }
@@ -533,6 +552,9 @@
                         suspect.writeTo(stream);
                         Util.writeAddress(a, stream);
                         stream.flush();
+
+                        log.info("connectionTorn()-> sent suspect message to member[" + entry.sock.getInetAddress() +"], suspected member[" + a +"]");
+
                     }
                 } catch (Exception ioe) {
                     // intentionally ignored
@@ -573,13 +595,24 @@
             this.sock=sock;
             this.input=new DataInputStream(sock.getInputStream());
             this.output=new DataOutputStream(sock.getOutputStream());
+            log.info("ConnectionHandler()-> new connection opened[" + sock.getInetAddress() +":" + sock.getPort() + "(" + sock.getLocalPort() +")]");
         }
 
         void close() {
+
+            if(sock != null){
+                log.info("close()-> Closing socket[" + sock.getInetAddress() +"]");
+            }
+
             if(active.compareAndSet(true, false)) {
                 Util.close(input);
                 Util.close(output);
                 Util.close(sock);
+                if(sock == null || sock.isClosed()){
+                    log.info("close()-> socket closed");
+                }else{
+                    log.warn("close()-> socket still open!!");
+                }
                 for(Address addr: logical_addrs) {
                     removeEntry(null, addr);
                 }
@@ -589,9 +622,11 @@
         public void run() {
             if(active.compareAndSet(false, true)) {
                 try {
+                    log.info("run()-> spawning the read loop thread on socket[" + sock.getInetAddress() +":" + sock.getPort() + "(" + sock.getLocalPort() +")]");
                     readLoop();
                 }
                 finally {
+                    log.info("run()-> exited the read loop thread");
                     close();
                 }
             }
@@ -618,6 +653,7 @@
                     switch(command) {
 
                         case GossipRouter.CONNECT:
+                            log.info("handleConnect()-> CONNECT call ...");
                             handleConnect(request, addr, group);
                             break;
 
@@ -657,17 +693,33 @@
                             for(PingData data: mbrs)
                                 data.writeTo(output);
                             output.flush();
+
+                            if(log.isTraceEnabled())
+                                log.trace("readLoop()-> GOSSIP_GET \nrouting table=\n" + dumpRoutingTable());
+
                             break;
 
                         case GossipRouter.DISCONNECT:
                             removeEntry(request.getGroup(), request.getAddress());
+
+                            if(log.isTraceEnabled()){
+                                log.trace("readLoop()-> DISCONNECT - member[" + request.getAddress() +"], \nrouting table=\n" + dumpRoutingTable());
+                            }else{
+                                log.info("readLoop()-> DISCONNECT - member[" + request.getAddress() +"]");
+                            }
+
                             break;
 
                         case GossipRouter.CLOSE:
-                            close();
+
+                            log.info("readLoop()-> CLOSE call.");
+
+                            close();
+
                             break;
 
                         case -1: // EOF
+                            log.info("readLoop()-> -1 EOF");
                             notifyAbnormalConnectionTear(this, new EOFException("Connection broken"));
                             break;
                     }
@@ -675,6 +727,7 @@
                 catch(SocketTimeoutException ste) {
                 }
                 catch(IOException ioex) {
+                    log.warn("readLoop()-> IOException ->",ioex);
                     notifyAbnormalConnectionTear(this, ioex);
                     break;
                 }
@@ -690,7 +743,11 @@
 
         private void handleConnect(GossipData request, Address addr, String group) throws Exception {
             ConcurrentMap map = null;
-            try {
+            try {
+
+                //check if there exists an old connection for this address
+                checkExistingConnection(addr,group);
+
                 String logical_name = request.getLogicalName();
                 if (logical_name != null && addr instanceof org.jgroups.util.UUID)
                     org.jgroups.util.UUID.add((org.jgroups.util.UUID) addr, logical_name);
@@ -706,6 +763,12 @@
                 }
                 map.put(addr, this);
 
+                if(log.isTraceEnabled()){
+                    log.trace("handleConnect()-> CONNECT - member[" + addr +"] added to the routing table, \nrouting table=\n" + dumpRoutingTable());
+                }else{
+                    log.info("handleConnect()-> CONNECT - member[" + addr +"] added to the routing table");
+                }
+
                 Set physical_addrs;
                 if (request.getPhysicalAddresses() != null) {
                     physical_addrs = address_mappings.get(addr);
@@ -714,6 +777,10 @@
                         address_mappings.put(addr, physical_addrs);
                     }
                     physical_addrs.addAll(request.getPhysicalAddresses());
+
+                    if(log.isTraceEnabled())
+                        log.trace("handleConnect()-> CONNECT - member[" + addr +"] added to the address_mappings /w physical addr[" + request.getPhysicalAddresses() +"]");
+
                 }
                 output.writeByte(CONNECT_OK);
                 output.flush();
@@ -725,10 +792,70 @@
                 } catch (IOException e1) {
                     //ignored
                 }
+                log.error("handleConnect()-> error: ", e);
                 throw new Exception("Unsuccessful connection setup handshake");
             }
         }
 
+        /**
+         * Checks whether a connection for this address already exists and, if so, closes it.
+         * Closing the old handler removes the entries of its logical addresses via removeEntry().
+         *
+         * The lookup only runs when addr is present in address_mappings. With a non-null group
+         * only that group is consulted; otherwise all groups are searched and the first handler
+         * found is used. The handler that is processing the current CONNECT is never closed.
+         *
+         * @param addr address of the connecting member
+         * @param group group to look in, or null to search every group
+         * @return true if an old connection was found and closed, false otherwise
+         */
+        private boolean checkExistingConnection(Address addr, String group){
+
+            boolean isOldExists = false;
+
+            try {
+                if(address_mappings.containsKey(addr)){
+                    ConcurrentMap<Address, ConnectionHandler> map = null;
+                    ConnectionHandler oldConnectionH = null;
+                    if(group != null) {
+                        map=routingTable.get(group);
+                        if(map != null){
+                            oldConnectionH = map.get(addr);
+                        }
+                    }
+                    else {
+                        for(Map.Entry<String, ConcurrentMap<Address, ConnectionHandler>> entry: routingTable.entrySet()) {
+                            map=entry.getValue();
+                            if(map != null){
+                                oldConnectionH = map.get(addr);
+                            }
+                            // stop at the first match so a later group without addr cannot reset it to null
+                            if(oldConnectionH != null){
+                                break;
+                            }
+                        }
+                    }
+
+                    // never close the handler that is serving this CONNECT
+                    if(oldConnectionH != null && oldConnectionH != this){
+                        isOldExists = true;
+                        log.info("checkExistingConnection()-> Found old connection[" + oldConnectionH +"] for addr[" + addr +"]. Closing old connection ...");
+                        oldConnectionH.close();
+                    }else {
+                        log.info("checkExistingConnection()-> No old connection for addr[" + addr +"] exists");
+                    }
+                }else{
+                    log.info("checkExistingConnection()-> No old connection for addr[" + addr +"] exists in the address_mappings");
+                }
+
+            }catch(Exception ex){
+                log.error("checkExistingConnection()-> Error closing existing connection", ex);
+            }
+
+            return isOldExists;
+
+        }
+
         public String toString() {
             StringBuilder sb=new StringBuilder();
             sb.append("peer: " + sock.getInetAddress());