### Eclipse Workspace Patch 1.0 #P JGroups Index: src/org/jgroups/blocks/mux/MuxRpcDispatcher.java =================================================================== RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/blocks/mux/MuxRpcDispatcher.java,v retrieving revision 1.1 diff -u -r1.1 MuxRpcDispatcher.java --- src/org/jgroups/blocks/mux/MuxRpcDispatcher.java 13 Apr 2010 17:57:07 -0000 1.1 +++ src/org/jgroups/blocks/mux/MuxRpcDispatcher.java 6 Jun 2010 16:30:27 -0000 @@ -36,14 +36,24 @@ public class MuxRpcDispatcher extends RpcDispatcher { private final short id; + private final StateTransferFilter stateTransferFilter; public MuxRpcDispatcher(short id) { + this(id, null); + } + + public MuxRpcDispatcher(short id, StateTransferFilter stateTransferFilter) { super(); this.id = id; + this.stateTransferFilter = stateTransferFilter; } - public MuxRpcDispatcher(short id, Channel channel, MessageListener messageListener, MembershipListener membershipListener, Object serverObject) { - this(id); + public MuxRpcDispatcher(short id, Channel channel, MessageListener messageListener, MembershipListener membershipListener, RequestHandler handler) { + this(id, channel, messageListener, membershipListener, handler, null); + } + + public MuxRpcDispatcher(short id, Channel channel, MessageListener messageListener, MembershipListener membershipListener, Object serverObject, StateTransferFilter stateTransferFilter) { + this(id, stateTransferFilter); setMessageListener(messageListener); setMembershipListener(membershipListener); @@ -71,7 +81,8 @@ super.start(); Muxer muxer = this.getMuxer(); if (muxer != null) { - muxer.add(id, this.getProtocolAdapter()); + UpHandler uh = this.stateTransferFilter == null ? this.getProtocolAdapter() : new StateTransferFilterAdapter(this.getProtocolAdapter(), this.stateTransferFilter); + muxer.add(id, uh); } } Index: src/org/jgroups/blocks/mux/MuxMessageDispatcher.java =================================================================== RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/blocks/mux/MuxMessageDispatcher.java,v retrieving revision 1.1 diff -u -r1.1 MuxMessageDispatcher.java --- src/org/jgroups/blocks/mux/MuxMessageDispatcher.java 13 Apr 2010 17:57:07 -0000 1.1 +++ src/org/jgroups/blocks/mux/MuxMessageDispatcher.java 6 Jun 2010 16:30:27 -0000 @@ -35,14 +35,23 @@ public class MuxMessageDispatcher extends MessageDispatcher { private final short id; + private final StateTransferFilter stateTransferFilter; public MuxMessageDispatcher(short id) { + this(id, null); + } + + public MuxMessageDispatcher(short id, StateTransferFilter stateTransferFilter) { this.id = id; + this.stateTransferFilter = stateTransferFilter; } public MuxMessageDispatcher(short id, Channel channel, MessageListener messageListener, MembershipListener membershipListener, RequestHandler handler) { - this(id); - + this(id, channel, messageListener, membershipListener, handler, null); + } + + public MuxMessageDispatcher(short id, Channel channel, MessageListener messageListener, MembershipListener membershipListener, RequestHandler handler, StateTransferFilter stateTransferFilter) { + this(id, stateTransferFilter); setMessageListener(messageListener); setMembershipListener(membershipListener); setChannel(channel); @@ -68,7 +77,8 @@ super.start(); Muxer muxer = this.getMuxer(); if (muxer != null) { - muxer.add(id, this.getProtocolAdapter()); + UpHandler uh = this.stateTransferFilter == null ? this.getProtocolAdapter() : new StateTransferFilterAdapter(this.getProtocolAdapter(), this.stateTransferFilter); + muxer.add(id, uh); } } Index: src/org/jgroups/blocks/mux/MuxUpHandler.java =================================================================== RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/blocks/mux/MuxUpHandler.java,v retrieving revision 1.2 diff -u -r1.2 MuxUpHandler.java --- src/org/jgroups/blocks/mux/MuxUpHandler.java 15 Apr 2010 20:05:22 -0000 1.2 +++ src/org/jgroups/blocks/mux/MuxUpHandler.java 6 Jun 2010 16:30:27 -0000 @@ -6,6 +6,9 @@ import org.jgroups.Event; import org.jgroups.Message; import org.jgroups.UpHandler; +import org.jgroups.logging.Log; +import org.jgroups.logging.LogFactory; +import org.jgroups.stack.StateTransferInfo; /** * Allows up handler multiplexing. @@ -16,8 +19,11 @@ */ public class MuxUpHandler implements UpHandler, Muxer { + protected final Log log=LogFactory.getLog(getClass()); private final Map handlers = new ConcurrentHashMap(); private final UpHandler defaultHandler; + private volatile Event lastFlushEvent; + private final Object flushMutex = new Object(); /** * Creates a multiplexing up handler, with no default handler. @@ -40,7 +46,14 @@ */ @Override public void add(short id, UpHandler handler) { - handlers.put(id, handler); + synchronized (flushMutex) + { + if (lastFlushEvent != null) + { + handler.up(lastFlushEvent); + } + handlers.put(id, handler); + } } /** @@ -69,14 +82,76 @@ } break; } - case Event.VIEW_CHANGE: { - for (UpHandler handler: handlers.values()) { - handler.up(evt); + case Event.GET_APPLSTATE: + case Event.GET_STATE_OK: + case Event.STATE_TRANSFER_OUTPUTSTREAM: + case Event.STATE_TRANSFER_INPUTSTREAM: { + StateTransferInfo info=(StateTransferInfo)evt.getArg(); + String state_id=info.state_id; + UpHandler basicHandler = null; + boolean multipleBasic = false; + for (UpHandler uh: handlers.values()) + { + if (uh instanceof StateTransferFilter) + { + if (((StateTransferFilter) uh).accepts(state_id)) + { + return (uh.up(evt)); + } + } + else if (basicHandler == null) + { + basicHandler = uh; + } + else + { + multipleBasic = true; + } } + + if (basicHandler != null) + { + if (multipleBasic) + { + // TODO throw exception?? + log.warn("Received state transfer related event with more " + + "than one basic UpHandler registered. Arbitrarily " + + "picking a handler to handle request"); + } + + return basicHandler.up(evt); + } + // else let default handler handle it below break; + } + case Event.BLOCK: + case Event.UNBLOCK: { + synchronized (flushMutex) + { + this.lastFlushEvent = evt; + passToAllHandlers(evt); + break; + } } + case Event.VIEW_CHANGE: + case Event.SET_LOCAL_ADDRESS: + case Event.SUSPECT: { + passToAllHandlers(evt); + break; + } + default: { + passToAllHandlers(evt); + break; + } } return (defaultHandler != null) ? defaultHandler.up(evt) : null; } + + private void passToAllHandlers(Event evt) + { + for (UpHandler handler: handlers.values()) { + handler.up(evt); + } + } } Index: src/org/jgroups/blocks/mux/StateTransferFilter.java =================================================================== RCS file: src/org/jgroups/blocks/mux/StateTransferFilter.java diff -N src/org/jgroups/blocks/mux/StateTransferFilter.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/mux/StateTransferFilter.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,50 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2010, Red Hat, Inc. and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.jgroups.blocks.mux; + + +/** + * Allows an component that integrates with MuxUpHandler to accept or reject + * partial state transfer events. If multiple StateTransferFilters are associated + * with the same MuxUpHandler, but they are mutually exclusive in terms of which + * state_id's they will accept, then the MuxUpHandler can direct + * the state transfer event to the appropriate handler. + * + * @author Brian Stansberry + * + * @version $Id: MuxRpcDispatcher.java,v 1.1 2010/04/13 17:57:07 ferraro Exp $ + */ +public interface StateTransferFilter { + + /** + * Returns true if a state transfer event associated with a + * given state_id should be passed to this component's UpHandler. + * + * @param state_id the id of the partial state transfer + * + * @return true if a state transfer event associated with a + * given state_id should be passed to this component's UpHandler + */ + boolean accepts(String state_id); + +} Index: src/org/jgroups/blocks/mux/StateTransferFilterAdapter.java =================================================================== RCS file: src/org/jgroups/blocks/mux/StateTransferFilterAdapter.java diff -N src/org/jgroups/blocks/mux/StateTransferFilterAdapter.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/mux/StateTransferFilterAdapter.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,76 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2010, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jgroups.blocks.mux; + +import org.jgroups.Event; +import org.jgroups.UpHandler; + +/** + * Supports the {@link UpHandler} interface and the {@link StateTransferFilter} + * interface by delegating to two independent objects each of which implements + * one of the interfaces. + * + * @author Brian Stansberry + * @version $Revision$ + */ +public class StateTransferFilterAdapter implements UpHandler, StateTransferFilter +{ + private final UpHandler upHandlerDelegate; + private final StateTransferFilter filterDelegate; + + public StateTransferFilterAdapter(UpHandler upHandlerDelegate, StateTransferFilter filterDelegate) { + + if (upHandlerDelegate == null) { + throw new IllegalArgumentException("upHandlerDelegate is null"); + } + if (filterDelegate == null) { + throw new IllegalArgumentException("filterDelegate is null"); + } + + this.upHandlerDelegate = upHandlerDelegate; + this.filterDelegate = filterDelegate; + } + + + /** + * Invokes on the UpHandler delegate. + * + * @see org.jgroups.UpHandler#up(org.jgroups.Event) + */ + @Override + public Object up(Event evt) + { + return upHandlerDelegate.up(evt); + } + + /** + * Invokes on the StateTransferFilter delegate. + * + * @see org.jgroups.blocks.mux.StateTransferFilter#accepts(java.lang.String) + */ + @Override + public boolean accepts(String stateId) + { + return filterDelegate.accepts(stateId); + } + +}