Index: .gitignore =================================================================== --- .gitignore (revision 1989) +++ .gitignore (working copy) @@ -55,6 +55,7 @@ /deploy/jbossas/modeshape-jbossas-service/target/ /deploy/jbossas/modeshape-jbossas-web-rest-war/target/ +/extensions/modeshape-clustering/target /extensions/modeshape-search-lucene/target /extensions/modeshape-classloader-maven/target /extensions/modeshape-common-jdbc/target Index: extensions/modeshape-clustering/.classpath new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/.classpath (working copy) @@ -0,0 +1,10 @@ + + + + + + + + + + Index: extensions/modeshape-clustering/.project new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/.project (working copy) @@ -0,0 +1,23 @@ + + + modeshape-clustering + + + + + + org.maven.ide.eclipse.maven2Builder + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + org.maven.ide.eclipse.maven2Nature + + Index: extensions/modeshape-clustering/pom.xml new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/pom.xml (working copy) @@ -0,0 +1,131 @@ + + + 4.0.0 + + org.modeshape + modeshape + 2.1-SNAPSHOT + ../.. + + + modeshape-clustering + jar + ModeShape Clustering Component + ModeShape clustering mechanism. + http://www.modeshape.org + + + + + org.modeshape + modeshape-graph + + + + jgroups + jgroups + 2.9.0.GA + + + + junit + junit + + + org.mockito + mockito-all + + + org.modeshape + modeshape-common + ${project.version} + test-jar + test + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + net.jcip + jcip-annotations + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + maven-assembly-plugin + + + ${project.basedir}/../../src/main/assembly/connector-with-dependencies.xml + + ${project.artifactId}-${project.version} + + + + package + + single + + + + + + + maven-jar-plugin + + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + + + org.apache.felix + maven-bundle-plugin + + + bundle-manifest + process-classes + + manifest + + + + + + + Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java (working copy) @@ -0,0 +1,515 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.ChannelClosedException; +import org.jgroups.ChannelException; +import org.jgroups.ChannelListener; +import org.jgroups.ChannelNotConnectedException; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.View; +import org.jgroups.conf.ProtocolStackConfigurator; +import org.jgroups.conf.XmlConfigurator; +import org.jgroups.util.Util; +import org.modeshape.common.SystemFailureException; +import org.modeshape.common.util.CheckArg; +import org.modeshape.common.util.Logger; +import org.modeshape.graph.observe.ChangeObservers; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.ObservationBus; +import org.modeshape.graph.observe.Observer; + +/** + * An implementation of a cluster-aware {@link ObservationBus}. + */ +public class ClusteredObservationBus implements ObservationBus { + + protected static final Logger LOGGER = Logger.getLogger(ClusteredObservationBus.class); + + /** + * The list of {@link Observer} instances that should receive events from the bus. These Observer objects are all local. + */ + protected final ChangeObservers observers = new ChangeObservers(); + + /** + * The listener for channel changes. + */ + private final Listener listener = new Listener(); + + /** + * The component that will receive the JGroups messages and broadcast them to this bus' observers. + */ + private final Receiver receiver = new Receiver(); + + /** + * Flag that dictates whether this bus has connected to the cluster. + */ + protected final AtomicBoolean isOpen = new AtomicBoolean(false); + + /** + * Flag that dictates whether there are multiple participants in the cluster; if not, then the changes are propogated only to + * the local observers. + */ + protected final AtomicBoolean multipleAddressesInCluster = new AtomicBoolean(false); + + /** + * The configuration for JGroups + */ + private String configuration; + + /** + * The name of the JGroups cluster. + */ + private String clusterName; + + /** + * The JGroups channel to which all {@link #notify(Changes) change notifications} will be sent and from which all changes will + * be received and sent to the observers. + *

+ * It is important that the order of the {@link Changes} instances are maintained across the cluster, and JGroups will do this + * for us as long as we push all local changes into the channel and receive all local/remote changes from the channel. + *

+ */ + private JChannel channel; + + /** + * Get the configuration for JGroups. This configuration may be a string that refers to a resource on the classpath, the path + * of the local configuration, the URL to the configuration file, or the configuration specified using the newer-style XML + * form or older-style string form. + * + * @return the location of the JGroups configuration, or null if no configuration has been defined + */ + public String getConfiguration() { + return configuration; + } + + /** + * Set the JGroups configuration, which may be a string that refers to a resource on the classpath, the path of the local + * configuration, the URL to the configuration file, or the configuration specified using the newer-style XML form or + * older-style string form. + * + * @param configuration the relative path to a classpath resource, path to a local file, URL to the configuration file, or the + * configuration specified using the newer-style XML form or older-style string form. + * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has + * been {@link #shutdown() shutdown} + */ + public void setConfiguration( String configuration ) { + if (channel != null) { + String name = this.clusterName; + throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name)); + } + this.configuration = configuration; + } + + /** + * Get the name of the JGroups cluster. + * + * @return the cluster name, or null if the cluster name was not yet defined + * @see JChannel#connect(String) + */ + public String getClusterName() { + return clusterName; + } + + /** + * Set the name of the JGroups cluster. This must be called with a non-null cluster name before {@link #start()} is called. + * + * @param clusterName Sets clusterName to the specified value. + * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has + * been {@link #shutdown() shutdown} + * @see JChannel#connect(String) + */ + public void setClusterName( String clusterName ) { + CheckArg.isNotNull(clusterName, "clusterName"); + if (channel != null) { + String name = this.clusterName; + throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name)); + } + this.clusterName = clusterName; + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + @Override + public synchronized void start() { + if (clusterName == null) { + throw new IllegalStateException(ClusteringI18n.clusterNameRequired.text()); + } + if (channel != null) { + // Disconnect from any previous channel ... + channel.removeChannelListener(listener); + channel.setReceiver(null); + } + try { + // Create the new channel by calling the delegate method ... + channel = newChannel(configuration); + assert channel != null; + // Add a listener through which we'll know what's going on within the cluster ... + channel.addChannelListener(listener); + + // Now connect to the cluster ... + channel.connect(clusterName); + + // Set the receiver through which we'll receive all of the changes ... + channel.setReceiver(receiver); + } catch (ChannelException e) { + throw new IllegalStateException(ClusteringI18n.errorWhileStartingJGroups.text(configuration), e); + } + } + + /** + * A method that is used to instantiate the {@link JChannel} object with the supplied configuration. Subclasses can override + * this method to specialize this behavior. + * + * @param configuration the configuration; may be null if the default configuration should be used + * @return the JChannel instance; never null + * @throws ChannelException if there is a problem creating the new channel object + */ + protected JChannel newChannel( String configuration ) throws ChannelException { + if (configuration == null) { + return new JChannel(); + } + // Try the XML configuration first ... + ProtocolStackConfigurator configurator = null; + InputStream stream = new ByteArrayInputStream(configuration.getBytes()); + try { + configurator = XmlConfigurator.getInstance(stream); + } catch (IOException e) { + // ignore, since the configuration may be of another form ... + } finally { + try { + stream.close(); + } catch (IOException e) { + // ignore this + } + } + if (configurator != null) { + return new JChannel(configurator); + } + // Otherwise, just try the regular configuration ... + return new JChannel(configuration); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + if (changes == null) return; // do nothing + if (!isOpen.get()) { + // The channel is not open ... + return; + } + if (!multipleAddressesInCluster.get()) { + // We are in clustered mode, but there is only one participant in the cluster (us). + // So short-circuit the cluster and just notify the local observers ... + if (!observers.isEmpty()) { + observers.broadcast(changes); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + getClusterName(), + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + } + return; + } + + // There are multiple participants in the cluster, so send all changes out to JGroups, + // letting JGroups do the ordering of messages... + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending to cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + byte[] data = serialize(changes); + Message message = new Message(null, null, data); + channel.send(message); + } catch (ChannelClosedException e) { + LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelHasClosed, + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } catch (ChannelNotConnectedException e) { + LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelIsNotConnected, + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } catch (Exception e) { + // Something went wrong here (this should not happen) ... + String msg = ClusteringI18n.errorSerializingChanges.text(clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp(), + changes); + throw new SystemFailureException(msg, e); + } + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean register( Observer observer ) { + return observers.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean unregister( Observer observer ) { + return observers.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + @Override + public boolean hasObservers() { + return !observers.isEmpty(); + } + + /** + * Return whether this bus has been {@link #start() started} and not yet {@link #shutdown() shut down}. + * + * @return true if {@link #start()} has been called but {@link #shutdown()} has not, or false otherwise + */ + public boolean isStarted() { + return channel != null; + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + @Override + public synchronized void shutdown() { + if (channel != null) { + // Mark this as not accepting any more ... + isOpen.set(false); + try { + // Disconnect from the channel and close it ... + channel.removeChannelListener(listener); + channel.setReceiver(null); + channel.close(); + } finally { + channel = null; + // Now that we're not receiving any more messages, shut down the list of observers ... + observers.shutdown(); + } + } + } + + protected static byte[] serialize( Changes changes ) throws Exception { + return Util.objectToByteBuffer(changes); + } + + protected static Changes deserialize( byte[] data ) throws Exception { + return (Changes)Util.objectFromByteBuffer(data); + } + + protected class Receiver implements org.jgroups.Receiver { + private byte[] state; + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#block() + */ + @Override + public void block() { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#receive(org.jgroups.Message) + */ + @Override + public void receive( Message message ) { + if (!observers.isEmpty()) { + // We have at least one observer ... + try { + // Deserialize the changes ... + Changes changes = deserialize(message.getBuffer()); + // and broadcast to all of our observers ... + observers.broadcast(changes); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + getClusterName(), + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + } catch (Exception e) { + // Something went wrong here (this should not happen) ... + String msg = ClusteringI18n.errorDeserializingChanges.text(getClusterName()); + throw new SystemFailureException(msg, e); + } + } + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#getState() + */ + @Override + public byte[] getState() { + return state; + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#setState(byte[]) + */ + @Override + public void setState( byte[] state ) { + this.state = state; + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address) + */ + @Override + public void suspect( Address suspectedMbr ) { + LOGGER.error(ClusteringI18n.memberOfClusterIsSuspect, getClusterName(), suspectedMbr); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View) + */ + @Override + public void viewAccepted( View newView ) { + LOGGER.trace("Members of '{0}' cluster have changed: {1}", getClusterName(), newView); + if (newView.getMembers().size() > 1) { + if (multipleAddressesInCluster.compareAndSet(false, true)) { + LOGGER.debug("There are now multiple members of cluster '{0}'; changes will be propagated throughout the cluster", + getClusterName()); + } + } else { + if (multipleAddressesInCluster.compareAndSet(true, false)) { + LOGGER.debug("There is only one member of cluster '{0}'; changes will be propagated locally only", + getClusterName()); + } + } + } + } + + protected class Listener implements ChannelListener { + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelClosed(org.jgroups.Channel) + */ + @Override + public void channelClosed( Channel channel ) { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelConnected(org.jgroups.Channel) + */ + @Override + public void channelConnected( Channel channel ) { + isOpen.set(true); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelDisconnected(org.jgroups.Channel) + */ + @Override + public void channelDisconnected( Channel channel ) { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelReconnected(org.jgroups.Address) + */ + @Override + public void channelReconnected( Address addr ) { + isOpen.set(true); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelShunned() + */ + @Override + public void channelShunned() { + isOpen.set(false); + } + } +} Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java (working copy) @@ -0,0 +1,49 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. +* See the AUTHORS.txt file in the distribution for a full listing of +* individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import org.modeshape.common.i18n.I18n; + +/** + * The internationalized string constants for the org.modeshape.connector.filesystem* packages. + */ +public final class ClusteringI18n { + + public static I18n errorWhileStartingJGroups; + public static I18n clusterNameRequired; + public static I18n unableToNotifyChangesBecauseClusterChannelHasClosed; + public static I18n unableToNotifyChangesBecauseClusterChannelIsNotConnected; + public static I18n errorSerializingChanges; + public static I18n errorDeserializingChanges; + public static I18n clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown; + public static I18n memberOfClusterIsSuspect; + + static { + try { + I18n.initialize(ClusteringI18n.class); + } catch (final Exception err) { + System.err.println(err); + } + } +} Index: extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties (working copy) @@ -0,0 +1,31 @@ +# +# ModeShape (http://www.modeshape.org) +# See the COPYRIGHT.txt file distributed with this work for information +# regarding copyright ownership. Some portions may be licensed +# to Red Hat, Inc. under one or more contributor license agreements. +# See the AUTHORS.txt file in the distribution for a full listing of +# individual contributors. +# +# ModeShape is free software. Unless otherwise indicated, all code in ModeShape +# is licensed to you under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of +# the License, or (at your option) any later version. +# +# ModeShape is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free +# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +# 02110-1301 USA, or see the FSF site: http://www.fsf.org. +# +errorWhileStartingJGroups = Error while starting JGroups with configuration: {0} +clusterNameRequired = A cluster name is required for JGroups-based clustering +unableToNotifyChangesBecauseClusterChannelHasClosed = Cluster channel '{0}' has closed, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5} +unableToNotifyChangesBecauseClusterChannelIsNotConnected = Cluster channel '{0}' is not connected, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5} +errorSerializingChanges = Error in channel '{0}' while serializing {1} changes in source '{2}' made by {3} from process '{4}' at {5}: {6} +errorDeserializingChanges = Error deserializing changes obtained from channel '{0}' +clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown = The cluster channel '{0}' is running and cannot be changed unless shut down +memberOfClusterIsSuspect = Member of '{0}' cluster is suspect at '{1}' Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java (working copy) @@ -0,0 +1,352 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.nullValue; +import static org.hamcrest.core.IsSame.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.jgroups.ChannelListener; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.Receiver; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.modeshape.graph.ExecutionContext; +import org.modeshape.graph.Location; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.Observer; +import org.modeshape.graph.property.DateTime; +import org.modeshape.graph.property.Name; +import org.modeshape.graph.property.Path; +import org.modeshape.graph.request.ChangeRequest; +import org.modeshape.graph.request.CreateNodeRequest; + +public class ClusteredObservationBusTest { + + private ClusteredObservationBus bus; + private ExecutionContext context = new ExecutionContext(); + protected JChannel mockChannel; + + @Before + public void beforeEach() { + mockChannel = Mockito.mock(JChannel.class); + // Create a clustered bus that does NOT use a real JChannel ... + bus = newBus(mockChannel); + } + + @Test + public void shouldSerializeAndDeserializeChanges() throws Exception { + Changes changes = changes(); + byte[] data = ClusteredObservationBus.serialize(changes); + Changes deserialized = ClusteredObservationBus.deserialize(data); + // Should be equal ... + assertThat(changes, is(deserialized)); + // but not == ... + assertThat(changes, is(not(sameInstance(deserialized)))); + } + + @Test( expected = IllegalArgumentException.class ) + public void shouldNotAllowSettingClusterNameToNull() { + bus.setClusterName(null); + } + + @Test + public void shouldAllowSettingClusterNameToBlankString() { + setAndGetClusterName(""); + } + + @Test + public void shouldAllowSettingClusterNameToStringWithAlphaNumericCharacters() { + setAndGetClusterName("abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + } + + @Test + public void shouldAllowSettingClusterNameToStringWithAlphaNumericAndPunctuationCharacters() { + setAndGetClusterName("valid.cluster!name@#$%^&*()<>?,./:\"'[]\\{}|_+-="); + } + + @Test + public void shouldAllowSettingClusterNameToStringWithAlphaNumericAndWhitespaceCharacters() { + setAndGetClusterName("valid cluster name"); + } + + @Test + public void shouldAllowSettingConfigurationToNull() { + setAndGetConfiguration(null); + } + + @Test + public void shouldAllowSettingConfigurationToBlankString() { + setAndGetConfiguration(null); + } + + @Test( expected = IllegalStateException.class ) + public void shouldNotAllowStartingWithoutSettingClusterName() { + assertThat(bus.getClusterName(), is(nullValue())); + assertThat(bus.isStarted(), is(false)); + bus.start(); + assertThat(bus.isStarted(), is(true)); + bus.shutdown(); + assertThat(bus.isStarted(), is(false)); + } + + @Test + public void shouldAllowStartingWithoutSettingConfiguration() { + bus.setClusterName("clusterName"); + assertThat(bus.isStarted(), is(false)); + bus.start(); + assertThat(bus.isStarted(), is(true)); + bus.shutdown(); + assertThat(bus.isStarted(), is(false)); + } + + @Test + public void shouldAllowShuttingDownWithoutHavingStarted() { + assertThat(bus.isStarted(), is(false)); + bus.shutdown(); + assertThat(bus.isStarted(), is(false)); + } + + @Test( expected = IllegalStateException.class ) + public void shouldNotAllowSettingConfigurationAfterBusHasBeenStartedButBeforeBusHasBeenShutdown() { + bus.setClusterName("clusterName"); + bus.setConfiguration("old configuration"); + assertThat(bus.isStarted(), is(false)); + bus.start(); + assertThat(bus.isStarted(), is(true)); + bus.setConfiguration("new configuration"); // !! should fail !! + } + + @Test + public void shouldAllowSettingConfigurationAfterBusHasBeenStartedAndShutdown() { + bus.setClusterName("clusterName"); + bus.setConfiguration("old configuration"); + assertThat(bus.isStarted(), is(false)); + bus.start(); + assertThat(bus.isStarted(), is(true)); + bus.shutdown(); + assertThat(bus.isStarted(), is(false)); + bus.setConfiguration("new configuration"); + } + + @Test + public void shouldAllowNotifyToBeCalledBeforeStartButShouldDoNothing() throws Exception { + bus.setClusterName("clusterName"); + bus.notify(changes()); + ArgumentCaptor argument = ArgumentCaptor.forClass(Message.class); + verify(mockChannel, never()).send(argument.capture()); + } + + @Test + public void shouldAllowNotifyToBeCalledAfterStartWithMultipleMembersAndShouldSendMessageToJGroups() throws Exception { + bus.setClusterName("clusterName"); + bus.start(); + verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture()); + verify(mockChannel, times(1)).connect("clusterName"); + verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture()); + + // When connected, JGroups will call back to the listener and the bus will record it as open. + // But we have to do this manually because we've stubbed out JGroups ... + bus.isOpen.set(true); + + // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are + // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ... + bus.multipleAddressesInCluster.set(true); + + // Now call the notify method ... + bus.notify(changes()); + verify(mockChannel, times(1)).send(ArgumentCaptor.forClass(Message.class).capture()); + verifyNoMoreInteractions(mockChannel); + } + + @Test + public void shouldAllowNotifyToBeCalledAfterStartWithOneMemberAndShouldSendMessageToLocalObserversBuNotJGroups() + throws Exception { + bus.setClusterName("clusterName"); + bus.start(); + verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture()); + verify(mockChannel, times(1)).connect("clusterName"); + verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture()); + + // When connected, JGroups will call back to the listener and the bus will record it as open. + // But we have to do this manually because we've stubbed out JGroups ... + bus.isOpen.set(true); + + // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are + // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ... + bus.multipleAddressesInCluster.set(false); + + // Add a local listener ... + Observer observer = mock(Observer.class); + bus.register(observer); + + // Now call the notify method ... + Changes changes = changes(); + bus.notify(changes); + + verify(mockChannel, never()).send(ArgumentCaptor.forClass(Message.class).capture()); + verifyNoMoreInteractions(mockChannel); + verify(observer, times(1)).notify(changes); + } + + @Test + public void shouldProperlySendChangesThroughRealJGroupsCluster() throws Exception { + + // Create three observers ... + CountDownLatch latch = new CountDownLatch(3); + CustomObserver observer1 = new CustomObserver(latch); + CustomObserver observer2 = new CustomObserver(latch); + CustomObserver observer3 = new CustomObserver(latch); + + // Create three busses using a real JGroups cluster ... + String name = "MyCluster"; + ClusteredObservationBus bus1 = startNewBus(name, observer1); + try { + ClusteredObservationBus bus2 = startNewBus(name, observer2); + try { + ClusteredObservationBus bus3 = startNewBus(name, observer3); + try { + + // Send changes to one of the busses ... + Changes changes = changes(); + bus1.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(1)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + assertThat(observer2.getObservedChanges().get(0), is(changes)); + assertThat(observer3.getObservedChanges().get(0), is(changes)); + + // Stop the busses ... + } finally { + bus3.shutdown(); + } + } finally { + bus2.shutdown(); + } + } finally { + bus1.shutdown(); + } + } + + // ---------------------------------------------------------------------------------------------------------------- + // Utility methods + // ---------------------------------------------------------------------------------------------------------------- + + protected void setAndGetClusterName( String name ) { + bus.setClusterName(name); + String nameAfter = bus.getClusterName(); + assertThat(nameAfter, is(name)); + } + + protected void setAndGetConfiguration( String config ) { + bus.setConfiguration(config); + String configAfter = bus.getConfiguration(); + assertThat(configAfter, is(config)); + } + + protected ClusteredObservationBus startNewBus( String name, + Observer localObserver ) { + ClusteredObservationBus bus = newBus(null); + bus.setClusterName(name); + bus.start(); + bus.register(localObserver); + return bus; + } + + protected ClusteredObservationBus newBus( final JChannel channel ) { + return channel == null ? new ClusteredObservationBus() : new ClusteredObservationBus() { + /** + * {@inheritDoc} + * + * @see org.modeshape.clustering.ClusteredObservationBus#newChannel(java.lang.String) + */ + @Override + protected JChannel newChannel( String configuration ) { + return channel; + } + }; + + } + + protected Changes changes() { + DateTime now = context.getValueFactories().getDateFactory().create(); + Path path = context.getValueFactories().getPathFactory().create("/a"); + Name childName = context.getValueFactories().getNameFactory().create("b"); + Path childPath = context.getValueFactories().getPathFactory().create(path, childName); + CreateNodeRequest request = new CreateNodeRequest(Location.create(path), "workspaceName", childName); + request.setActualLocationOfNode(Location.create(childPath)); + List requests = Collections.singletonList((ChangeRequest)request); + return new Changes("processId", "contextId", "username", "sourceName", now, requests, null); + } + + protected static class CustomObserver implements Observer { + private final List receivedChanges = new ArrayList(); + private final CountDownLatch latch; + + protected CustomObserver( CountDownLatch latch ) { + this.latch = latch; + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + receivedChanges.add(changes); + latch.countDown(); + } + + public void await() throws InterruptedException { + latch.await(250, TimeUnit.MILLISECONDS); + } + + public List getObservedChanges() { + return receivedChanges; + } + } +} Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java (working copy) @@ -0,0 +1,33 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import org.modeshape.common.AbstractI18nTest; + +public class ClusteringI18nTest extends AbstractI18nTest { + + public ClusteringI18nTest() { + super(ClusteringI18n.class); + } +} Index: extensions/modeshape-clustering/src/test/resources/log4j.properties new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ extensions/modeshape-clustering/src/test/resources/log4j.properties (working copy) @@ -0,0 +1,15 @@ +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %m%n + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Set up the default logging to be INFO level, then override specific units +log4j.logger.org.modeshape=DEBUG + +# JGroups +log4j.logger.org.jgroups=ERROR + Index: modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java =================================================================== --- modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (revision 1989) +++ modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (working copy) @@ -220,6 +220,27 @@ public class ComponentLibrary } } + public boolean removeAll() { + try { + this.lock.lock(); + this.configs.clear(); + this.instances.clear(); + return true; + } finally { + this.lock.unlock(); + } + } + + public boolean removeAllAndAdd( ConfigType config ) { + try { + this.lock.lock(); + removeAll(); + return add(config); + } finally { + this.lock.unlock(); + } + } + /** * Refresh the instances by attempting to re-instantiate each registered configuration. * @@ -282,6 +303,7 @@ public class ComponentLibrary reflection.invokeSetterMethodOnTarget(entry.getKey(), newInstance, entry.getValue()); } } + configure(newInstance, config); } catch (Throwable e) { throw new SystemFailureException(e); } @@ -291,6 +313,11 @@ public class ComponentLibrary return newInstance; } + protected void configure( ComponentType newInstance, + ConfigType configuration ) throws Exception { + // do nothing + } + /** * Method that instantiates the supplied class. This method can be overridden by subclasses that may need to wrap or adapt the * instance to be a ComponentType. Index: modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java (working copy) @@ -71,6 +71,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { private final SecurityContext securityContext; /** The unique ID string, which is always generated so that it can be final and not volatile. */ private final String id = UUID.randomUUID().toString(); + private final String processId; private final Map data; /** @@ -80,7 +81,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { */ @SuppressWarnings( "synthetic-access" ) public ExecutionContext() { - this(new NullSecurityContext(), null, null, null, null, null, null); + this(new NullSecurityContext(), null, null, null, null, null, null, null); initializeDefaultNamespaces(this.getNamespaceRegistry()); assert securityContext != null; @@ -101,6 +102,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { this.classLoaderFactory = original.getClassLoaderFactory(); this.mimeTypeDetector = original.getMimeTypeDetector(); this.data = original.getData(); + this.processId = original.getProcessId(); } /** @@ -121,6 +123,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { this.classLoaderFactory = original.getClassLoaderFactory(); this.mimeTypeDetector = original.getMimeTypeDetector(); this.data = original.getData(); + this.processId = original.getProcessId(); } /** @@ -138,6 +141,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { * @param classLoaderFactory the {@link ClassLoaderFactory} implementation, or null if a {@link StandardClassLoaderFactory} * instance should be used * @param data the custom data for this context, or null if there is no such data + * @param processId the unique identifier of the process in which this context exists, or null if it should be generated */ protected ExecutionContext( SecurityContext securityContext, NamespaceRegistry namespaceRegistry, @@ -145,7 +149,8 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { PropertyFactory propertyFactory, MimeTypeDetector mimeTypeDetector, ClassLoaderFactory classLoaderFactory, - Map data ) { + Map data, + String processId ) { assert securityContext != null; this.securityContext = securityContext; this.namespaceRegistry = namespaceRegistry != null ? namespaceRegistry : new ThreadSafeNamespaceRegistry( @@ -155,6 +160,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { this.classLoaderFactory = classLoaderFactory == null ? new StandardClassLoaderFactory() : classLoaderFactory; this.mimeTypeDetector = mimeTypeDetector != null ? mimeTypeDetector : createDefaultMimeTypeDetector(); this.data = data != null ? data : Collections.emptyMap(); + this.processId = processId != null ? processId : UUID.randomUUID().toString(); } private MimeTypeDetector createDefaultMimeTypeDetector() { @@ -253,7 +259,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { } /** - * Get the unique identifier for this context. + * Get the unique identifier for this context. Each context will always have a unique identifier. * * @return the unique identifier string; never null and never empty */ @@ -262,6 +268,16 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { } /** + * Get the identifier for the process in which this context exists. Multiple contexts running in the same "process" will all + * have the same identifier. + * + * @return the identifier for the process; never null and never empty + */ + public String getProcessId() { + return processId; + } + + /** * Get the immutable map of custom data that is affiliated with this context. * * @return the custom data; never null but possibly empty @@ -283,7 +299,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { // Don't supply the value factories or property factories, since they'll have to be recreated // to reference the supplied namespace registry ... return new ExecutionContext(this.getSecurityContext(), namespaceRegistry, null, null, this.getMimeTypeDetector(), - this.getClassLoaderFactory(), this.getData()); + this.getClassLoaderFactory(), this.getData(), getProcessId()); } /** @@ -299,7 +315,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { // Don't supply the value factories or property factories, since they'll have to be recreated // to reference the supplied namespace registry ... return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(), - mimeTypeDetector, getClassLoaderFactory(), this.getData()); + mimeTypeDetector, getClassLoaderFactory(), this.getData(), getProcessId()); } /** @@ -314,7 +330,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { // Don't supply the value factories or property factories, since they'll have to be recreated // to reference the supplied namespace registry ... return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(), - getMimeTypeDetector(), classLoaderFactory, this.getData()); + getMimeTypeDetector(), classLoaderFactory, this.getData(), getProcessId()); } /** @@ -348,7 +364,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { newData = Collections.unmodifiableMap(new HashMap(data)); } return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(), - getMimeTypeDetector(), getClassLoaderFactory(), newData); + getMimeTypeDetector(), getClassLoaderFactory(), newData, getProcessId()); } /** @@ -380,7 +396,20 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable { newData = Collections.unmodifiableMap(newData); } return new ExecutionContext(getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(), - getMimeTypeDetector(), getClassLoaderFactory(), newData); + getMimeTypeDetector(), getClassLoaderFactory(), newData, getProcessId()); + } + + /** + * Create a new execution context that mirrors this context but that contains the supplied process identifier. + * + * @param processId the identifier of the process + * @return the execution context that is identical with this execution context, but which uses the supplied process + * identifier; never null + * @since 2.1 + */ + public ExecutionContext with( String processId ) { + return new ExecutionContext(getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(), + getMimeTypeDetector(), getClassLoaderFactory(), getData(), processId); } /** Index: modeshape-graph/src/main/java/org/modeshape/graph/Graph.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (working copy) @@ -7041,6 +7041,34 @@ public class Graph { return getLocation().hashCode(); } + /** + * {@inheritDoc} + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals( Object obj ) { + if (obj instanceof SubgraphResults) { + SubgraphResults that = (SubgraphResults)obj; + return getLocation().equals(that.getLocation()) && request.equals(that.request); + } else if (obj instanceof Subgraph) { + Subgraph that = (Subgraph)obj; + if (!getLocation().equals(that.getLocation())) return false; + Iterator thisIter = this.iterator(); + Iterator thatIter = that.iterator(); + while (thisIter.hasNext() && thatIter.hasNext()) { + SubgraphNode thisNode = thisIter.next(); + SubgraphNode thatNode = thatIter.next(); + if (!thisNode.getLocation().equals(thatNode.getLocation())) return false; + if (!thisNode.getProperties().equals(thatNode.getProperties())) return false; + if (!thisNode.getChildren().equals(thatNode.getChildren())) return false; + } + if (thisIter.hasNext() || thatIter.hasNext()) return false; + return true; + } + return false; + } + @Override public String toString() { return "Subgraph\n" + getToString(getContext()); Index: modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (working copy) @@ -48,7 +48,12 @@ public class ModeShapeLexicon { public static final Name RESOURCE = new BasicName(Namespace.URI, "resource"); public static final Name ROOT = new BasicName(Namespace.URI, "root"); public static final Name TIME_TO_EXPIRE = new BasicName(Namespace.URI, "timeToExpire"); - public static final Name NAMESPACE_URI = new BasicName(Namespace.URI, "uri"); + public static final Name URI = new BasicName(Namespace.URI, "uri"); + /** + * @deprecated Use {@link #URI} instead. + */ + @Deprecated + public static final Name NAMESPACE_URI = URI; public static final Name WORKSPACES = new BasicName(Namespace.URI, "workspaces"); public static final Name SOURCE_NAME = new BasicName(Namespace.URI, "source"); Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java (working copy) @@ -0,0 +1,103 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.graph.observe; + +import net.jcip.annotations.ThreadSafe; +import org.modeshape.common.util.Logger; +import org.modeshape.graph.GraphI18n; + +/** + * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single + * Observable to disseminate each event to multiple Observers. + */ +@ThreadSafe +public class LocalObservationBus implements ObservationBus { + private final ChangeObservers observers = new ChangeObservers(); + private static final Logger LOGGER = Logger.getLogger(LocalObservationBus.class); + + public LocalObservationBus() { + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + public void start() { + // nothing to do + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + public boolean register( Observer observer ) { + if (observer == null) return false; + return observers.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + public boolean unregister( Observer observer ) { + return observers.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + public void notify( Changes changes ) { + if (changes != null) { + // Broadcast the changes to the registered observers ... + try { + observers.broadcast(changes); + } catch (RuntimeException t) { + LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage()); + } + } + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + public boolean hasObservers() { + return !observers.isEmpty(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + public void shutdown() { + observers.shutdown(); + } +} Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (working copy) @@ -24,69 +24,28 @@ package org.modeshape.graph.observe; import net.jcip.annotations.ThreadSafe; -import org.modeshape.common.util.Logger; -import org.modeshape.graph.GraphI18n; /** - * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single + * A simple {@link Observer} that is itself {@link Observable}. This interface essentially multiplexes the events from a single * Observable to disseminate each event to multiple Observers. */ @ThreadSafe -public class ObservationBus implements Observable, Observer { - private final ChangeObservers observers = new ChangeObservers(); - private static final Logger LOGGER = Logger.getLogger(ObservationBus.class); - - public ObservationBus() { - } - - /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) - */ - public boolean register( Observer observer ) { - if (observer == null) return false; - return observers.register(observer); - } +public interface ObservationBus extends Observable, Observer { /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) - */ - public boolean unregister( Observer observer ) { - return observers.unregister(observer); - } - - /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + * Prepare this bus for operation by starting any resources. */ - public void notify( Changes changes ) { - if (changes != null) { - // Broadcast the changes to the registered observers ... - try { - observers.broadcast(changes); - } catch (RuntimeException t) { - LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage()); - } - } - } + public void start(); /** * Determine whether this particular bus currently has any observers. * * @return true if there is at least one observer, or false otherwise */ - public boolean hasObservers() { - return !observers.isEmpty(); - } + public boolean hasObservers(); /** * Unregister all registered observers, and mark this as no longer accepting new registered observers. */ - public void shutdown() { - observers.shutdown(); - } + public void shutdown(); } Index: modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (working copy) @@ -53,7 +53,7 @@ import org.modeshape.graph.property.ValueFactory; @NotThreadSafe public class GraphNamespaceRegistry implements NamespaceRegistry { - public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.NAMESPACE_URI; + public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.URI; public static final String GENERATED_PREFIX = "ns"; private SimpleNamespaceRegistry cache; Index: modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java (revision 1989) +++ modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java (working copy) @@ -1026,7 +1026,7 @@ public abstract class RequestProcessor { String userName = context.getSecurityContext() != null ? context.getSecurityContext().getUserName() : null; if (userName == null) userName = ""; String contextId = context.getId(); - String processId = null; + String processId = context.getProcessId(); Map userData = context.getData(); Changes changes = new Changes(processId, contextId, userName, getSourceName(), getNowInUtc(), this.changes, userData); observer.notify(changes); Index: modeshape-integration-tests/pom.xml =================================================================== --- modeshape-integration-tests/pom.xml (revision 1989) +++ modeshape-integration-tests/pom.xml (working copy) @@ -41,7 +41,7 @@ + --> junit junit @@ -75,6 +75,12 @@ test + + org.modeshape + modeshape-clustering + ${project.version} + test + org.modeshape modeshape-connector-jbosscache Index: modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java (working copy) @@ -0,0 +1,317 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.test.integration; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jcr.ImportUUIDBehavior; +import javax.jcr.Node; +import javax.jcr.Repository; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.UnsupportedRepositoryOperationException; +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; +import javax.jcr.observation.EventListener; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.modeshape.common.util.FileUtil; +import org.modeshape.connector.store.jpa.JpaSource; +import org.modeshape.jcr.JcrConfiguration; +import org.modeshape.jcr.JcrEngine; +import org.modeshape.jcr.ModeShapeRoles; +import org.modeshape.jcr.JcrRepository.Option; + +public class ClusteringTest { + + private static JcrConfiguration configuration; + private static JcrEngine engine1; + private static JcrEngine engine2; + private static JcrEngine engine3; + private static List sessions = new ArrayList(); + + @BeforeClass + public static void beforeEach() throws Exception { + // Delete the database files, if there are any ... + FileUtil.delete("target/db"); + + // Set up the database and keep this connection open ... + String url = "jdbc:hsqldb:file:target/db/ClusteredObservationBusTest"; + String username = "sa"; + String password = ""; + + configuration = new JcrConfiguration(); + configuration.repositorySource("car-source") + .usingClass(JpaSource.class) + .setDescription("The automobile content") + .setProperty("dialect", "org.hibernate.dialect.HSQLDialect") + .setProperty("driverClassName", "org.hsqldb.jdbcDriver") + .setProperty("url", url) + .setProperty("username", username) + .setProperty("password", password) + .setProperty("maximumConnectionsInPool", "2") + .setProperty("minimumConnectionsInPool", "1") + .setProperty("numberOfConnectionsToAcquireAsNeeded", "1") + .setProperty("maximumSizeOfStatementCache", "100") + .setProperty("maximumConnectionIdleTimeInSeconds", "10") + .setProperty("referentialIntegrityEnforced", "true") + .setProperty("largeValueSizeInBytes", "150") + .setProperty("autoGenerateSchema", "update") + .setProperty("retryLimit", "3") + .setProperty("showSql", "false"); + configuration.repository("cars") + .setSource("car-source") + .registerNamespace("car", "http://www.modeshape.org/examples/cars/1.0") + .addNodeTypes(resourceUrl("cars.cnd")) + .setOption(Option.ANONYMOUS_USER_ROLES, ModeShapeRoles.ADMIN); + configuration.clustering().setProperty("clusterName", "MyCluster");// .setProperty("configuration", ""); + + // Create an engine and use it to populate the source ... + engine1 = configuration.build(); + engine1.start(); + + Repository repository = engine1.getRepository("cars"); + + // Use a session to load the contents ... + Session session = repository.login(); + try { + InputStream stream = resourceStream("io/cars-system-view.xml"); + try { + session.getWorkspace().importXML("/", stream, ImportUUIDBehavior.IMPORT_UUID_CREATE_NEW); + } catch (Throwable t) { + t.printStackTrace(); + } finally { + stream.close(); + } + } finally { + session.logout(); + } + + // Make sure the data was imported ... + session = repository.login(); + try { + Node node = session.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node, is(notNullValue())); + assertThat(session.getRootNode().getNodes().getSize(), is(2L)); // "Cars" and "jcr:system" + } finally { + session.logout(); + } + + // Start the other engines ... + engine2 = configuration.build(); + engine2.start(); + + engine3 = configuration.build(); + engine3.start(); + } + + @AfterClass + public static void afterAll() throws Exception { + // Close all of the sessions ... + for (Session session : sessions) { + if (session.isLive()) session.logout(); + } + sessions.clear(); + + // Shut down the engines ... + if (engine1 != null) engine1.shutdown(); + if (engine2 != null) engine2.shutdown(); + if (engine3 != null) engine3.shutdown(); + engine1 = null; + engine2 = null; + engine3 = null; + } + + // ---------------------------------------------------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------------------------------------------------- + + @Test + public void shouldAllowMultipleEnginesToAccessSameDatabase() throws Exception { + Session session1 = sessionFrom(engine1); + Node node1 = session1.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node1, is(notNullValue())); + + Session session2 = sessionFrom(engine2); + Node node = session2.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node, is(notNullValue())); + } + + @Test + public void shouldReceiveNotificationsFromAllEnginesWhenChangingContentInOne() throws Exception { + Session session1 = sessionFrom(engine1); + Session session2 = sessionFrom(engine2); + Session session3 = sessionFrom(engine3); + + int eventTypes = Event.NODE_ADDED | Event.NODE_REMOVED; // |Event.PROPERTY_ADDED|Event.PROPERTY_CHANGED|Event.PROPERTY_REMOVED + CustomListener listener1 = addListenerTo(session1, eventTypes, 1); + CustomListener listener2 = addListenerTo(session2, eventTypes, 1); + CustomListener listener3 = addListenerTo(session3, eventTypes, 1); + CustomListener remoteListener1 = addRemoteListenerTo(session1, eventTypes, 0); + CustomListener remoteListener2 = addRemoteListenerTo(session2, eventTypes, 1); + CustomListener remoteListener3 = addRemoteListenerTo(session2, eventTypes, 1); + + // Make some changes ... + session1.getRootNode().addNode("SomeNewNode"); + session1.save(); + + // Wait for all the listeners ... + listener1.await(); + listener2.await(); + listener3.await(); + remoteListener1.await(); + remoteListener2.await(); + remoteListener3.await(); + + // Disconnect the listeners ... + listener1.disconnect(); + listener2.disconnect(); + listener3.disconnect(); + remoteListener1.disconnect(); + remoteListener2.disconnect(); + remoteListener3.disconnect(); + + // Now check the events ... + listener1.checkObservedEvents(); + listener2.checkObservedEvents(); + listener3.checkObservedEvents(); + remoteListener1.checkObservedEvents(); + remoteListener2.checkObservedEvents(); + remoteListener3.checkObservedEvents(); + } + + // ---------------------------------------------------------------------------------------------------------------- + // Utility Methods + // ---------------------------------------------------------------------------------------------------------------- + + protected Session sessionFrom( JcrEngine engine ) throws RepositoryException { + Repository repository = engine.getRepository("cars"); + Session session = repository.login(); + sessions.add(session); + return session; + } + + /** + * Add a listener for only remote events. + * + * @param session the session + * @param eventTypes the type of events + * @param expectedEventCount the number of expected events + * @return the listener + * @throws UnsupportedRepositoryOperationException + * @throws RepositoryException + */ + protected CustomListener addRemoteListenerTo( Session session, + int eventTypes, + int expectedEventCount ) + throws UnsupportedRepositoryOperationException, RepositoryException { + CustomListener listener = new CustomListener(session, expectedEventCount); + session.getWorkspace().getObservationManager().addEventListener(listener, eventTypes, null, true, null, null, true); + return listener; + } + + /** + * Add a listener for local and remote events. + * + * @param session the session + * @param eventTypes the type of events + * @param expectedEventCount the number of expected events + * @return the listener + * @throws UnsupportedRepositoryOperationException + * @throws RepositoryException + */ + protected CustomListener addListenerTo( Session session, + int eventTypes, + int expectedEventCount ) + throws UnsupportedRepositoryOperationException, RepositoryException { + CustomListener listener = new CustomListener(session, expectedEventCount); + session.getWorkspace().getObservationManager().addEventListener(listener, eventTypes, null, true, null, null, false); + return listener; + } + + protected static URL resourceUrl( String name ) { + return ClusteringTest.class.getClassLoader().getResource(name); + } + + protected static InputStream resourceStream( String name ) { + return ClusteringTest.class.getClassLoader().getResourceAsStream(name); + } + + protected static class CustomListener implements EventListener { + private final List receivedEvents = new ArrayList(); + private final int expectedEventCount; + private final CountDownLatch latch; + private final Session session; + + protected CustomListener( Session session, + int expectedEventCount ) { + this.latch = new CountDownLatch(expectedEventCount); + this.expectedEventCount = expectedEventCount; + this.session = session; + } + + /** + * {@inheritDoc} + * + * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator) + */ + @Override + public void onEvent( EventIterator events ) { + while (events.hasNext()) { + receivedEvents.add(events.nextEvent()); + latch.countDown(); + } + } + + public void await() throws InterruptedException { + latch.await(3, TimeUnit.SECONDS); + } + + public List getObservedEvents() { + return receivedEvents; + } + + public void checkObservedEvents() { + StringBuilder msg = new StringBuilder("Expected "); + msg.append(expectedEventCount); + msg.append(" events but received "); + msg.append(receivedEvents.size()); + msg.append(": "); + msg.append(receivedEvents); + assertThat(msg.toString(), receivedEvents.size(), is(expectedEventCount)); + } + + public void disconnect() throws RepositoryException { + this.session.getWorkspace().getObservationManager().removeEventListener(this); + } + } +} Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (revision 1989) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (working copy) @@ -118,9 +118,9 @@ public class JcrEngine extends ModeShapeEngine implements Repositories { } @Override - public void shutdown() { + protected void preShutdown() { scheduler.shutdown(); - super.shutdown(); + super.preShutdown(); try { this.repositoriesLock.lock(); @@ -253,7 +253,7 @@ public class JcrEngine extends ModeShapeEngine implements Repositories { Node namespacesNode = subgraph.getNode(ModeShapeLexicon.NAMESPACES); if (namespacesNode != null) { GraphNamespaceRegistry registry = new GraphNamespaceRegistry(configuration, namespacesNode.getLocation().getPath(), - ModeShapeLexicon.NAMESPACE_URI); + ModeShapeLexicon.URI); context = context.with(registry); } Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (revision 1989) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (working copy) @@ -540,7 +540,7 @@ public class JcrRepository implements Repository { // Create the namespace registry and corresponding execution context. // Note that this persistent registry has direct access to the system workspace. - Name uriProperty = ModeShapeLexicon.NAMESPACE_URI; + Name uriProperty = ModeShapeLexicon.URI; PathFactory pathFactory = executionContext.getValueFactories().getPathFactory(); Path systemPath = pathFactory.create(JcrLexicon.SYSTEM); Path namespacesPath = pathFactory.create(systemPath, ModeShapeLexicon.NAMESPACES); @@ -1612,6 +1612,8 @@ public class JcrRepository implements Repository { private final Observable repositoryObservable; private final String sourceName; private final String systemSourceName; + private final String repositorySourceName; + private final String processId; /** * @param repositoryObservable the repository library observable this observer should register with @@ -1621,6 +1623,8 @@ public class JcrRepository implements Repository { this.repositoryObservable.register(this); this.sourceName = getObservableSourceName(); this.systemSourceName = getSystemSourceName(); + this.repositorySourceName = getRepositorySourceName(); + this.processId = getExecutionContext().getProcessId(); } /** @@ -1628,10 +1632,9 @@ public class JcrRepository implements Repository { * * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) */ - public void notify( final Changes changes ) { - // We only care about events that come from the repository source or the system source ... - String changedSourceName = changes.getSourceName(); - if (sourceName.equals(changedSourceName) || systemSourceName.equals(changedSourceName)) { + public void notify( Changes changes ) { + final Changes acceptableChanges = filter(changes); + if (acceptableChanges != null) { // We're still in the thread where the connector published its changes, // so we need to create a runnable that will send these changes to all @@ -1647,7 +1650,7 @@ public class JcrRepository implements Repository { while (observerIterator.hasNext()) { Observer observer = observerIterator.next(); assert observer != null; - observer.notify(changes); + observer.notify(acceptableChanges); } } }; @@ -1657,6 +1660,33 @@ public class JcrRepository implements Repository { } } + private Changes filter( Changes changes ) { + // We only care about events that come from the repository source, the system source, + // or the repository source name (for remote events only) ... + String changedSourceName = changes.getSourceName(); + if (sourceName.equals(changedSourceName)) { + // These are changes made locally by this repository ... + return changes; + } + if (systemSourceName.equals(changedSourceName)) { + // These are changes made locally by this repository ... + return changes; + } + if (repositorySourceName.equals(changedSourceName)) { + // These may be events generated locally or from a remote engine in the cluster ... + if (this.processId.equals(changes.getProcessId())) { + // These events were made locally and are being handled above, so we can ignore these ... + return null; + } + // Otherwise, the changes were recieved from another engine in the cluster and + // we do want to respond to these changes. However, the source name of the changes + // needs to be altered to match the 'sourceName' ... + return new Changes(changes.getProcessId(), changes.getContextId(), changes.getUserName(), sourceName, + changes.getTimestamp(), changes.getChangeRequests(), changes.getData()); + } + return null; + } + /** * {@inheritDoc} * Index: modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (revision 1989) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (working copy) @@ -36,16 +36,15 @@ public class ModeShapeLexicon extends org.modeshape.repository.ModeShapeLexicon public static final Name BASE = new BasicName(Namespace.URI, "base"); public static final Name EXPIRATION_DATE = new BasicName(Namespace.URI, "expirationDate"); public static final Name IS_HELD_BY_SESSION = new BasicName(Namespace.URI, "isHeldBySession"); - public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped"); - public static final Name LOCK = new BasicName(Namespace.URI, "lock"); + public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped"); + public static final Name LOCK = new BasicName(Namespace.URI, "lock"); public static final Name LOCKED_UUID = new BasicName(Namespace.URI, "lockedUuid"); public static final Name LOCKING_SESSION = new BasicName(Namespace.URI, "lockingSession"); - public static final Name LOCKS = new BasicName(Namespace.URI, "locks"); + public static final Name LOCKS = new BasicName(Namespace.URI, "locks"); public static final Name NAMESPACE = new BasicName(Namespace.URI, "namespace"); public static final Name NODE_TYPES = new BasicName(Namespace.URI, "nodeTypes"); public static final Name REPOSITORIES = new BasicName(Namespace.URI, "repositories"); public static final Name SYSTEM = new BasicName(Namespace.URI, "system"); - public static final Name URI = new BasicName(Namespace.URI, "uri"); public static final Name VERSION_STORAGE = new BasicName(Namespace.URI, "versionStorage"); - public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace"); + public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace"); } Index: modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java =================================================================== --- modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (revision 1989) +++ modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (working copy) @@ -383,7 +383,7 @@ public class JcrConfigurationTest { assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces").getChildren(), hasChild(segment("modetest"))); assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces/modetest"), - hasProperty(ModeShapeLexicon.NAMESPACE_URI, "http://www.modeshape.org/test/1.0")); + hasProperty(ModeShapeLexicon.URI, "http://www.modeshape.org/test/1.0")); // Initialize IDTrust and a policy file (which defines the "modeshape-jcr" login config name) String configFile = "security/jaas.conf.xml"; Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (working copy) @@ -56,6 +56,7 @@ import org.modeshape.graph.Workspace; import org.modeshape.graph.connector.RepositorySource; import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource; import org.modeshape.graph.mimetype.MimeTypeDetector; +import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.property.Name; import org.modeshape.graph.property.NamespaceRegistry; import org.modeshape.graph.property.Path; @@ -101,6 +102,7 @@ public class ModeShapeConfiguration { private final Map> sequencerDefinitions = new HashMap>(); private final Map> repositorySourceDefinitions = new HashMap>(); private final Map> mimeTypeDetectorDefinitions = new HashMap>(); + private ClusterDefinition clusterDefinition; /** * Create a new configuration, using a default-constructed {@link ExecutionContext}. @@ -283,8 +285,9 @@ public class ModeShapeConfiguration { graph.importXmlFrom(configurationFileInputStream).skippingRootElement(true).into(pathToParent); // The file was imported successfully, so now create the content information ... - configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, null, pathToParent, context, - null); + configurationContent = configurationContent.with(pathToParent) + .with(source) + .withWorkspace(source.getDefaultWorkspaceName()); return this; } @@ -348,6 +351,8 @@ public class ModeShapeConfiguration { workspace = graph.createWorkspace().named(workspaceName); } assert workspace.getRoot() != null; + } else { + workspaceName = graph.getCurrentWorkspaceName(); // will be the default } // Verify the path ... @@ -356,8 +361,7 @@ public class ModeShapeConfiguration { assert parent != null; // Now create the content information ... - configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, workspaceName, path, context, - null); + configurationContent = configurationContent.with(source).withWorkspace(workspaceName).with(path); return this; } @@ -675,6 +679,15 @@ public class ModeShapeConfiguration { } /** + * Obtain the definition for this engine's clustering. If no clustering definition exists, one will be created. + * + * @return the clustering definition; never null + */ + public ClusterDefinition clustering() { + return clusterDefinition(this); + } + + /** * Convenience method to make the code that sets up this configuration easier to read. This method simply returns this object. * * @return this configuration component; never null @@ -950,6 +963,15 @@ public class ModeShapeConfiguration { } /** + * Interface used to set up and define the cluster configuration. + * + * @param the type of the configuration component that owns this definition object + */ + public interface ClusterDefinition + extends Returnable, SetProperties>, Removable { + } + + /** * Interface used to set up and define a MIME type detector instance. * * @param the type of the configuration component that owns this definition object @@ -1094,6 +1116,21 @@ public class ModeShapeConfiguration { return definition; } + /** + * Utility method to construct a definition object for the clustering with the supplied name and return type. + * + * @param the type of the return object + * @param returnObject the return object + * @return the definition for the clustering; never null + */ + @SuppressWarnings( "unchecked" ) + protected ClusterDefinition clusterDefinition( ReturnType returnObject ) { + if (clusterDefinition == null) { + clusterDefinition = new ClusterBuilder(returnObject, changes(), path(), ModeShapeLexicon.CLUSTERING); + } + return (ClusterDefinition)clusterDefinition; + } + protected static class BaseReturnable implements Returnable { protected final ReturnType returnObject; @@ -1463,6 +1500,23 @@ public class ModeShapeConfiguration { } } + protected static class ClusterBuilder + extends GraphComponentBuilder, ObservationBus> + implements ClusterDefinition { + + protected ClusterBuilder( ReturnType returnObject, + Graph.Batch batch, + Path path, + Name... names ) { + super(returnObject, batch, path, names); + } + + @Override + protected ClusterDefinition thisType() { + return this; + } + } + /** * Representation of the current configuration content. */ @@ -1483,6 +1537,7 @@ public class ModeShapeConfiguration { ExecutionContext context, ClassLoaderFactory classLoaderFactory ) { assert configurationName != null; + assert source != null; this.name = configurationName; this.source = source; this.path = path != null ? path : RootPath.INSTANCE; @@ -1581,6 +1636,18 @@ public class ModeShapeConfiguration { * @return the new configuration */ public ConfigurationDefinition with( ClassLoaderFactory classLoaderFactory ) { + CheckArg.isNotNull(source, "source"); + return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory); + } + + /** + * Return a copy of this configuration that uses the supplied repository source instead of this object's + * {@link #getRepositorySource() repository source}. + * + * @param source the repository source containing the configuration + * @return the new configuration + */ + public ConfigurationDefinition with( RepositorySource source ) { return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory); } Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (working copy) @@ -21,6 +21,17 @@ */ package org.modeshape.repository; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import net.jcip.annotations.Immutable; import org.modeshape.common.collection.Problem; import org.modeshape.common.collection.Problems; @@ -44,28 +55,19 @@ import org.modeshape.graph.mimetype.ExtensionBasedMimeTypeDetector; import org.modeshape.graph.mimetype.MimeTypeDetector; import org.modeshape.graph.mimetype.MimeTypeDetectorConfig; import org.modeshape.graph.mimetype.MimeTypeDetectors; -import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.property.Name; import org.modeshape.graph.property.Path; import org.modeshape.graph.property.PathExpression; import org.modeshape.graph.property.PathNotFoundException; import org.modeshape.graph.property.Property; +import org.modeshape.repository.cluster.ClusteringConfig; +import org.modeshape.repository.cluster.ClusteringService; import org.modeshape.repository.sequencer.SequencerConfig; import org.modeshape.repository.sequencer.SequencingService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - /** - * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build() configuration}. + * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build() + * configuration}. * * @see ModeShapeConfiguration */ @@ -82,16 +84,18 @@ public class ModeShapeEngine { private final RepositoryService repositoryService; private final SequencingService sequencingService; private final ExecutorService executorService; + private final ClusteringService clusteringService; private final MimeTypeDetectors detectors; + private final String engineId = UUID.randomUUID().toString(); private static final Logger LOGGER = Logger.getLogger(ModeShapeEngine.class); protected ModeShapeEngine( ExecutionContext context, - ModeShapeConfiguration.ConfigurationDefinition configuration ) { + ModeShapeConfiguration.ConfigurationDefinition configuration ) { this.problems = new SimpleProblems(); // Use the configuration's context ... this.detectors = new MimeTypeDetectors(); - this.context = context.with(detectors); + this.context = context.with(detectors).with(engineId); // And set up the scanner ... this.configuration = configuration; @@ -105,23 +109,24 @@ public class ModeShapeEngine { detectors.addDetector(new MimeTypeDetectorConfig("ExtensionDetector", "Extension-based MIME type detector", ExtensionBasedMimeTypeDetector.class)); - // Create the RepositoryContext that the configuration repository source should use ... - ObservationBus configurationChangeBus = new ObservationBus(); - RepositoryContext configContext = new SimpleRepositoryContext(context, configurationChangeBus, null); - final RepositorySource configSource = this.configuration.getRepositorySource(); - configSource.initialize(configContext); + // Create the clustering service ... + ClusteringConfig clusterConfig = scanner.getClusteringConfiguration(); + clusteringService = new ClusteringService(); + clusteringService.setExecutionContext(context); + clusteringService.setClusteringConfig(clusterConfig); // Create the RepositoryService, pointing it to the configuration repository ... Path pathToConfigurationRoot = this.configuration.getPath(); String configWorkspaceName = this.configuration.getWorkspace(); - repositoryService = new RepositoryService(configSource, configWorkspaceName, pathToConfigurationRoot, context, problems); + RepositorySource configSource = this.configuration.getRepositorySource(); + repositoryService = new RepositoryService(configSource, configWorkspaceName, pathToConfigurationRoot, context, + clusteringService, problems); - // Now register the repository service to be notified of changes to the configuration ... - configurationChangeBus.register(repositoryService); + // Create the executor service (which starts out with 0 threads, so it's okay to do here) ... + ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName()); + executorService = Executors.newCachedThreadPool(threadPoolFactory); // Create the sequencing service ... - ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName()); - executorService = Executors.newScheduledThreadPool(10, threadPoolFactory); sequencingService = new SequencingService(); sequencingService.setExecutionContext(context); sequencingService.setExecutorService(executorService); @@ -129,8 +134,9 @@ public class ModeShapeEngine { for (SequencerConfig sequencerConfig : scanner.getSequencingConfigurations()) { sequencingService.addSequencer(sequencerConfig); } - } + // The rest of the instantiation/configuration will be done in start() + } /** * Get the problems that were encountered when setting up this engine from the configuration. @@ -270,8 +276,18 @@ public class ModeShapeEngine { // Then throw an exception ... throw new IllegalStateException(RepositoryI18n.errorsPreventStarting.text()); } + + // Create the RepositoryContext that the configuration repository source should use ... + RepositoryContext configContext = new SimpleRepositoryContext(context, clusteringService, null); + configuration.getRepositorySource().initialize(configContext); + + // Start the various services ... + clusteringService.getAdministrator().start(); repositoryService.getAdministrator().start(); sequencingService.getAdministrator().start(); + + // Now register the repository service to be notified of changes to the configuration ... + clusteringService.register(repositoryService); } /** @@ -282,17 +298,27 @@ public class ModeShapeEngine { * @see #start() */ public void shutdown() { - // Then terminate the executor service, which may be running background jobs that are not yet completed + preShutdown(); + postShutdown(); + } + + protected void preShutdown() { + // Terminate the executor service, which may be running background jobs that are not yet completed // and which will prevent new jobs being submitted (to the sequencing service) ... executorService.shutdown(); - // First, shutdown the sequencing service, which will prevent any additional jobs from going through ... + // Next, shutdown the sequencing service, which will prevent any additional jobs from going through ... sequencingService.getAdministrator().shutdown(); - // Finally shut down the repository source, which closes all connections ... + // Shut down the repository source, which closes all connections ... repositoryService.getAdministrator().shutdown(); } + protected void postShutdown() { + // Finally shut down the clustering service ... + clusteringService.shutdown(); + } + /** * Blocks until the shutdown has completed, or the timeout occurs, or the current thread is interrupted, whichever happens * first. @@ -387,6 +413,49 @@ public class ModeShapeEngine { return detectors; } + public ClusteringConfig getClusteringConfiguration() { + Graph graph = Graph.create(configurationRepository.getRepositorySource(), context); + Path pathToClusteringNode = context.getValueFactories().getPathFactory().create(configurationRepository.getPath(), + ModeShapeLexicon.CLUSTERING); + try { + Subgraph subgraph = graph.getSubgraphOfDepth(2).at(pathToClusteringNode); + + Set skipProperties = new HashSet(); + skipProperties.add(ModeShapeLexicon.DESCRIPTION); + skipProperties.add(ModeShapeLexicon.CLASSNAME); + skipProperties.add(ModeShapeLexicon.CLASSPATH); + Set skipNamespaces = new HashSet(); + skipNamespaces.add(JcrLexicon.Namespace.URI); + skipNamespaces.add(JcrNtLexicon.Namespace.URI); + skipNamespaces.add(JcrMixLexicon.Namespace.URI); + + Node clusterNode = subgraph.getRoot(); + String name = stringValueOf(clusterNode); + String desc = stringValueOf(clusterNode, ModeShapeLexicon.DESCRIPTION); + String classname = stringValueOf(clusterNode, ModeShapeLexicon.CLASSNAME); + String[] classpath = stringValuesOf(clusterNode, ModeShapeLexicon.CLASSPATH); + if (classname == null || classname.trim().length() == 0) { + classname = "org.modeshape.clustering.ClusteredObservationBus"; + } + + Map properties = new HashMap(); + for (Property property : clusterNode.getProperties()) { + Name propertyName = property.getName(); + if (skipNamespaces.contains(propertyName.getNamespaceUri())) continue; + if (skipProperties.contains(propertyName)) continue; + if (property.isSingle()) { + properties.put(propertyName.getLocalName(), property.getFirstValue()); + } else { + properties.put(propertyName.getLocalName(), property.getValuesAsArray()); + } + } + return new ClusteringConfig(name, desc, properties, classname, classpath); + } catch (PathNotFoundException e) { + // no detectors registered ... + } + return null; + } + public List getSequencingConfigurations() { List configs = new ArrayList(); Graph graph = Graph.create(configurationRepository.getRepositorySource(), context); Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (working copy) @@ -48,4 +48,7 @@ public class ModeShapeLexicon extends org.modeshape.graph.ModeShapeLexicon { public static final Name VALUE = new BasicName(Namespace.URI, "value"); public static final Name RETRY_LIMIT = new BasicName(Namespace.URI, "retryLimit"); public static final Name DEFAULT_CACHE_POLICY = new BasicName(Namespace.URI, "defaultCachePolicy"); + public static final Name CLUSTERING = new BasicName(Namespace.URI, "clustering"); + public static final Name CONFIGURATION = new BasicName(Namespace.URI, "configuration"); + public static final Name CLUSTER_NAME = new BasicName(Namespace.URI, "clusterName"); } Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (working copy) @@ -55,27 +55,14 @@ public final class RepositoryI18n { public static I18n errorFindingPropertyNameInPropertyChangedEvent; public static I18n errorFindingPropertyNameInPropertyRemovedEvent; - // Rules - public static I18n unableToObtainJsr94RuleAdministrator; - public static I18n errorUsingJsr94RuleAdministrator; - public static I18n unableToObtainJsr94ServiceProvider; - public static I18n errorAddingOrUpdatingRuleSet; - public static I18n errorRollingBackRuleSetAfterUpdateFailed; - public static I18n errorReadingRulesAndProperties; - public static I18n errorDeregisteringRuleSetBeforeUpdatingIt; - public static I18n errorRecreatingRuleSet; - public static I18n errorRemovingRuleSet; - public static I18n errorRemovingRuleSetUponShutdown; - public static I18n unableToFindRuleSet; - public static I18n errorExecutingRuleSetWithGlobalsAndFacts; - public static I18n unableToBuildRuleSetRegularExpressionPattern; - - public static I18n errorObtainingSessionToRepositoryWorkspace; - public static I18n errorWritingProblemsOnRuleSet; - - public static I18n federationServiceName; - public static I18n observationServiceName; - public static I18n ruleServiceName; + // Repository service ... + public static I18n repositoryServiceName; + + // Clustering service ... + public static I18n clusteringServiceName; + public static I18n unableToRegisterObserverOnUnstartedClusteringService; + public static I18n unableToUnregisterObserverOnUnstartedClusteringService; + public static I18n unableToNotifyObserversOnUnstartedClusteringService; // Sequencing public static I18n sequencingServiceName; Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (working copy) @@ -65,7 +65,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl protected class Administrator extends AbstractServiceAdministrator { protected Administrator() { - super(RepositoryI18n.federationServiceName, State.STARTED); + super(RepositoryI18n.repositoryServiceName, State.STARTED); } /** @@ -109,7 +109,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl private final Map pools = new HashMap(); private RepositoryConnectionFactory delegate; private final ExecutionContext executionContext; - private final ObservationBus observationBus = new ObservationBus(); + private final ObservationBus observationBus; private final RepositorySource configurationSource; private final String configurationWorkspaceName; private final Path pathToConfigurationRoot; @@ -121,21 +121,27 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl * @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration * repository, or null if the default workspace of the source should be used (if there is one) * @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by - * this service as the root of the service's configuration; if null, then "/dna:system" is used + * this service as the root of the service's configuration * @param context the execution context in which this service should run - * @throws IllegalArgumentException if the executionContextFactory reference is null + * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources + * @throws IllegalArgumentException if any of the configurationSource, + * pathToSourcesConfigurationRoot, observationBus, or context references are + * null */ public RepositoryLibrary( RepositorySource configurationSource, String configurationWorkspaceName, Path pathToSourcesConfigurationRoot, - final ExecutionContext context ) { + final ExecutionContext context, + ObservationBus observationBus ) { CheckArg.isNotNull(configurationSource, "configurationSource"); CheckArg.isNotNull(context, "context"); CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot"); + CheckArg.isNotNull(observationBus, "observationBus"); this.executionContext = context; this.configurationSource = configurationSource; this.configurationWorkspaceName = configurationWorkspaceName; this.pathToConfigurationRoot = pathToSourcesConfigurationRoot; + this.observationBus = observationBus; } /** @@ -225,8 +231,6 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl } finally { this.sourcesLock.readLock().unlock(); } - // Remove all listeners ... - this.observationBus.shutdown(); } /** Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (working copy) @@ -23,6 +23,11 @@ */ package org.modeshape.repository; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import net.jcip.annotations.ThreadSafe; import org.modeshape.common.collection.Problems; import org.modeshape.common.collection.SimpleProblems; @@ -38,6 +43,7 @@ import org.modeshape.graph.Subgraph; import org.modeshape.graph.connector.RepositorySource; import org.modeshape.graph.observe.Changes; import org.modeshape.graph.observe.NetChangeObserver; +import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.observe.Observer; import org.modeshape.graph.property.Name; import org.modeshape.graph.property.Path; @@ -52,12 +58,6 @@ import org.modeshape.repository.service.AbstractServiceAdministrator; import org.modeshape.repository.service.AdministeredService; import org.modeshape.repository.service.ServiceAdministrator; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * A service that manages the {@link RepositorySource}es defined within a configuration repository. */ @@ -72,7 +72,7 @@ public class RepositoryService implements AdministeredService, Observer { protected class Administrator extends AbstractServiceAdministrator { protected Administrator() { - super(RepositoryI18n.federationServiceName, State.PAUSED); + super(RepositoryI18n.repositoryServiceName, State.PAUSED); } /** @@ -135,6 +135,7 @@ public class RepositoryService implements AdministeredService, Observer { * @param pathToConfigurationRoot the path of the node in the configuration source repository that should be treated by this * service as the root of the service's configuration; if null, then "/dna:system" is used * @param context the execution context in which this service should run + * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources * @param problems the {@link Problems} instance that this service should use to report problems starting repositories * @throws IllegalArgumentException if the bootstrap source is null or the execution context is null */ @@ -142,15 +143,18 @@ public class RepositoryService implements AdministeredService, Observer { String configurationWorkspaceName, Path pathToConfigurationRoot, ExecutionContext context, + ObservationBus observationBus, Problems problems ) { CheckArg.isNotNull(configurationSource, "configurationSource"); CheckArg.isNotNull(context, "context"); + CheckArg.isNotNull(observationBus, "observationBus"); PathFactory pathFactory = context.getValueFactories().getPathFactory(); if (pathToConfigurationRoot == null) pathToConfigurationRoot = pathFactory.create("/dna:system"); if (problems == null) problems = new SimpleProblems(); Path sourcesPath = pathFactory.create(pathToConfigurationRoot, ModeShapeLexicon.SOURCES); - this.sources = new RepositoryLibrary(configurationSource, configurationWorkspaceName, sourcesPath, context); + this.sources = new RepositoryLibrary(configurationSource, configurationWorkspaceName, sourcesPath, context, + observationBus); this.sources.addSource(configurationSource); this.pathToConfigurationRoot = pathToConfigurationRoot; this.configurationSourceName = configurationSource.getName(); @@ -535,6 +539,10 @@ public class RepositoryService implements AdministeredService, Observer { */ @Override protected void notify( NetChanges netChanges ) { + if (getConfigurationWorkspaceName() == null) { + // This was a transient configuration source, so it should never change ... + return; + } if (!getConfigurationSourceName().equals(netChanges.getSourceName())) return; for (NetChange change : netChanges.getNetChanges()) { if (!getConfigurationWorkspaceName().equals(change.getRepositoryWorkspaceName())) return; Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java (working copy) @@ -0,0 +1,59 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.repository.cluster; + +import java.util.Map; +import net.jcip.annotations.Immutable; +import org.modeshape.common.component.ComponentConfig; + +/** + * A configuration for a cluster. + */ +@Immutable +public class ClusteringConfig extends ComponentConfig { + + public ClusteringConfig( String name, + String description, + String classname, + String[] classpath ) { + this(name, description, System.currentTimeMillis(), null, classname, classpath); + } + + public ClusteringConfig( String name, + String description, + Map properties, + String classname, + String[] classpath ) { + this(name, description, System.currentTimeMillis(), properties, classname, classpath); + } + + public ClusteringConfig( String name, + String description, + long timestamp, + Map properties, + String classname, + String[] classpath ) { + super(name, description, timestamp, properties, classname, classpath); + } +} Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java new file mode 100644 =================================================================== --- /dev/null (revision 1989) +++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java (working copy) @@ -0,0 +1,238 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.repository.cluster; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.modeshape.common.component.ComponentLibrary; +import org.modeshape.common.util.CheckArg; +import org.modeshape.graph.ExecutionContext; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.LocalObservationBus; +import org.modeshape.graph.observe.ObservationBus; +import org.modeshape.graph.observe.Observer; +import org.modeshape.repository.RepositoryI18n; +import org.modeshape.repository.service.AbstractServiceAdministrator; +import org.modeshape.repository.service.AdministeredService; +import org.modeshape.repository.service.ServiceAdministrator; + +/** + * The service that provides the observation bus for a clustered (or unclustered) environment. + */ +public class ClusteringService implements AdministeredService, ObservationBus { + + /** + * The administrative component for this service. + */ + protected class Administrator extends AbstractServiceAdministrator { + + protected Administrator() { + super(RepositoryI18n.clusteringServiceName, State.PAUSED); + } + + /** + * {@inheritDoc} + */ + @Override + protected void doStart( State fromState ) { + super.doStart(fromState); + startService(); + } + + /** + * {@inheritDoc} + */ + @Override + protected void doShutdown( State fromState ) { + super.doShutdown(fromState); + shutdownService(); + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean doCheckIsTerminated() { + return isServiceTerminated(); + } + + /** + * {@inheritDoc} + */ + public boolean awaitTermination( long timeout, + TimeUnit unit ) { + return true; // nothing to wait for + } + + } + + private ExecutionContext executionContext; + private ObservationBus bus; + private final ComponentLibrary busLibrary = new ComponentLibrary(); + + /** + * @return executionContext + */ + public ExecutionContext getExecutionContext() { + return this.executionContext; + } + + /** + * @param executionContext Sets executionContext to the specified value. + */ + public void setExecutionContext( ExecutionContext executionContext ) { + CheckArg.isNotNull(executionContext, "execution context"); + if (this.getAdministrator().isStarted()) { + throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); + } + this.executionContext = executionContext; + this.busLibrary.setClassLoaderFactory(executionContext); + } + + /** + * Set the configuration for the clustering. This method will replace any existing configuration. + * + * @param config the new configuration, or null if the default configuration should be used + * @return true if the configuration was set, or false otherwise + */ + public boolean setClusteringConfig( ClusteringConfig config ) { + if (config == null) config = createDefaultConfiguration(); + return this.busLibrary.removeAllAndAdd(config); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + @Override + public boolean hasObservers() { + return bus != null && bus.hasObservers(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean register( Observer observer ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToRegisterObserverOnUnstartedClusteringService.text()); + } + return bus.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean unregister( Observer observer ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToUnregisterObserverOnUnstartedClusteringService.text()); + } + return bus.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToNotifyObserversOnUnstartedClusteringService.text()); + } + bus.notify(changes); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.repository.service.AdministeredService#getAdministrator() + */ + @Override + public ServiceAdministrator getAdministrator() { + return new Administrator(); + } + + /** + * {@inheritDoc} + *

+ * This is equivalent to calling getAdminstrator().start() and can be called multiple times. + *

+ * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + @Override + public void start() { + getAdministrator().start(); + } + + /** + * {@inheritDoc} + *

+ * This is equivalent to calling getAdminstrator().shutdown(). + *

+ * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + @Override + public void shutdown() { + getAdministrator().shutdown(); + } + + protected void startService() { + List instances = busLibrary.getInstances(); + if (instances.isEmpty()) { + setClusteringConfig(null); + instances = busLibrary.getInstances(); + assert instances.size() > 0; + } + this.bus = instances.get(0); + this.bus.start(); + } + + protected void shutdownService() { + // Unregister our observer ... + try { + if (this.bus != null) { + this.bus.shutdown(); + } + } finally { + this.bus = null; + } + } + + protected boolean isServiceTerminated() { + return this.bus != null; + } + + protected ClusteringConfig createDefaultConfiguration() { + return new ClusteringConfig("bus", "Local observation bus", LocalObservationBus.class.getName(), null); + } +} Index: modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties =================================================================== --- modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (revision 1989) +++ modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (working copy) @@ -40,26 +40,12 @@ errorFindingPropertyNameInPropertyAddedEvent = Error finding the name of the add errorFindingPropertyNameInPropertyChangedEvent = Error finding the name of the changed property in the event path {0} errorFindingPropertyNameInPropertyRemovedEvent = Error finding the name of the removed property in the event path {0} -unableToObtainJsr94RuleAdministrator = Unable to obtain the rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2} -errorUsingJsr94RuleAdministrator = Error using rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2} -unableToObtainJsr94ServiceProvider = Error using rule administrator for JSR-94 service provider {0} ({1}) -errorAddingOrUpdatingRuleSet = Error adding/updating rule set "{0}" -errorRollingBackRuleSetAfterUpdateFailed = Error rolling back rule set "{0}" after new rule set failed -errorReadingRulesAndProperties = Error reading the rules and properties while adding/updating rule set "{0}" -errorDeregisteringRuleSetBeforeUpdatingIt = Error deregistering rule set "{0}" before updating it -errorRecreatingRuleSet = Error (re)creating the rule set "{0}" -errorRemovingRuleSet = Error removing rule set "{0}" -errorRemovingRuleSetUponShutdown = Error removing rule set "{0}" upon shutdown -unableToFindRuleSet = Unable to find rule set with name "{0}" -errorExecutingRuleSetWithGlobalsAndFacts = Error executing rule set "{0}" and with globals {1} and facts {2} -unableToBuildRuleSetRegularExpressionPattern = Unable to build the rule set name pattern "{0}" using the supplied absolute path ("{1}"): {2} +repositoryServiceName = Repository Service -errorObtainingSessionToRepositoryWorkspace = Error obtaining JCR session to repository workspace {0} -errorWritingProblemsOnRuleSet = Error while writing problems on rule set node {0} - -federationServiceName = Federation Service -observationServiceName = Observation Service -ruleServiceName = Rule Service +clusteringServiceName = Clustering Service +unableToRegisterObserverOnUnstartedClusteringService = Unable to register an observer until the clustering service is started +unableToUnregisterObserverOnUnstartedClusteringService = Unable to unregister an observer until the clustering service is started +unableToNotifyObserversOnUnstartedClusteringService = Unable to notify observers until the clustering service is started sequencingServiceName = Sequencing Service unableToChangeExecutionContextWhileRunning = Unable to change the execution context while running Index: modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java =================================================================== --- modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java (revision 1989) +++ modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java (working copy) @@ -29,25 +29,27 @@ import static org.hamcrest.core.IsNull.notNullValue; import static org.hamcrest.core.IsNull.nullValue; import static org.hamcrest.core.IsSame.sameInstance; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.modeshape.common.collection.Problems; import org.modeshape.common.collection.SimpleProblems; import org.modeshape.common.util.Logger; -import org.modeshape.graph.ModeShapeLexicon; import org.modeshape.graph.ExecutionContext; import org.modeshape.graph.Graph; +import org.modeshape.graph.ModeShapeLexicon; import org.modeshape.graph.connector.RepositoryConnection; import org.modeshape.graph.connector.RepositorySource; import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource; +import org.modeshape.graph.observe.LocalObservationBus; +import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.property.Path; import org.modeshape.repository.service.ServiceAdministrator; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.MockitoAnnotations; -import org.mockito.Mock; /** * @author Randall Hauch @@ -62,6 +64,7 @@ public class RepositoryServiceTest { private ExecutionContext context; private Path root; private Problems problems; + private ObservationBus bus; @Mock private RepositoryLibrary sources; @@ -80,7 +83,8 @@ public class RepositoryServiceTest { when(sources.createConnection(configSourceName)).thenReturn(configRepositoryConnection); root = context.getValueFactories().getPathFactory().createRootPath(); problems = new SimpleProblems(); - service = new RepositoryService(configRepositorySource, configWorkspaceName, root, context, problems); + bus = new LocalObservationBus(); + service = new RepositoryService(configRepositorySource, configWorkspaceName, root, context, bus, problems); } @After @@ -193,7 +197,7 @@ public class RepositoryServiceTest { @Test public void shouldConfigureRepositorySourceWithSetterThatTakesArrayButWithSingleValues() { Path configPath = context.getValueFactories().getPathFactory().create("/mode:system"); - service = new RepositoryService(configRepositorySource, configWorkspaceName, configPath, context, problems); + service = new RepositoryService(configRepositorySource, configWorkspaceName, configPath, context, bus, problems); // Set up the configuration repository ... configRepository.useWorkspace("default"); Index: modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java =================================================================== --- modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java (revision 1989) +++ modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java (working copy) @@ -39,6 +39,7 @@ import org.junit.Test; import org.modeshape.graph.ExecutionContext; import org.modeshape.graph.Graph; import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource; +import org.modeshape.graph.observe.LocalObservationBus; import org.modeshape.graph.property.Path; import org.modeshape.repository.RepositoryLibrary; import org.modeshape.repository.service.ServiceAdministrator; @@ -62,7 +63,7 @@ public class SequencingServiceTest { configSource.setDefaultWorkspaceName("default"); Path configPath = context.getValueFactories().getPathFactory().create("/"); - sources = new RepositoryLibrary(configSource, "default", configPath, context); + sources = new RepositoryLibrary(configSource, "default", configPath, context, new LocalObservationBus()); InMemoryRepositorySource source = new InMemoryRepositorySource(); source.setName(REPOSITORY_SOURCE_NAME); sources.addSource(source); Index: pom.xml =================================================================== --- pom.xml (revision 1989) +++ pom.xml (working copy) @@ -131,6 +131,7 @@ modeshape-repository modeshape-cnd extensions/modeshape-search-lucene + extensions/modeshape-clustering modeshape-jcr-api modeshape-jcr extensions/modeshape-classloader-maven