Index: .gitignore =================================================================== --- .gitignore (revision 1976) +++ .gitignore (working copy) @@ -55,6 +55,7 @@ /deploy/jbossas/modeshape-jbossas-service/target/ /deploy/jbossas/modeshape-jbossas-web-rest-war/target/ +/extensions/modeshape-clustering/target /extensions/modeshape-search-lucene/target /extensions/modeshape-classloader-maven/target /extensions/modeshape-common-jdbc/target Index: extensions/modeshape-clustering/.classpath new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/.classpath (working copy) @@ -0,0 +1,10 @@ + + + + + + + + + + Index: extensions/modeshape-clustering/.project new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/.project (working copy) @@ -0,0 +1,23 @@ + + + modeshape-clustering + + + + + + org.maven.ide.eclipse.maven2Builder + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + org.maven.ide.eclipse.maven2Nature + + Index: extensions/modeshape-clustering/pom.xml new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/pom.xml (working copy) @@ -0,0 +1,131 @@ + + + 4.0.0 + + org.modeshape + modeshape + 2.1-SNAPSHOT + ../.. + + + modeshape-clustering + jar + ModeShape Clustering Component + ModeShape clustering mechanism. + http://www.modeshape.org + + + + + org.modeshape + modeshape-graph + + + + jgroups + jgroups + 2.9.0.GA + + + + junit + junit + + + org.mockito + mockito-all + + + org.modeshape + modeshape-common + ${project.version} + test-jar + test + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + net.jcip + jcip-annotations + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + maven-assembly-plugin + + + ${project.basedir}/../../src/main/assembly/connector-with-dependencies.xml + + ${project.artifactId}-${project.version} + + + + package + + single + + + + + + + maven-jar-plugin + + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + + + org.apache.felix + maven-bundle-plugin + + + bundle-manifest + process-classes + + manifest + + + + + + + Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java (working copy) @@ -0,0 +1,481 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.ChannelClosedException; +import org.jgroups.ChannelException; +import org.jgroups.ChannelListener; +import org.jgroups.ChannelNotConnectedException; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.View; +import org.jgroups.conf.ProtocolStackConfigurator; +import org.jgroups.conf.XmlConfigurator; +import org.jgroups.util.Util; +import org.modeshape.common.SystemFailureException; +import org.modeshape.common.util.CheckArg; +import org.modeshape.common.util.Logger; +import org.modeshape.graph.observe.ChangeObservers; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.ObservationBus; +import org.modeshape.graph.observe.Observer; + +/** + * An implementation of a cluster-aware {@link ObservationBus}. + */ +public class ClusteredObservationBus implements ObservationBus { + + protected static final Logger LOGGER = Logger.getLogger(ClusteredObservationBus.class); + + /** + * The list of {@link Observer} instances that should receive events from the bus. These Observer objects are all local. + */ + protected final ChangeObservers observers = new ChangeObservers(); + + /** + * The listener for channel changes. + */ + private final Listener listener = new Listener(); + + /** + * The component that will receive the JGroups messages and broadcast them to this bus' observers. + */ + private final Receiver receiver = new Receiver(); + + /** + * Flag that dictates whether this bus has connected to the cluster. + */ + protected final AtomicBoolean isOpen = new AtomicBoolean(false); + + /** + * Flag that dictates whether there are multiple participants in the cluster; if not, then the changes are propogated only to + * the local observers. + */ + protected final AtomicBoolean multipleAddressesInCluster = new AtomicBoolean(false); + + /** + * The configuration for JGroups + */ + private String configuration; + + /** + * The name of the JGroups cluster. + */ + private String clusterName; + + /** + * The JGroups channel to which all {@link #notify(Changes) change notifications} will be sent and from which all changes will + * be received and sent to the observers. + *

+ * It is important that the order of the {@link Changes} instances are maintained across the cluster, and JGroups will do this + * for us as long as we push all local changes into the channel and receive all local/remote changes from the channel. + *

+ */ + private JChannel channel; + + /** + * Get the configuration for JGroups. This configuration may be a string that refers to a resource on the classpath, the path + * of the local configuration, the URL to the configuration file, or the configuration specified using the newer-style XML + * form or older-style string form. + * + * @return the location of the JGroups configuration, or null if no configuration has been defined + */ + public String getConfiguration() { + return configuration; + } + + /** + * Set the JGroups configuration, which may be a string that refers to a resource on the classpath, the path of the local + * configuration, the URL to the configuration file, or the configuration specified using the newer-style XML form or + * older-style string form. + * + * @param configuration the relative path to a classpath resource, path to a local file, URL to the configuration file, or the + * configuration specified using the newer-style XML form or older-style string form. + * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has + * been {@link #shutdown() shutdown} + */ + public void setConfigurationFile( String configuration ) { + if (channel != null) { + String name = this.clusterName; + throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name)); + } + this.configuration = configuration; + } + + /** + * Get the name of the JGroups cluster. + * + * @return the cluster name, or null if the cluster name was not yet defined + * @see JChannel#connect(String) + */ + public String getClusterName() { + return clusterName; + } + + /** + * Set the name of the JGroups cluster. This must be called with a non-null cluster name before {@link #start()} is called. + * + * @param clusterName Sets clusterName to the specified value. + * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has + * been {@link #shutdown() shutdown} + * @see JChannel#connect(String) + */ + public void setClusterName( String clusterName ) { + CheckArg.isNotNull(clusterName, "clusterName"); + if (channel != null) { + String name = this.clusterName; + throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name)); + } + this.clusterName = clusterName; + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + @Override + public synchronized void start() { + if (clusterName == null) { + throw new IllegalStateException(ClusteringI18n.clusterNameRequired.text()); + } + if (channel != null) { + // Disconnect from any previous channel ... + channel.removeChannelListener(listener); + channel.setReceiver(null); + } + try { + if (configuration == null) { + channel = new JChannel(); + } else { + // Try the XML configuration first ... + ProtocolStackConfigurator configurator = null; + InputStream stream = new ByteArrayInputStream(configuration.getBytes()); + try { + configurator = XmlConfigurator.getInstance(stream); + } catch (IOException e) { + // ignore, since the configuration may be of another form ... + } finally { + try { + stream.close(); + } catch (IOException e) { + // ignore this + } + } + if (configurator != null) { + channel = new JChannel(configurator); + } else { + // Otherwise, just try the regular configuration ... + channel = new JChannel(configuration); + } + } + assert channel != null; + // Add a listener through which we'll know what's going on within the cluster ... + channel.addChannelListener(listener); + + // Now connect to the cluster ... + channel.connect(clusterName); + + // Set the receiver through which we'll receive all of the changes ... + channel.setReceiver(receiver); + } catch (ChannelException e) { + throw new IllegalStateException(ClusteringI18n.errorWhileStartingJGroups.text(configuration), e); + } + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + if (changes == null) return; // do nothing + if (!isOpen.get()) { + // The channel is not open ... + return; + } + if (!multipleAddressesInCluster.get()) { + // We are in clustered mode, but there is only one participant in the cluster (us). + // So short-circuit the cluster and just notify the local observers ... + if (!observers.isEmpty()) { + observers.broadcast(changes); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + getClusterName(), + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + } + return; + } + + // There are multiple participants in the cluster, so send all changes out to JGroups, + // letting JGroups do the ordering of messages... + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending to cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + byte[] data = Util.objectToByteBuffer(changes); + Message message = new Message(null, null, data); + channel.send(message); + } catch (ChannelClosedException e) { + LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelHasClosed, + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } catch (ChannelNotConnectedException e) { + LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelIsNotConnected, + clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } catch (Exception e) { + // Something went wrong here (this should not happen) ... + String msg = ClusteringI18n.errorSerializingChanges.text(clusterName, + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp(), + changes); + throw new SystemFailureException(msg, e); + } + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean register( Observer observer ) { + return observers.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean unregister( Observer observer ) { + return observers.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + @Override + public boolean hasObservers() { + return !observers.isEmpty(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + @Override + public synchronized void shutdown() { + if (channel != null) { + // Mark this as not accepting any more ... + isOpen.set(false); + try { + // Disconnect from the channel and close it ... + channel.removeChannelListener(listener); + channel.setReceiver(null); + channel.close(); + } finally { + channel = null; + // Now that we're not receiving any more messages, shut down the list of observers ... + observers.shutdown(); + } + } + } + + protected class Receiver implements org.jgroups.Receiver { + private byte[] state; + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#block() + */ + @Override + public void block() { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#receive(org.jgroups.Message) + */ + @Override + public void receive( Message message ) { + if (!observers.isEmpty()) { + // We have at least one observer ... + try { + // Deserialize the changes ... + Changes changes = (Changes)Util.objectFromByteBuffer(message.getBuffer()); + // and broadcast to all of our observers ... + observers.broadcast(changes); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", + getClusterName(), + changes.getChangeRequests().size(), + changes.getSourceName(), + changes.getUserName(), + changes.getProcessId(), + changes.getTimestamp()); + } + } catch (Exception e) { + // Something went wrong here (this should not happen) ... + String msg = ClusteringI18n.errorDeserializingChanges.text(getClusterName()); + throw new SystemFailureException(msg, e); + } + } + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#getState() + */ + @Override + public byte[] getState() { + return state; + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MessageListener#setState(byte[]) + */ + @Override + public void setState( byte[] state ) { + this.state = state; + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address) + */ + @Override + public void suspect( Address suspectedMbr ) { + LOGGER.error(ClusteringI18n.memberOfClusterIsSuspect, getClusterName(), suspectedMbr); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View) + */ + @Override + public void viewAccepted( View newView ) { + if (newView.getMembers().size() > 1) { + multipleAddressesInCluster.compareAndSet(false, true); + } else { + multipleAddressesInCluster.compareAndSet(true, false); + } + LOGGER.debug("Members of '{0}' cluster have changed: {1}", getClusterName(), newView); + } + } + + protected class Listener implements ChannelListener { + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelClosed(org.jgroups.Channel) + */ + @Override + public void channelClosed( Channel channel ) { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelConnected(org.jgroups.Channel) + */ + @Override + public void channelConnected( Channel channel ) { + isOpen.set(true); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelDisconnected(org.jgroups.Channel) + */ + @Override + public void channelDisconnected( Channel channel ) { + isOpen.set(false); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelReconnected(org.jgroups.Address) + */ + @Override + public void channelReconnected( Address addr ) { + isOpen.set(true); + } + + /** + * {@inheritDoc} + * + * @see org.jgroups.ChannelListener#channelShunned() + */ + @Override + public void channelShunned() { + isOpen.set(false); + } + } +} Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java (working copy) @@ -0,0 +1,49 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. +* See the AUTHORS.txt file in the distribution for a full listing of +* individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import org.modeshape.common.i18n.I18n; + +/** + * The internationalized string constants for the org.modeshape.connector.filesystem* packages. + */ +public final class ClusteringI18n { + + public static I18n errorWhileStartingJGroups; + public static I18n clusterNameRequired; + public static I18n unableToNotifyChangesBecauseClusterChannelHasClosed; + public static I18n unableToNotifyChangesBecauseClusterChannelIsNotConnected; + public static I18n errorSerializingChanges; + public static I18n errorDeserializingChanges; + public static I18n clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown; + public static I18n memberOfClusterIsSuspect; + + static { + try { + I18n.initialize(ClusteringI18n.class); + } catch (final Exception err) { + System.err.println(err); + } + } +} Index: extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties (working copy) @@ -0,0 +1,31 @@ +# +# ModeShape (http://www.modeshape.org) +# See the COPYRIGHT.txt file distributed with this work for information +# regarding copyright ownership. Some portions may be licensed +# to Red Hat, Inc. under one or more contributor license agreements. +# See the AUTHORS.txt file in the distribution for a full listing of +# individual contributors. +# +# ModeShape is free software. Unless otherwise indicated, all code in ModeShape +# is licensed to you under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of +# the License, or (at your option) any later version. +# +# ModeShape is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free +# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +# 02110-1301 USA, or see the FSF site: http://www.fsf.org. +# +errorWhileStartingJGroups = Error while starting JGroups with configuration: {0} +clusterNameRequired = A cluster name is required for JGroups-based clustering +unableToNotifyChangesBecauseClusterChannelHasClosed = Cluster channel '{0}' has closed, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5} +unableToNotifyChangesBecauseClusterChannelIsNotConnected = Cluster channel '{0}' is not connected, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5} +errorSerializingChanges = Error in channel '{0}' while serializing {1} changes in source '{2}' made by {3} from process '{4}' at {5}: {6} +errorDeserializingChanges = Error deserializing changes obtained from channel '{0}' +clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown = The cluster channel '{0}' is running and cannot be changed unless shut down +memberOfClusterIsSuspect = Member of '{0}' cluster is suspect at '{1}' Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java (working copy) @@ -0,0 +1,47 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import org.junit.Before; +import org.junit.Test; + +public class ClusteredObservationBusTest { + + private ClusteredObservationBus bus; + + @Before + public void beforeEach() { + bus = new ClusteredObservationBus(); + } + + @Test( expected = IllegalArgumentException.class ) + public void shouldNotAllowSettingNullClusterName() { + bus.setClusterName(null); + } + + @Test + public void shouldAllowSettingBlankClusterName() { + bus.setClusterName(""); + } +} Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java (working copy) @@ -0,0 +1,33 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import org.modeshape.common.AbstractI18nTest; + +public class ClusteringI18nTest extends AbstractI18nTest { + + public ClusteringI18nTest() { + super(ClusteringI18n.class); + } +} Index: extensions/modeshape-clustering/src/test/resources/log4j.properties new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ extensions/modeshape-clustering/src/test/resources/log4j.properties (working copy) @@ -0,0 +1,19 @@ +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %m%n + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Set up the default logging to be INFO level, then override specific units +log4j.logger.org.modeshape=INFO + +# Hibernate +log4j.logger.org.hibernate=ERROR +log4j.logger.org.hibernate.hql=ERROR +#log4j.logger.org.hibernate.tool.hbm2ddl=DEBUG +# C3P0 +log4j.logger.com.mchange=ERROR + Index: modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java =================================================================== --- modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (revision 1976) +++ modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (working copy) @@ -220,6 +220,27 @@ public class ComponentLibrary } } + public boolean removeAll() { + try { + this.lock.lock(); + this.configs.clear(); + this.instances.clear(); + return true; + } finally { + this.lock.unlock(); + } + } + + public boolean removeAllAndAdd( ConfigType config ) { + try { + this.lock.lock(); + removeAll(); + return add(config); + } finally { + this.lock.unlock(); + } + } + /** * Refresh the instances by attempting to re-instantiate each registered configuration. * @@ -282,6 +303,7 @@ public class ComponentLibrary reflection.invokeSetterMethodOnTarget(entry.getKey(), newInstance, entry.getValue()); } } + configure(newInstance, config); } catch (Throwable e) { throw new SystemFailureException(e); } @@ -291,6 +313,11 @@ public class ComponentLibrary return newInstance; } + protected void configure( ComponentType newInstance, + ConfigType configuration ) throws Exception { + // do nothing + } + /** * Method that instantiates the supplied class. This method can be overridden by subclasses that may need to wrap or adapt the * instance to be a ComponentType. Index: modeshape-graph/src/main/java/org/modeshape/graph/Graph.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (revision 1976) +++ modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (working copy) @@ -7041,6 +7041,34 @@ public class Graph { return getLocation().hashCode(); } + /** + * {@inheritDoc} + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals( Object obj ) { + if (obj instanceof SubgraphResults) { + SubgraphResults that = (SubgraphResults)obj; + return getLocation().equals(that.getLocation()) && request.equals(that.request); + } else if (obj instanceof Subgraph) { + Subgraph that = (Subgraph)obj; + if (!getLocation().equals(that.getLocation())) return false; + Iterator thisIter = this.iterator(); + Iterator thatIter = that.iterator(); + while (thisIter.hasNext() && thatIter.hasNext()) { + SubgraphNode thisNode = thisIter.next(); + SubgraphNode thatNode = thatIter.next(); + if (!thisNode.getLocation().equals(thatNode.getLocation())) return false; + if (!thisNode.getProperties().equals(thatNode.getProperties())) return false; + if (!thisNode.getChildren().equals(thatNode.getChildren())) return false; + } + if (thisIter.hasNext() || thatIter.hasNext()) return false; + return true; + } + return false; + } + @Override public String toString() { return "Subgraph\n" + getToString(getContext()); Index: modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (revision 1976) +++ modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (working copy) @@ -48,7 +48,12 @@ public class ModeShapeLexicon { public static final Name RESOURCE = new BasicName(Namespace.URI, "resource"); public static final Name ROOT = new BasicName(Namespace.URI, "root"); public static final Name TIME_TO_EXPIRE = new BasicName(Namespace.URI, "timeToExpire"); - public static final Name NAMESPACE_URI = new BasicName(Namespace.URI, "uri"); + public static final Name URI = new BasicName(Namespace.URI, "uri"); + /** + * @deprecated Use {@link #URI} instead. + */ + @Deprecated + public static final Name NAMESPACE_URI = URI; public static final Name WORKSPACES = new BasicName(Namespace.URI, "workspaces"); public static final Name SOURCE_NAME = new BasicName(Namespace.URI, "source"); Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java (working copy) @@ -0,0 +1,103 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.graph.observe; + +import net.jcip.annotations.ThreadSafe; +import org.modeshape.common.util.Logger; +import org.modeshape.graph.GraphI18n; + +/** + * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single + * Observable to disseminate each event to multiple Observers. + */ +@ThreadSafe +public class LocalObservationBus implements ObservationBus { + private final ChangeObservers observers = new ChangeObservers(); + private static final Logger LOGGER = Logger.getLogger(LocalObservationBus.class); + + public LocalObservationBus() { + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + public void start() { + // nothing to do + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + public boolean register( Observer observer ) { + if (observer == null) return false; + return observers.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + public boolean unregister( Observer observer ) { + return observers.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + public void notify( Changes changes ) { + if (changes != null) { + // Broadcast the changes to the registered observers ... + try { + observers.broadcast(changes); + } catch (RuntimeException t) { + LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage()); + } + } + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + public boolean hasObservers() { + return !observers.isEmpty(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + public void shutdown() { + observers.shutdown(); + } +} Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (revision 1976) +++ modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (working copy) @@ -24,69 +24,28 @@ package org.modeshape.graph.observe; import net.jcip.annotations.ThreadSafe; -import org.modeshape.common.util.Logger; -import org.modeshape.graph.GraphI18n; /** - * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single + * A simple {@link Observer} that is itself {@link Observable}. This interface essentially multiplexes the events from a single * Observable to disseminate each event to multiple Observers. */ @ThreadSafe -public class ObservationBus implements Observable, Observer { - private final ChangeObservers observers = new ChangeObservers(); - private static final Logger LOGGER = Logger.getLogger(ObservationBus.class); - - public ObservationBus() { - } - - /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) - */ - public boolean register( Observer observer ) { - if (observer == null) return false; - return observers.register(observer); - } +public interface ObservationBus extends Observable, Observer { /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) - */ - public boolean unregister( Observer observer ) { - return observers.unregister(observer); - } - - /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + * Prepare this bus for operation by starting any resources. */ - public void notify( Changes changes ) { - if (changes != null) { - // Broadcast the changes to the registered observers ... - try { - observers.broadcast(changes); - } catch (RuntimeException t) { - LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage()); - } - } - } + public void start(); /** * Determine whether this particular bus currently has any observers. * * @return true if there is at least one observer, or false otherwise */ - public boolean hasObservers() { - return !observers.isEmpty(); - } + public boolean hasObservers(); /** * Unregister all registered observers, and mark this as no longer accepting new registered observers. */ - public void shutdown() { - observers.shutdown(); - } + public void shutdown(); } Index: modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java =================================================================== --- modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (revision 1976) +++ modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (working copy) @@ -53,7 +53,7 @@ import org.modeshape.graph.property.ValueFactory; @NotThreadSafe public class GraphNamespaceRegistry implements NamespaceRegistry { - public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.NAMESPACE_URI; + public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.URI; public static final String GENERATED_PREFIX = "ns"; private SimpleNamespaceRegistry cache; Index: modeshape-integration-tests/pom.xml =================================================================== --- modeshape-integration-tests/pom.xml (revision 1976) +++ modeshape-integration-tests/pom.xml (working copy) @@ -41,7 +41,7 @@ + --> junit junit @@ -75,6 +75,12 @@ test + + org.modeshape + modeshape-clustering + ${project.version} + test + org.modeshape modeshape-connector-jbosscache Index: modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java (working copy) @@ -0,0 +1,182 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.test.integration; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import javax.jcr.ImportUUIDBehavior; +import javax.jcr.Node; +import javax.jcr.Repository; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.modeshape.common.util.FileUtil; +import org.modeshape.connector.store.jpa.JpaSource; +import org.modeshape.jcr.JcrConfiguration; +import org.modeshape.jcr.JcrEngine; +import org.modeshape.jcr.ModeShapeRoles; +import org.modeshape.jcr.JcrRepository.Option; + +public class ClusteringTest { + + private static JcrConfiguration configuration; + private static JcrEngine engine1; + private static JcrEngine engine2; + private static JcrEngine engine3; + private static List sessions = new ArrayList(); + + @BeforeClass + public static void beforeEach() throws Exception { + // Delete the database files, if there are any ... + FileUtil.delete("target/db"); + + // Set up the database and keep this connection open ... + String url = "jdbc:hsqldb:file:target/db/ClusteredObservationBusTest"; + String username = "sa"; + String password = ""; + + configuration = new JcrConfiguration(); + configuration.repositorySource("car-source") + .usingClass(JpaSource.class) + .setDescription("The automobile content") + .setProperty("dialect", "org.hibernate.dialect.HSQLDialect") + .setProperty("driverClassName", "org.hsqldb.jdbcDriver") + .setProperty("url", url) + .setProperty("username", username) + .setProperty("password", password) + .setProperty("maximumConnectionsInPool", "2") + .setProperty("minimumConnectionsInPool", "1") + .setProperty("numberOfConnectionsToAcquireAsNeeded", "1") + .setProperty("maximumSizeOfStatementCache", "100") + .setProperty("maximumConnectionIdleTimeInSeconds", "10") + .setProperty("referentialIntegrityEnforced", "true") + .setProperty("largeValueSizeInBytes", "150") + .setProperty("autoGenerateSchema", "update") + .setProperty("retryLimit", "3") + .setProperty("showSql", "false"); + configuration.repository("cars") + .setSource("car-source") + .registerNamespace("car", "http://www.modeshape.org/examples/cars/1.0") + .addNodeTypes(resourceUrl("cars.cnd")) + .setOption(Option.ANONYMOUS_USER_ROLES, ModeShapeRoles.ADMIN); + configuration.clustering().setProperty("clusterName", "MyCluster");// .setProperty("configuration", ""); + + // Create an engine and use it to populate the source ... + engine1 = configuration.build(); + engine1.start(); + + Repository repository = engine1.getRepository("cars"); + + // Use a session to load the contents ... + Session session = repository.login(); + try { + InputStream stream = resourceStream("io/cars-system-view.xml"); + try { + session.getWorkspace().importXML("/", stream, ImportUUIDBehavior.IMPORT_UUID_CREATE_NEW); + } catch (Throwable t) { + t.printStackTrace(); + } finally { + stream.close(); + } + } finally { + session.logout(); + } + + // Make sure the data was imported ... + session = repository.login(); + try { + Node node = session.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node, is(notNullValue())); + assertThat(session.getRootNode().getNodes().getSize(), is(2L)); // "Cars" and "jcr:system" + } finally { + session.logout(); + } + + // Start the other engines ... + engine2 = configuration.build(); + engine2.start(); + + engine3 = configuration.build(); + engine3.start(); + } + + @AfterClass + public static void afterAll() throws Exception { + // Close all of the sessions ... + for (Session session : sessions) { + if (session.isLive()) session.logout(); + } + sessions.clear(); + + // Shut down the engines ... + if (engine1 != null) engine1.shutdown(); + if (engine2 != null) engine2.shutdown(); + if (engine3 != null) engine3.shutdown(); + engine1 = null; + engine2 = null; + engine3 = null; + } + + // ---------------------------------------------------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------------------------------------------------- + + @Test + public void shouldAllowMultipleEnginesToAccessSameDatabase() throws Exception { + Session session1 = sessionFrom(engine1); + Node node1 = session1.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node1, is(notNullValue())); + + Session session2 = sessionFrom(engine2); + Node node = session2.getRootNode().getNode("Cars/Hybrid/Toyota Highlander"); + assertThat(node, is(notNullValue())); + } + + // ---------------------------------------------------------------------------------------------------------------- + // Utility Methods + // ---------------------------------------------------------------------------------------------------------------- + + protected Session sessionFrom( JcrEngine engine ) throws RepositoryException { + Repository repository = engine.getRepository("cars"); + Session session = repository.login(); + sessions.add(session); + return session; + } + + protected static URL resourceUrl( String name ) { + return ClusteringTest.class.getClassLoader().getResource(name); + } + + protected static InputStream resourceStream( String name ) { + return ClusteringTest.class.getClassLoader().getResourceAsStream(name); + } + +} Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (revision 1976) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (working copy) @@ -253,7 +253,7 @@ public class JcrEngine extends ModeShapeEngine implements Repositories { Node namespacesNode = subgraph.getNode(ModeShapeLexicon.NAMESPACES); if (namespacesNode != null) { GraphNamespaceRegistry registry = new GraphNamespaceRegistry(configuration, namespacesNode.getLocation().getPath(), - ModeShapeLexicon.NAMESPACE_URI); + ModeShapeLexicon.URI); context = context.with(registry); } Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (revision 1976) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (working copy) @@ -540,7 +540,7 @@ public class JcrRepository implements Repository { // Create the namespace registry and corresponding execution context. // Note that this persistent registry has direct access to the system workspace. - Name uriProperty = ModeShapeLexicon.NAMESPACE_URI; + Name uriProperty = ModeShapeLexicon.URI; PathFactory pathFactory = executionContext.getValueFactories().getPathFactory(); Path systemPath = pathFactory.create(JcrLexicon.SYSTEM); Path namespacesPath = pathFactory.create(systemPath, ModeShapeLexicon.NAMESPACES); Index: modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java =================================================================== --- modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (revision 1976) +++ modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (working copy) @@ -36,16 +36,15 @@ public class ModeShapeLexicon extends org.modeshape.repository.ModeShapeLexicon public static final Name BASE = new BasicName(Namespace.URI, "base"); public static final Name EXPIRATION_DATE = new BasicName(Namespace.URI, "expirationDate"); public static final Name IS_HELD_BY_SESSION = new BasicName(Namespace.URI, "isHeldBySession"); - public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped"); - public static final Name LOCK = new BasicName(Namespace.URI, "lock"); + public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped"); + public static final Name LOCK = new BasicName(Namespace.URI, "lock"); public static final Name LOCKED_UUID = new BasicName(Namespace.URI, "lockedUuid"); public static final Name LOCKING_SESSION = new BasicName(Namespace.URI, "lockingSession"); - public static final Name LOCKS = new BasicName(Namespace.URI, "locks"); + public static final Name LOCKS = new BasicName(Namespace.URI, "locks"); public static final Name NAMESPACE = new BasicName(Namespace.URI, "namespace"); public static final Name NODE_TYPES = new BasicName(Namespace.URI, "nodeTypes"); public static final Name REPOSITORIES = new BasicName(Namespace.URI, "repositories"); public static final Name SYSTEM = new BasicName(Namespace.URI, "system"); - public static final Name URI = new BasicName(Namespace.URI, "uri"); public static final Name VERSION_STORAGE = new BasicName(Namespace.URI, "versionStorage"); - public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace"); + public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace"); } Index: modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java =================================================================== --- modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (revision 1976) +++ modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (working copy) @@ -383,7 +383,7 @@ public class JcrConfigurationTest { assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces").getChildren(), hasChild(segment("modetest"))); assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces/modetest"), - hasProperty(ModeShapeLexicon.NAMESPACE_URI, "http://www.modeshape.org/test/1.0")); + hasProperty(ModeShapeLexicon.URI, "http://www.modeshape.org/test/1.0")); // Initialize IDTrust and a policy file (which defines the "modeshape-jcr" login config name) String configFile = "security/jaas.conf.xml"; Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (working copy) @@ -56,6 +56,7 @@ import org.modeshape.graph.Workspace; import org.modeshape.graph.connector.RepositorySource; import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource; import org.modeshape.graph.mimetype.MimeTypeDetector; +import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.property.Name; import org.modeshape.graph.property.NamespaceRegistry; import org.modeshape.graph.property.Path; @@ -101,6 +102,7 @@ public class ModeShapeConfiguration { private final Map> sequencerDefinitions = new HashMap>(); private final Map> repositorySourceDefinitions = new HashMap>(); private final Map> mimeTypeDetectorDefinitions = new HashMap>(); + private ClusterDefinition clusterDefinition; /** * Create a new configuration, using a default-constructed {@link ExecutionContext}. @@ -283,8 +285,9 @@ public class ModeShapeConfiguration { graph.importXmlFrom(configurationFileInputStream).skippingRootElement(true).into(pathToParent); // The file was imported successfully, so now create the content information ... - configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, null, pathToParent, context, - null); + configurationContent = configurationContent.with(pathToParent) + .with(source) + .withWorkspace(source.getDefaultWorkspaceName()); return this; } @@ -348,6 +351,8 @@ public class ModeShapeConfiguration { workspace = graph.createWorkspace().named(workspaceName); } assert workspace.getRoot() != null; + } else { + workspaceName = graph.getCurrentWorkspaceName(); // will be the default } // Verify the path ... @@ -356,8 +361,7 @@ public class ModeShapeConfiguration { assert parent != null; // Now create the content information ... - configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, workspaceName, path, context, - null); + configurationContent = configurationContent.with(source).withWorkspace(workspaceName).with(path); return this; } @@ -675,6 +679,15 @@ public class ModeShapeConfiguration { } /** + * Obtain the definition for this engine's clustering. If no clustering definition exists, one will be created. + * + * @return the clustering definition; never null + */ + public ClusterDefinition clustering() { + return clusterDefinition(this); + } + + /** * Convenience method to make the code that sets up this configuration easier to read. This method simply returns this object. * * @return this configuration component; never null @@ -950,6 +963,15 @@ public class ModeShapeConfiguration { } /** + * Interface used to set up and define the cluster configuration. + * + * @param the type of the configuration component that owns this definition object + */ + public interface ClusterDefinition + extends Returnable, SetProperties>, Removable { + } + + /** * Interface used to set up and define a MIME type detector instance. * * @param the type of the configuration component that owns this definition object @@ -1094,6 +1116,21 @@ public class ModeShapeConfiguration { return definition; } + /** + * Utility method to construct a definition object for the clustering with the supplied name and return type. + * + * @param the type of the return object + * @param returnObject the return object + * @return the definition for the clustering; never null + */ + @SuppressWarnings( "unchecked" ) + protected ClusterDefinition clusterDefinition( ReturnType returnObject ) { + if (clusterDefinition == null) { + clusterDefinition = new ClusterBuilder(returnObject, changes(), path(), ModeShapeLexicon.CLUSTERING); + } + return (ClusterDefinition)clusterDefinition; + } + protected static class BaseReturnable implements Returnable { protected final ReturnType returnObject; @@ -1463,6 +1500,23 @@ public class ModeShapeConfiguration { } } + protected static class ClusterBuilder + extends GraphComponentBuilder, ObservationBus> + implements ClusterDefinition { + + protected ClusterBuilder( ReturnType returnObject, + Graph.Batch batch, + Path path, + Name... names ) { + super(returnObject, batch, path, names); + } + + @Override + protected ClusterDefinition thisType() { + return this; + } + } + /** * Representation of the current configuration content. */ @@ -1483,6 +1537,7 @@ public class ModeShapeConfiguration { ExecutionContext context, ClassLoaderFactory classLoaderFactory ) { assert configurationName != null; + assert source != null; this.name = configurationName; this.source = source; this.path = path != null ? path : RootPath.INSTANCE; @@ -1581,6 +1636,18 @@ public class ModeShapeConfiguration { * @return the new configuration */ public ConfigurationDefinition with( ClassLoaderFactory classLoaderFactory ) { + CheckArg.isNotNull(source, "source"); + return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory); + } + + /** + * Return a copy of this configuration that uses the supplied repository source instead of this object's + * {@link #getRepositorySource() repository source}. + * + * @param source the repository source containing the configuration + * @return the new configuration + */ + public ConfigurationDefinition with( RepositorySource source ) { return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory); } Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (working copy) @@ -21,6 +21,16 @@ */ package org.modeshape.repository; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import net.jcip.annotations.Immutable; import org.modeshape.common.collection.Problem; import org.modeshape.common.collection.Problems; @@ -44,28 +54,19 @@ import org.modeshape.graph.mimetype.ExtensionBasedMimeTypeDetector; import org.modeshape.graph.mimetype.MimeTypeDetector; import org.modeshape.graph.mimetype.MimeTypeDetectorConfig; import org.modeshape.graph.mimetype.MimeTypeDetectors; -import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.property.Name; import org.modeshape.graph.property.Path; import org.modeshape.graph.property.PathExpression; import org.modeshape.graph.property.PathNotFoundException; import org.modeshape.graph.property.Property; +import org.modeshape.repository.cluster.ClusteringConfig; +import org.modeshape.repository.cluster.ClusteringService; import org.modeshape.repository.sequencer.SequencerConfig; import org.modeshape.repository.sequencer.SequencingService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - /** - * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build() configuration}. + * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build() + * configuration}. * * @see ModeShapeConfiguration */ @@ -82,11 +83,12 @@ public class ModeShapeEngine { private final RepositoryService repositoryService; private final SequencingService sequencingService; private final ExecutorService executorService; + private final ClusteringService clusteringService; private final MimeTypeDetectors detectors; private static final Logger LOGGER = Logger.getLogger(ModeShapeEngine.class); protected ModeShapeEngine( ExecutionContext context, - ModeShapeConfiguration.ConfigurationDefinition configuration ) { + ModeShapeConfiguration.ConfigurationDefinition configuration ) { this.problems = new SimpleProblems(); // Use the configuration's context ... @@ -105,23 +107,23 @@ public class ModeShapeEngine { detectors.addDetector(new MimeTypeDetectorConfig("ExtensionDetector", "Extension-based MIME type detector", ExtensionBasedMimeTypeDetector.class)); - // Create the RepositoryContext that the configuration repository source should use ... - ObservationBus configurationChangeBus = new ObservationBus(); - RepositoryContext configContext = new SimpleRepositoryContext(context, configurationChangeBus, null); - final RepositorySource configSource = this.configuration.getRepositorySource(); - configSource.initialize(configContext); + // Create the clustering service ... + ClusteringConfig clusterConfig = scanner.getClusteringConfiguration(); + clusteringService = new ClusteringService(); + clusteringService.setExecutionContext(context); + clusteringService.setClusteringConfig(clusterConfig); // Create the RepositoryService, pointing it to the configuration repository ... Path pathToConfigurationRoot = this.configuration.getPath(); String configWorkspaceName = this.configuration.getWorkspace(); + RepositorySource configSource = this.configuration.getRepositorySource(); repositoryService = new RepositoryService(configSource, configWorkspaceName, pathToConfigurationRoot, context, problems); - // Now register the repository service to be notified of changes to the configuration ... - configurationChangeBus.register(repositoryService); + // Create the executor service (which starts out with 0 threads, so it's okay to do here) ... + ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName()); + executorService = Executors.newCachedThreadPool(threadPoolFactory); // Create the sequencing service ... - ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName()); - executorService = Executors.newScheduledThreadPool(10, threadPoolFactory); sequencingService = new SequencingService(); sequencingService.setExecutionContext(context); sequencingService.setExecutorService(executorService); @@ -129,8 +131,9 @@ public class ModeShapeEngine { for (SequencerConfig sequencerConfig : scanner.getSequencingConfigurations()) { sequencingService.addSequencer(sequencerConfig); } - } + // The rest of the instantiation/configuration will be done in start() + } /** * Get the problems that were encountered when setting up this engine from the configuration. @@ -270,8 +273,18 @@ public class ModeShapeEngine { // Then throw an exception ... throw new IllegalStateException(RepositoryI18n.errorsPreventStarting.text()); } + + // Create the RepositoryContext that the configuration repository source should use ... + RepositoryContext configContext = new SimpleRepositoryContext(context, clusteringService, null); + configuration.getRepositorySource().initialize(configContext); + + // Start the various services ... + clusteringService.getAdministrator().start(); repositoryService.getAdministrator().start(); sequencingService.getAdministrator().start(); + + // Now register the repository service to be notified of changes to the configuration ... + clusteringService.register(repositoryService); } /** @@ -282,15 +295,19 @@ public class ModeShapeEngine { * @see #start() */ public void shutdown() { - // Then terminate the executor service, which may be running background jobs that are not yet completed + + // Terminate the executor service, which may be running background jobs that are not yet completed // and which will prevent new jobs being submitted (to the sequencing service) ... executorService.shutdown(); - // First, shutdown the sequencing service, which will prevent any additional jobs from going through ... + // Next, shutdown the sequencing service, which will prevent any additional jobs from going through ... sequencingService.getAdministrator().shutdown(); - // Finally shut down the repository source, which closes all connections ... + // Shut down the repository source, which closes all connections ... repositoryService.getAdministrator().shutdown(); + + // Finally shut down the clustering service ... + clusteringService.shutdown(); } /** @@ -387,6 +404,49 @@ public class ModeShapeEngine { return detectors; } + public ClusteringConfig getClusteringConfiguration() { + Graph graph = Graph.create(configurationRepository.getRepositorySource(), context); + Path pathToClusteringNode = context.getValueFactories().getPathFactory().create(configurationRepository.getPath(), + ModeShapeLexicon.CLUSTERING); + try { + Subgraph subgraph = graph.getSubgraphOfDepth(2).at(pathToClusteringNode); + + Set skipProperties = new HashSet(); + skipProperties.add(ModeShapeLexicon.DESCRIPTION); + skipProperties.add(ModeShapeLexicon.CLASSNAME); + skipProperties.add(ModeShapeLexicon.CLASSPATH); + Set skipNamespaces = new HashSet(); + skipNamespaces.add(JcrLexicon.Namespace.URI); + skipNamespaces.add(JcrNtLexicon.Namespace.URI); + skipNamespaces.add(JcrMixLexicon.Namespace.URI); + + Node clusterNode = subgraph.getRoot(); + String name = stringValueOf(clusterNode); + String desc = stringValueOf(clusterNode, ModeShapeLexicon.DESCRIPTION); + String classname = stringValueOf(clusterNode, ModeShapeLexicon.CLASSNAME); + String[] classpath = stringValuesOf(clusterNode, ModeShapeLexicon.CLASSPATH); + if (classname == null || classname.trim().length() == 0) { + classname = "org.modeshape.clustering.ClusteredObservationBus"; + } + + Map properties = new HashMap(); + for (Property property : clusterNode.getProperties()) { + Name propertyName = property.getName(); + if (skipNamespaces.contains(propertyName.getNamespaceUri())) continue; + if (skipProperties.contains(propertyName)) continue; + if (property.isSingle()) { + properties.put(propertyName.getLocalName(), property.getFirstValue()); + } else { + properties.put(propertyName.getLocalName(), property.getValuesAsArray()); + } + } + return new ClusteringConfig(name, desc, properties, classname, classpath); + } catch (PathNotFoundException e) { + // no detectors registered ... + } + return null; + } + public List getSequencingConfigurations() { List configs = new ArrayList(); Graph graph = Graph.create(configurationRepository.getRepositorySource(), context); Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (working copy) @@ -48,4 +48,7 @@ public class ModeShapeLexicon extends org.modeshape.graph.ModeShapeLexicon { public static final Name VALUE = new BasicName(Namespace.URI, "value"); public static final Name RETRY_LIMIT = new BasicName(Namespace.URI, "retryLimit"); public static final Name DEFAULT_CACHE_POLICY = new BasicName(Namespace.URI, "defaultCachePolicy"); + public static final Name CLUSTERING = new BasicName(Namespace.URI, "clustering"); + public static final Name CONFIGURATION = new BasicName(Namespace.URI, "configuration"); + public static final Name CLUSTER_NAME = new BasicName(Namespace.URI, "clusterName"); } Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (working copy) @@ -55,27 +55,14 @@ public final class RepositoryI18n { public static I18n errorFindingPropertyNameInPropertyChangedEvent; public static I18n errorFindingPropertyNameInPropertyRemovedEvent; - // Rules - public static I18n unableToObtainJsr94RuleAdministrator; - public static I18n errorUsingJsr94RuleAdministrator; - public static I18n unableToObtainJsr94ServiceProvider; - public static I18n errorAddingOrUpdatingRuleSet; - public static I18n errorRollingBackRuleSetAfterUpdateFailed; - public static I18n errorReadingRulesAndProperties; - public static I18n errorDeregisteringRuleSetBeforeUpdatingIt; - public static I18n errorRecreatingRuleSet; - public static I18n errorRemovingRuleSet; - public static I18n errorRemovingRuleSetUponShutdown; - public static I18n unableToFindRuleSet; - public static I18n errorExecutingRuleSetWithGlobalsAndFacts; - public static I18n unableToBuildRuleSetRegularExpressionPattern; - - public static I18n errorObtainingSessionToRepositoryWorkspace; - public static I18n errorWritingProblemsOnRuleSet; - - public static I18n federationServiceName; - public static I18n observationServiceName; - public static I18n ruleServiceName; + // Repository service ... + public static I18n repositoryServiceName; + + // Clustering service ... + public static I18n clusteringServiceName; + public static I18n unableToRegisterObserverOnUnstartedClusteringService; + public static I18n unableToUnregisterObserverOnUnstartedClusteringService; + public static I18n unableToNotifyObserversOnUnstartedClusteringService; // Sequencing public static I18n sequencingServiceName; Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (working copy) @@ -43,6 +43,7 @@ import org.modeshape.graph.connector.RepositoryConnectionFactory; import org.modeshape.graph.connector.RepositoryConnectionPool; import org.modeshape.graph.connector.RepositoryContext; import org.modeshape.graph.connector.RepositorySource; +import org.modeshape.graph.observe.LocalObservationBus; import org.modeshape.graph.observe.Observable; import org.modeshape.graph.observe.ObservationBus; import org.modeshape.graph.observe.Observer; @@ -65,7 +66,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl protected class Administrator extends AbstractServiceAdministrator { protected Administrator() { - super(RepositoryI18n.federationServiceName, State.STARTED); + super(RepositoryI18n.repositoryServiceName, State.STARTED); } /** @@ -109,7 +110,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl private final Map pools = new HashMap(); private RepositoryConnectionFactory delegate; private final ExecutionContext executionContext; - private final ObservationBus observationBus = new ObservationBus(); + private final ObservationBus observationBus = new LocalObservationBus(); private final RepositorySource configurationSource; private final String configurationWorkspaceName; private final Path pathToConfigurationRoot; Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (working copy) @@ -23,6 +23,11 @@ */ package org.modeshape.repository; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import net.jcip.annotations.ThreadSafe; import org.modeshape.common.collection.Problems; import org.modeshape.common.collection.SimpleProblems; @@ -52,12 +57,6 @@ import org.modeshape.repository.service.AbstractServiceAdministrator; import org.modeshape.repository.service.AdministeredService; import org.modeshape.repository.service.ServiceAdministrator; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * A service that manages the {@link RepositorySource}es defined within a configuration repository. */ @@ -72,7 +71,7 @@ public class RepositoryService implements AdministeredService, Observer { protected class Administrator extends AbstractServiceAdministrator { protected Administrator() { - super(RepositoryI18n.federationServiceName, State.PAUSED); + super(RepositoryI18n.repositoryServiceName, State.PAUSED); } /** @@ -535,6 +534,10 @@ public class RepositoryService implements AdministeredService, Observer { */ @Override protected void notify( NetChanges netChanges ) { + if (getConfigurationWorkspaceName() == null) { + // This was a transient configuration source, so it should never change ... + return; + } if (!getConfigurationSourceName().equals(netChanges.getSourceName())) return; for (NetChange change : netChanges.getNetChanges()) { if (!getConfigurationWorkspaceName().equals(change.getRepositoryWorkspaceName())) return; Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java (working copy) @@ -0,0 +1,59 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.repository.cluster; + +import java.util.Map; +import net.jcip.annotations.Immutable; +import org.modeshape.common.component.ComponentConfig; + +/** + * A configuration for a cluster. + */ +@Immutable +public class ClusteringConfig extends ComponentConfig { + + public ClusteringConfig( String name, + String description, + String classname, + String[] classpath ) { + this(name, description, System.currentTimeMillis(), null, classname, classpath); + } + + public ClusteringConfig( String name, + String description, + Map properties, + String classname, + String[] classpath ) { + this(name, description, System.currentTimeMillis(), properties, classname, classpath); + } + + public ClusteringConfig( String name, + String description, + long timestamp, + Map properties, + String classname, + String[] classpath ) { + super(name, description, timestamp, properties, classname, classpath); + } +} Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java new file mode 100644 =================================================================== --- /dev/null (revision 1976) +++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java (working copy) @@ -0,0 +1,238 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.repository.cluster; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.modeshape.common.component.ComponentLibrary; +import org.modeshape.common.util.CheckArg; +import org.modeshape.graph.ExecutionContext; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.LocalObservationBus; +import org.modeshape.graph.observe.ObservationBus; +import org.modeshape.graph.observe.Observer; +import org.modeshape.repository.RepositoryI18n; +import org.modeshape.repository.service.AbstractServiceAdministrator; +import org.modeshape.repository.service.AdministeredService; +import org.modeshape.repository.service.ServiceAdministrator; + +/** + * The service that provides the observation bus for a clustered (or unclustered) environment. + */ +public class ClusteringService implements AdministeredService, ObservationBus { + + /** + * The administrative component for this service. + */ + protected class Administrator extends AbstractServiceAdministrator { + + protected Administrator() { + super(RepositoryI18n.clusteringServiceName, State.PAUSED); + } + + /** + * {@inheritDoc} + */ + @Override + protected void doStart( State fromState ) { + super.doStart(fromState); + startService(); + } + + /** + * {@inheritDoc} + */ + @Override + protected void doShutdown( State fromState ) { + super.doShutdown(fromState); + shutdownService(); + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean doCheckIsTerminated() { + return isServiceTerminated(); + } + + /** + * {@inheritDoc} + */ + public boolean awaitTermination( long timeout, + TimeUnit unit ) { + return true; // nothing to wait for + } + + } + + private ExecutionContext executionContext; + private ObservationBus bus; + private final ComponentLibrary busLibrary = new ComponentLibrary(); + + /** + * @return executionContext + */ + public ExecutionContext getExecutionContext() { + return this.executionContext; + } + + /** + * @param executionContext Sets executionContext to the specified value. + */ + public void setExecutionContext( ExecutionContext executionContext ) { + CheckArg.isNotNull(executionContext, "execution context"); + if (this.getAdministrator().isStarted()) { + throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); + } + this.executionContext = executionContext; + this.busLibrary.setClassLoaderFactory(executionContext); + } + + /** + * Set the configuration for the clustering. This method will replace any existing configuration. + * + * @param config the new configuration, or null if the default configuration should be used + * @return true if the configuration was set, or false otherwise + */ + public boolean setClusteringConfig( ClusteringConfig config ) { + if (config == null) config = createDefaultConfiguration(); + return this.busLibrary.removeAllAndAdd(config); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.ObservationBus#hasObservers() + */ + @Override + public boolean hasObservers() { + return bus != null && bus.hasObservers(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean register( Observer observer ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToRegisterObserverOnUnstartedClusteringService.text()); + } + return bus.register(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer) + */ + @Override + public boolean unregister( Observer observer ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToUnregisterObserverOnUnstartedClusteringService.text()); + } + return bus.unregister(observer); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + if (bus == null) { + throw new IllegalStateException(RepositoryI18n.unableToNotifyObserversOnUnstartedClusteringService.text()); + } + bus.notify(changes); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.repository.service.AdministeredService#getAdministrator() + */ + @Override + public ServiceAdministrator getAdministrator() { + return new Administrator(); + } + + /** + * {@inheritDoc} + *

+ * This is equivalent to calling getAdminstrator().start() and can be called multiple times. + *

+ * + * @see org.modeshape.graph.observe.ObservationBus#start() + */ + @Override + public void start() { + getAdministrator().start(); + } + + /** + * {@inheritDoc} + *

+ * This is equivalent to calling getAdminstrator().shutdown(). + *

+ * + * @see org.modeshape.graph.observe.ObservationBus#shutdown() + */ + @Override + public void shutdown() { + getAdministrator().shutdown(); + } + + protected void startService() { + List instances = busLibrary.getInstances(); + if (instances.isEmpty()) { + setClusteringConfig(null); + instances = busLibrary.getInstances(); + assert instances.size() > 0; + } + this.bus = instances.get(0); + this.bus.start(); + } + + protected void shutdownService() { + // Unregister our observer ... + try { + if (this.bus != null) { + this.bus.shutdown(); + } + } finally { + this.bus = null; + } + } + + protected boolean isServiceTerminated() { + return this.bus != null; + } + + protected ClusteringConfig createDefaultConfiguration() { + return new ClusteringConfig("bus", "Local observation bus", LocalObservationBus.class.getName(), null); + } +} Index: modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties =================================================================== --- modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (revision 1976) +++ modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (working copy) @@ -40,26 +40,12 @@ errorFindingPropertyNameInPropertyAddedEvent = Error finding the name of the add errorFindingPropertyNameInPropertyChangedEvent = Error finding the name of the changed property in the event path {0} errorFindingPropertyNameInPropertyRemovedEvent = Error finding the name of the removed property in the event path {0} -unableToObtainJsr94RuleAdministrator = Unable to obtain the rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2} -errorUsingJsr94RuleAdministrator = Error using rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2} -unableToObtainJsr94ServiceProvider = Error using rule administrator for JSR-94 service provider {0} ({1}) -errorAddingOrUpdatingRuleSet = Error adding/updating rule set "{0}" -errorRollingBackRuleSetAfterUpdateFailed = Error rolling back rule set "{0}" after new rule set failed -errorReadingRulesAndProperties = Error reading the rules and properties while adding/updating rule set "{0}" -errorDeregisteringRuleSetBeforeUpdatingIt = Error deregistering rule set "{0}" before updating it -errorRecreatingRuleSet = Error (re)creating the rule set "{0}" -errorRemovingRuleSet = Error removing rule set "{0}" -errorRemovingRuleSetUponShutdown = Error removing rule set "{0}" upon shutdown -unableToFindRuleSet = Unable to find rule set with name "{0}" -errorExecutingRuleSetWithGlobalsAndFacts = Error executing rule set "{0}" and with globals {1} and facts {2} -unableToBuildRuleSetRegularExpressionPattern = Unable to build the rule set name pattern "{0}" using the supplied absolute path ("{1}"): {2} +repositoryServiceName = Repository Service -errorObtainingSessionToRepositoryWorkspace = Error obtaining JCR session to repository workspace {0} -errorWritingProblemsOnRuleSet = Error while writing problems on rule set node {0} - -federationServiceName = Federation Service -observationServiceName = Observation Service -ruleServiceName = Rule Service +clusteringServiceName = Clustering Service +unableToRegisterObserverOnUnstartedClusteringService = Unable to register an observer until the clustering service is started +unableToUnregisterObserverOnUnstartedClusteringService = Unable to unregister an observer until the clustering service is started +unableToNotifyObserversOnUnstartedClusteringService = Unable to notify observers until the clustering service is started sequencingServiceName = Sequencing Service unableToChangeExecutionContextWhileRunning = Unable to change the execution context while running Index: pom.xml =================================================================== --- pom.xml (revision 1976) +++ pom.xml (working copy) @@ -131,6 +131,7 @@ modeshape-repository modeshape-cnd extensions/modeshape-search-lucene + extensions/modeshape-clustering modeshape-jcr-api modeshape-jcr extensions/modeshape-classloader-maven