Index: .gitignore
===================================================================
--- .gitignore (revision 1989)
+++ .gitignore (working copy)
@@ -55,6 +55,7 @@
/deploy/jbossas/modeshape-jbossas-service/target/
/deploy/jbossas/modeshape-jbossas-web-rest-war/target/
+/extensions/modeshape-clustering/target
/extensions/modeshape-search-lucene/target
/extensions/modeshape-classloader-maven/target
/extensions/modeshape-common-jdbc/target
Index: extensions/modeshape-clustering/.classpath
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/.classpath (working copy)
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
Index: extensions/modeshape-clustering/.project
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/.project (working copy)
@@ -0,0 +1,23 @@
+
+
+ modeshape-clustering
+
+
+
+
+
+ org.maven.ide.eclipse.maven2Builder
+
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+
+
+
+ org.eclipse.jdt.core.javanature
+ org.maven.ide.eclipse.maven2Nature
+
+
Index: extensions/modeshape-clustering/pom.xml
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/pom.xml (working copy)
@@ -0,0 +1,131 @@
+
+
+ 4.0.0
+
+ org.modeshape
+ modeshape
+ 2.1-SNAPSHOT
+ ../..
+
+
+ modeshape-clustering
+ jar
+ ModeShape Clustering Component
+ ModeShape clustering mechanism.
+ http://www.modeshape.org
+
+
+
+
+ org.modeshape
+ modeshape-graph
+
+
+
+ jgroups
+ jgroups
+ 2.9.0.GA
+
+
+
+ junit
+ junit
+
+
+ org.mockito
+ mockito-all
+
+
+ org.modeshape
+ modeshape-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
+
+ net.jcip
+ jcip-annotations
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-report-plugin
+
+
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ ${project.basedir}/../../src/main/assembly/connector-with-dependencies.xml
+
+ ${project.artifactId}-${project.version}
+
+
+
+ package
+
+ single
+
+
+
+
+
+
+ maven-jar-plugin
+
+
+ ${project.build.outputDirectory}/META-INF/MANIFEST.MF
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+
+
+ bundle-manifest
+ process-classes
+
+ manifest
+
+
+
+
+
+
+
Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteredObservationBus.java (working copy)
@@ -0,0 +1,515 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.clustering;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.View;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.conf.XmlConfigurator;
+import org.jgroups.util.Util;
+import org.modeshape.common.SystemFailureException;
+import org.modeshape.common.util.CheckArg;
+import org.modeshape.common.util.Logger;
+import org.modeshape.graph.observe.ChangeObservers;
+import org.modeshape.graph.observe.Changes;
+import org.modeshape.graph.observe.ObservationBus;
+import org.modeshape.graph.observe.Observer;
+
+/**
+ * An implementation of a cluster-aware {@link ObservationBus}.
+ */
+public class ClusteredObservationBus implements ObservationBus {
+
+ protected static final Logger LOGGER = Logger.getLogger(ClusteredObservationBus.class);
+
+ /**
+ * The list of {@link Observer} instances that should receive events from the bus. These Observer objects are all local.
+ */
+ protected final ChangeObservers observers = new ChangeObservers();
+
+ /**
+ * The listener for channel changes.
+ */
+ private final Listener listener = new Listener();
+
+ /**
+ * The component that will receive the JGroups messages and broadcast them to this bus' observers.
+ */
+ private final Receiver receiver = new Receiver();
+
+ /**
+ * Flag that dictates whether this bus has connected to the cluster.
+ */
+ protected final AtomicBoolean isOpen = new AtomicBoolean(false);
+
+ /**
+ * Flag that dictates whether there are multiple participants in the cluster; if not, then the changes are propogated only to
+ * the local observers.
+ */
+ protected final AtomicBoolean multipleAddressesInCluster = new AtomicBoolean(false);
+
+ /**
+ * The configuration for JGroups
+ */
+ private String configuration;
+
+ /**
+ * The name of the JGroups cluster.
+ */
+ private String clusterName;
+
+ /**
+ * The JGroups channel to which all {@link #notify(Changes) change notifications} will be sent and from which all changes will
+ * be received and sent to the observers.
+ *
+ * It is important that the order of the {@link Changes} instances are maintained across the cluster, and JGroups will do this
+ * for us as long as we push all local changes into the channel and receive all local/remote changes from the channel.
+ *
+ */
+ private JChannel channel;
+
+ /**
+ * Get the configuration for JGroups. This configuration may be a string that refers to a resource on the classpath, the path
+ * of the local configuration, the URL to the configuration file, or the configuration specified using the newer-style XML
+ * form or older-style string form.
+ *
+ * @return the location of the JGroups configuration, or null if no configuration has been defined
+ */
+ public String getConfiguration() {
+ return configuration;
+ }
+
+ /**
+ * Set the JGroups configuration, which may be a string that refers to a resource on the classpath, the path of the local
+ * configuration, the URL to the configuration file, or the configuration specified using the newer-style XML form or
+ * older-style string form.
+ *
+ * @param configuration the relative path to a classpath resource, path to a local file, URL to the configuration file, or the
+ * configuration specified using the newer-style XML form or older-style string form.
+ * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has
+ * been {@link #shutdown() shutdown}
+ */
+ public void setConfiguration( String configuration ) {
+ if (channel != null) {
+ String name = this.clusterName;
+ throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name));
+ }
+ this.configuration = configuration;
+ }
+
+ /**
+ * Get the name of the JGroups cluster.
+ *
+ * @return the cluster name, or null if the cluster name was not yet defined
+ * @see JChannel#connect(String)
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * Set the name of the JGroups cluster. This must be called with a non-null cluster name before {@link #start()} is called.
+ *
+ * @param clusterName Sets clusterName to the specified value.
+ * @throws IllegalStateException if this method is called after this bus has been {@link #start() started} but before it has
+ * been {@link #shutdown() shutdown}
+ * @see JChannel#connect(String)
+ */
+ public void setClusterName( String clusterName ) {
+ CheckArg.isNotNull(clusterName, "clusterName");
+ if (channel != null) {
+ String name = this.clusterName;
+ throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(name));
+ }
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#start()
+ */
+ @Override
+ public synchronized void start() {
+ if (clusterName == null) {
+ throw new IllegalStateException(ClusteringI18n.clusterNameRequired.text());
+ }
+ if (channel != null) {
+ // Disconnect from any previous channel ...
+ channel.removeChannelListener(listener);
+ channel.setReceiver(null);
+ }
+ try {
+ // Create the new channel by calling the delegate method ...
+ channel = newChannel(configuration);
+ assert channel != null;
+ // Add a listener through which we'll know what's going on within the cluster ...
+ channel.addChannelListener(listener);
+
+ // Now connect to the cluster ...
+ channel.connect(clusterName);
+
+ // Set the receiver through which we'll receive all of the changes ...
+ channel.setReceiver(receiver);
+ } catch (ChannelException e) {
+ throw new IllegalStateException(ClusteringI18n.errorWhileStartingJGroups.text(configuration), e);
+ }
+ }
+
+ /**
+ * A method that is used to instantiate the {@link JChannel} object with the supplied configuration. Subclasses can override
+ * this method to specialize this behavior.
+ *
+ * @param configuration the configuration; may be null if the default configuration should be used
+ * @return the JChannel instance; never null
+ * @throws ChannelException if there is a problem creating the new channel object
+ */
+ protected JChannel newChannel( String configuration ) throws ChannelException {
+ if (configuration == null) {
+ return new JChannel();
+ }
+ // Try the XML configuration first ...
+ ProtocolStackConfigurator configurator = null;
+ InputStream stream = new ByteArrayInputStream(configuration.getBytes());
+ try {
+ configurator = XmlConfigurator.getInstance(stream);
+ } catch (IOException e) {
+ // ignore, since the configuration may be of another form ...
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore this
+ }
+ }
+ if (configurator != null) {
+ return new JChannel(configurator);
+ }
+ // Otherwise, just try the regular configuration ...
+ return new JChannel(configuration);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
+ */
+ @Override
+ public void notify( Changes changes ) {
+ if (changes == null) return; // do nothing
+ if (!isOpen.get()) {
+ // The channel is not open ...
+ return;
+ }
+ if (!multipleAddressesInCluster.get()) {
+ // We are in clustered mode, but there is only one participant in the cluster (us).
+ // So short-circuit the cluster and just notify the local observers ...
+ if (!observers.isEmpty()) {
+ observers.broadcast(changes);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}",
+ getClusterName(),
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp());
+ }
+ }
+ return;
+ }
+
+ // There are multiple participants in the cluster, so send all changes out to JGroups,
+ // letting JGroups do the ordering of messages...
+ try {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Sending to cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}",
+ clusterName,
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp());
+ }
+ byte[] data = serialize(changes);
+ Message message = new Message(null, null, data);
+ channel.send(message);
+ } catch (ChannelClosedException e) {
+ LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelHasClosed,
+ clusterName,
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp());
+ } catch (ChannelNotConnectedException e) {
+ LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelIsNotConnected,
+ clusterName,
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp());
+ } catch (Exception e) {
+ // Something went wrong here (this should not happen) ...
+ String msg = ClusteringI18n.errorSerializingChanges.text(clusterName,
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp(),
+ changes);
+ throw new SystemFailureException(msg, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
+ */
+ @Override
+ public boolean register( Observer observer ) {
+ return observers.register(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
+ */
+ @Override
+ public boolean unregister( Observer observer ) {
+ return observers.unregister(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#hasObservers()
+ */
+ @Override
+ public boolean hasObservers() {
+ return !observers.isEmpty();
+ }
+
+ /**
+ * Return whether this bus has been {@link #start() started} and not yet {@link #shutdown() shut down}.
+ *
+ * @return true if {@link #start()} has been called but {@link #shutdown()} has not, or false otherwise
+ */
+ public boolean isStarted() {
+ return channel != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#shutdown()
+ */
+ @Override
+ public synchronized void shutdown() {
+ if (channel != null) {
+ // Mark this as not accepting any more ...
+ isOpen.set(false);
+ try {
+ // Disconnect from the channel and close it ...
+ channel.removeChannelListener(listener);
+ channel.setReceiver(null);
+ channel.close();
+ } finally {
+ channel = null;
+ // Now that we're not receiving any more messages, shut down the list of observers ...
+ observers.shutdown();
+ }
+ }
+ }
+
+ protected static byte[] serialize( Changes changes ) throws Exception {
+ return Util.objectToByteBuffer(changes);
+ }
+
+ protected static Changes deserialize( byte[] data ) throws Exception {
+ return (Changes)Util.objectFromByteBuffer(data);
+ }
+
+ protected class Receiver implements org.jgroups.Receiver {
+ private byte[] state;
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MembershipListener#block()
+ */
+ @Override
+ public void block() {
+ isOpen.set(false);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
+ */
+ @Override
+ public void receive( Message message ) {
+ if (!observers.isEmpty()) {
+ // We have at least one observer ...
+ try {
+ // Deserialize the changes ...
+ Changes changes = deserialize(message.getBuffer());
+ // and broadcast to all of our observers ...
+ observers.broadcast(changes);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}",
+ getClusterName(),
+ changes.getChangeRequests().size(),
+ changes.getSourceName(),
+ changes.getUserName(),
+ changes.getProcessId(),
+ changes.getTimestamp());
+ }
+ } catch (Exception e) {
+ // Something went wrong here (this should not happen) ...
+ String msg = ClusteringI18n.errorDeserializingChanges.text(getClusterName());
+ throw new SystemFailureException(msg, e);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MessageListener#getState()
+ */
+ @Override
+ public byte[] getState() {
+ return state;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MessageListener#setState(byte[])
+ */
+ @Override
+ public void setState( byte[] state ) {
+ this.state = state;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address)
+ */
+ @Override
+ public void suspect( Address suspectedMbr ) {
+ LOGGER.error(ClusteringI18n.memberOfClusterIsSuspect, getClusterName(), suspectedMbr);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View)
+ */
+ @Override
+ public void viewAccepted( View newView ) {
+ LOGGER.trace("Members of '{0}' cluster have changed: {1}", getClusterName(), newView);
+ if (newView.getMembers().size() > 1) {
+ if (multipleAddressesInCluster.compareAndSet(false, true)) {
+ LOGGER.debug("There are now multiple members of cluster '{0}'; changes will be propagated throughout the cluster",
+ getClusterName());
+ }
+ } else {
+ if (multipleAddressesInCluster.compareAndSet(true, false)) {
+ LOGGER.debug("There is only one member of cluster '{0}'; changes will be propagated locally only",
+ getClusterName());
+ }
+ }
+ }
+ }
+
+ protected class Listener implements ChannelListener {
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.ChannelListener#channelClosed(org.jgroups.Channel)
+ */
+ @Override
+ public void channelClosed( Channel channel ) {
+ isOpen.set(false);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.ChannelListener#channelConnected(org.jgroups.Channel)
+ */
+ @Override
+ public void channelConnected( Channel channel ) {
+ isOpen.set(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.ChannelListener#channelDisconnected(org.jgroups.Channel)
+ */
+ @Override
+ public void channelDisconnected( Channel channel ) {
+ isOpen.set(false);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.ChannelListener#channelReconnected(org.jgroups.Address)
+ */
+ @Override
+ public void channelReconnected( Address addr ) {
+ isOpen.set(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jgroups.ChannelListener#channelShunned()
+ */
+ @Override
+ public void channelShunned() {
+ isOpen.set(false);
+ }
+ }
+}
Index: extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/main/java/org/modeshape/clustering/ClusteringI18n.java (working copy)
@@ -0,0 +1,49 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+* See the AUTHORS.txt file in the distribution for a full listing of
+* individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.clustering;
+
+import org.modeshape.common.i18n.I18n;
+
+/**
+ * The internationalized string constants for the org.modeshape.connector.filesystem*
packages.
+ */
+public final class ClusteringI18n {
+
+ public static I18n errorWhileStartingJGroups;
+ public static I18n clusterNameRequired;
+ public static I18n unableToNotifyChangesBecauseClusterChannelHasClosed;
+ public static I18n unableToNotifyChangesBecauseClusterChannelIsNotConnected;
+ public static I18n errorSerializingChanges;
+ public static I18n errorDeserializingChanges;
+ public static I18n clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown;
+ public static I18n memberOfClusterIsSuspect;
+
+ static {
+ try {
+ I18n.initialize(ClusteringI18n.class);
+ } catch (final Exception err) {
+ System.err.println(err);
+ }
+ }
+}
Index: extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/main/resources/org/modeshape/clustering/ClusteringI18n.properties (working copy)
@@ -0,0 +1,31 @@
+#
+# ModeShape (http://www.modeshape.org)
+# See the COPYRIGHT.txt file distributed with this work for information
+# regarding copyright ownership. Some portions may be licensed
+# to Red Hat, Inc. under one or more contributor license agreements.
+# See the AUTHORS.txt file in the distribution for a full listing of
+# individual contributors.
+#
+# ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+# is licensed to you under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2.1 of
+# the License, or (at your option) any later version.
+#
+# ModeShape is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free
+# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+#
+errorWhileStartingJGroups = Error while starting JGroups with configuration: {0}
+clusterNameRequired = A cluster name is required for JGroups-based clustering
+unableToNotifyChangesBecauseClusterChannelHasClosed = Cluster channel '{0}' has closed, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5}
+unableToNotifyChangesBecauseClusterChannelIsNotConnected = Cluster channel '{0}' is not connected, so unable to send {1} changes in source '{2}' made by {3} from process '{4}' at {5}
+errorSerializingChanges = Error in channel '{0}' while serializing {1} changes in source '{2}' made by {3} from process '{4}' at {5}: {6}
+errorDeserializingChanges = Error deserializing changes obtained from channel '{0}'
+clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown = The cluster channel '{0}' is running and cannot be changed unless shut down
+memberOfClusterIsSuspect = Member of '{0}' cluster is suspect at '{1}'
Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java (working copy)
@@ -0,0 +1,352 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.clustering;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.hamcrest.core.IsSame.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.jgroups.ChannelListener;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.modeshape.graph.ExecutionContext;
+import org.modeshape.graph.Location;
+import org.modeshape.graph.observe.Changes;
+import org.modeshape.graph.observe.Observer;
+import org.modeshape.graph.property.DateTime;
+import org.modeshape.graph.property.Name;
+import org.modeshape.graph.property.Path;
+import org.modeshape.graph.request.ChangeRequest;
+import org.modeshape.graph.request.CreateNodeRequest;
+
+public class ClusteredObservationBusTest {
+
+ private ClusteredObservationBus bus;
+ private ExecutionContext context = new ExecutionContext();
+ protected JChannel mockChannel;
+
+ @Before
+ public void beforeEach() {
+ mockChannel = Mockito.mock(JChannel.class);
+ // Create a clustered bus that does NOT use a real JChannel ...
+ bus = newBus(mockChannel);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeChanges() throws Exception {
+ Changes changes = changes();
+ byte[] data = ClusteredObservationBus.serialize(changes);
+ Changes deserialized = ClusteredObservationBus.deserialize(data);
+ // Should be equal ...
+ assertThat(changes, is(deserialized));
+ // but not == ...
+ assertThat(changes, is(not(sameInstance(deserialized))));
+ }
+
+ @Test( expected = IllegalArgumentException.class )
+ public void shouldNotAllowSettingClusterNameToNull() {
+ bus.setClusterName(null);
+ }
+
+ @Test
+ public void shouldAllowSettingClusterNameToBlankString() {
+ setAndGetClusterName("");
+ }
+
+ @Test
+ public void shouldAllowSettingClusterNameToStringWithAlphaNumericCharacters() {
+ setAndGetClusterName("abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+ }
+
+ @Test
+ public void shouldAllowSettingClusterNameToStringWithAlphaNumericAndPunctuationCharacters() {
+ setAndGetClusterName("valid.cluster!name@#$%^&*()<>?,./:\"'[]\\{}|_+-=");
+ }
+
+ @Test
+ public void shouldAllowSettingClusterNameToStringWithAlphaNumericAndWhitespaceCharacters() {
+ setAndGetClusterName("valid cluster name");
+ }
+
+ @Test
+ public void shouldAllowSettingConfigurationToNull() {
+ setAndGetConfiguration(null);
+ }
+
+ @Test
+ public void shouldAllowSettingConfigurationToBlankString() {
+ setAndGetConfiguration(null);
+ }
+
+ @Test( expected = IllegalStateException.class )
+ public void shouldNotAllowStartingWithoutSettingClusterName() {
+ assertThat(bus.getClusterName(), is(nullValue()));
+ assertThat(bus.isStarted(), is(false));
+ bus.start();
+ assertThat(bus.isStarted(), is(true));
+ bus.shutdown();
+ assertThat(bus.isStarted(), is(false));
+ }
+
+ @Test
+ public void shouldAllowStartingWithoutSettingConfiguration() {
+ bus.setClusterName("clusterName");
+ assertThat(bus.isStarted(), is(false));
+ bus.start();
+ assertThat(bus.isStarted(), is(true));
+ bus.shutdown();
+ assertThat(bus.isStarted(), is(false));
+ }
+
+ @Test
+ public void shouldAllowShuttingDownWithoutHavingStarted() {
+ assertThat(bus.isStarted(), is(false));
+ bus.shutdown();
+ assertThat(bus.isStarted(), is(false));
+ }
+
+ @Test( expected = IllegalStateException.class )
+ public void shouldNotAllowSettingConfigurationAfterBusHasBeenStartedButBeforeBusHasBeenShutdown() {
+ bus.setClusterName("clusterName");
+ bus.setConfiguration("old configuration");
+ assertThat(bus.isStarted(), is(false));
+ bus.start();
+ assertThat(bus.isStarted(), is(true));
+ bus.setConfiguration("new configuration"); // !! should fail !!
+ }
+
+ @Test
+ public void shouldAllowSettingConfigurationAfterBusHasBeenStartedAndShutdown() {
+ bus.setClusterName("clusterName");
+ bus.setConfiguration("old configuration");
+ assertThat(bus.isStarted(), is(false));
+ bus.start();
+ assertThat(bus.isStarted(), is(true));
+ bus.shutdown();
+ assertThat(bus.isStarted(), is(false));
+ bus.setConfiguration("new configuration");
+ }
+
+ @Test
+ public void shouldAllowNotifyToBeCalledBeforeStartButShouldDoNothing() throws Exception {
+ bus.setClusterName("clusterName");
+ bus.notify(changes());
+ ArgumentCaptor argument = ArgumentCaptor.forClass(Message.class);
+ verify(mockChannel, never()).send(argument.capture());
+ }
+
+ @Test
+ public void shouldAllowNotifyToBeCalledAfterStartWithMultipleMembersAndShouldSendMessageToJGroups() throws Exception {
+ bus.setClusterName("clusterName");
+ bus.start();
+ verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture());
+ verify(mockChannel, times(1)).connect("clusterName");
+ verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture());
+
+ // When connected, JGroups will call back to the listener and the bus will record it as open.
+ // But we have to do this manually because we've stubbed out JGroups ...
+ bus.isOpen.set(true);
+
+ // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are
+ // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ...
+ bus.multipleAddressesInCluster.set(true);
+
+ // Now call the notify method ...
+ bus.notify(changes());
+ verify(mockChannel, times(1)).send(ArgumentCaptor.forClass(Message.class).capture());
+ verifyNoMoreInteractions(mockChannel);
+ }
+
+ @Test
+ public void shouldAllowNotifyToBeCalledAfterStartWithOneMemberAndShouldSendMessageToLocalObserversBuNotJGroups()
+ throws Exception {
+ bus.setClusterName("clusterName");
+ bus.start();
+ verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture());
+ verify(mockChannel, times(1)).connect("clusterName");
+ verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture());
+
+ // When connected, JGroups will call back to the listener and the bus will record it as open.
+ // But we have to do this manually because we've stubbed out JGroups ...
+ bus.isOpen.set(true);
+
+ // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are
+ // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ...
+ bus.multipleAddressesInCluster.set(false);
+
+ // Add a local listener ...
+ Observer observer = mock(Observer.class);
+ bus.register(observer);
+
+ // Now call the notify method ...
+ Changes changes = changes();
+ bus.notify(changes);
+
+ verify(mockChannel, never()).send(ArgumentCaptor.forClass(Message.class).capture());
+ verifyNoMoreInteractions(mockChannel);
+ verify(observer, times(1)).notify(changes);
+ }
+
+ @Test
+ public void shouldProperlySendChangesThroughRealJGroupsCluster() throws Exception {
+
+ // Create three observers ...
+ CountDownLatch latch = new CountDownLatch(3);
+ CustomObserver observer1 = new CustomObserver(latch);
+ CustomObserver observer2 = new CustomObserver(latch);
+ CustomObserver observer3 = new CustomObserver(latch);
+
+ // Create three busses using a real JGroups cluster ...
+ String name = "MyCluster";
+ ClusteredObservationBus bus1 = startNewBus(name, observer1);
+ try {
+ ClusteredObservationBus bus2 = startNewBus(name, observer2);
+ try {
+ ClusteredObservationBus bus3 = startNewBus(name, observer3);
+ try {
+
+ // Send changes to one of the busses ...
+ Changes changes = changes();
+ bus1.notify(changes);
+
+ // Wait for the observers to be notified ...
+ observer1.await();
+ observer2.await();
+ observer3.await();
+
+ // Now verify that all of the observers received the notification ...
+ assertThat(observer1.getObservedChanges().size(), is(1));
+ assertThat(observer2.getObservedChanges().size(), is(1));
+ assertThat(observer3.getObservedChanges().size(), is(1));
+ assertThat(observer1.getObservedChanges().get(0), is(changes));
+ assertThat(observer2.getObservedChanges().get(0), is(changes));
+ assertThat(observer3.getObservedChanges().get(0), is(changes));
+
+ // Stop the busses ...
+ } finally {
+ bus3.shutdown();
+ }
+ } finally {
+ bus2.shutdown();
+ }
+ } finally {
+ bus1.shutdown();
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Utility methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ protected void setAndGetClusterName( String name ) {
+ bus.setClusterName(name);
+ String nameAfter = bus.getClusterName();
+ assertThat(nameAfter, is(name));
+ }
+
+ protected void setAndGetConfiguration( String config ) {
+ bus.setConfiguration(config);
+ String configAfter = bus.getConfiguration();
+ assertThat(configAfter, is(config));
+ }
+
+ protected ClusteredObservationBus startNewBus( String name,
+ Observer localObserver ) {
+ ClusteredObservationBus bus = newBus(null);
+ bus.setClusterName(name);
+ bus.start();
+ bus.register(localObserver);
+ return bus;
+ }
+
+ protected ClusteredObservationBus newBus( final JChannel channel ) {
+ return channel == null ? new ClusteredObservationBus() : new ClusteredObservationBus() {
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.clustering.ClusteredObservationBus#newChannel(java.lang.String)
+ */
+ @Override
+ protected JChannel newChannel( String configuration ) {
+ return channel;
+ }
+ };
+
+ }
+
+ protected Changes changes() {
+ DateTime now = context.getValueFactories().getDateFactory().create();
+ Path path = context.getValueFactories().getPathFactory().create("/a");
+ Name childName = context.getValueFactories().getNameFactory().create("b");
+ Path childPath = context.getValueFactories().getPathFactory().create(path, childName);
+ CreateNodeRequest request = new CreateNodeRequest(Location.create(path), "workspaceName", childName);
+ request.setActualLocationOfNode(Location.create(childPath));
+ List requests = Collections.singletonList((ChangeRequest)request);
+ return new Changes("processId", "contextId", "username", "sourceName", now, requests, null);
+ }
+
+ protected static class CustomObserver implements Observer {
+ private final List receivedChanges = new ArrayList();
+ private final CountDownLatch latch;
+
+ protected CustomObserver( CountDownLatch latch ) {
+ this.latch = latch;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
+ */
+ @Override
+ public void notify( Changes changes ) {
+ receivedChanges.add(changes);
+ latch.countDown();
+ }
+
+ public void await() throws InterruptedException {
+ latch.await(250, TimeUnit.MILLISECONDS);
+ }
+
+ public List getObservedChanges() {
+ return receivedChanges;
+ }
+ }
+}
Index: extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteringI18nTest.java (working copy)
@@ -0,0 +1,33 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.clustering;
+
+import org.modeshape.common.AbstractI18nTest;
+
+public class ClusteringI18nTest extends AbstractI18nTest {
+
+ public ClusteringI18nTest() {
+ super(ClusteringI18n.class);
+ }
+}
Index: extensions/modeshape-clustering/src/test/resources/log4j.properties
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ extensions/modeshape-clustering/src/test/resources/log4j.properties (working copy)
@@ -0,0 +1,15 @@
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %m%n
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Set up the default logging to be INFO level, then override specific units
+log4j.logger.org.modeshape=DEBUG
+
+# JGroups
+log4j.logger.org.jgroups=ERROR
+
Index: modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java
===================================================================
--- modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (revision 1989)
+++ modeshape-common/src/main/java/org/modeshape/common/component/ComponentLibrary.java (working copy)
@@ -220,6 +220,27 @@ public class ComponentLibrary
}
}
+ public boolean removeAll() {
+ try {
+ this.lock.lock();
+ this.configs.clear();
+ this.instances.clear();
+ return true;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public boolean removeAllAndAdd( ConfigType config ) {
+ try {
+ this.lock.lock();
+ removeAll();
+ return add(config);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
/**
* Refresh the instances by attempting to re-instantiate each registered configuration.
*
@@ -282,6 +303,7 @@ public class ComponentLibrary
reflection.invokeSetterMethodOnTarget(entry.getKey(), newInstance, entry.getValue());
}
}
+ configure(newInstance, config);
} catch (Throwable e) {
throw new SystemFailureException(e);
}
@@ -291,6 +313,11 @@ public class ComponentLibrary
return newInstance;
}
+ protected void configure( ComponentType newInstance,
+ ConfigType configuration ) throws Exception {
+ // do nothing
+ }
+
/**
* Method that instantiates the supplied class. This method can be overridden by subclasses that may need to wrap or adapt the
* instance to be a ComponentType.
Index: modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/ExecutionContext.java (working copy)
@@ -71,6 +71,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
private final SecurityContext securityContext;
/** The unique ID string, which is always generated so that it can be final and not volatile. */
private final String id = UUID.randomUUID().toString();
+ private final String processId;
private final Map data;
/**
@@ -80,7 +81,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
*/
@SuppressWarnings( "synthetic-access" )
public ExecutionContext() {
- this(new NullSecurityContext(), null, null, null, null, null, null);
+ this(new NullSecurityContext(), null, null, null, null, null, null, null);
initializeDefaultNamespaces(this.getNamespaceRegistry());
assert securityContext != null;
@@ -101,6 +102,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
this.classLoaderFactory = original.getClassLoaderFactory();
this.mimeTypeDetector = original.getMimeTypeDetector();
this.data = original.getData();
+ this.processId = original.getProcessId();
}
/**
@@ -121,6 +123,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
this.classLoaderFactory = original.getClassLoaderFactory();
this.mimeTypeDetector = original.getMimeTypeDetector();
this.data = original.getData();
+ this.processId = original.getProcessId();
}
/**
@@ -138,6 +141,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
* @param classLoaderFactory the {@link ClassLoaderFactory} implementation, or null if a {@link StandardClassLoaderFactory}
* instance should be used
* @param data the custom data for this context, or null if there is no such data
+ * @param processId the unique identifier of the process in which this context exists, or null if it should be generated
*/
protected ExecutionContext( SecurityContext securityContext,
NamespaceRegistry namespaceRegistry,
@@ -145,7 +149,8 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
PropertyFactory propertyFactory,
MimeTypeDetector mimeTypeDetector,
ClassLoaderFactory classLoaderFactory,
- Map data ) {
+ Map data,
+ String processId ) {
assert securityContext != null;
this.securityContext = securityContext;
this.namespaceRegistry = namespaceRegistry != null ? namespaceRegistry : new ThreadSafeNamespaceRegistry(
@@ -155,6 +160,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
this.classLoaderFactory = classLoaderFactory == null ? new StandardClassLoaderFactory() : classLoaderFactory;
this.mimeTypeDetector = mimeTypeDetector != null ? mimeTypeDetector : createDefaultMimeTypeDetector();
this.data = data != null ? data : Collections.emptyMap();
+ this.processId = processId != null ? processId : UUID.randomUUID().toString();
}
private MimeTypeDetector createDefaultMimeTypeDetector() {
@@ -253,7 +259,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
}
/**
- * Get the unique identifier for this context.
+ * Get the unique identifier for this context. Each context will always have a unique identifier.
*
* @return the unique identifier string; never null and never empty
*/
@@ -262,6 +268,16 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
}
/**
+ * Get the identifier for the process in which this context exists. Multiple contexts running in the same "process" will all
+ * have the same identifier.
+ *
+ * @return the identifier for the process; never null and never empty
+ */
+ public String getProcessId() {
+ return processId;
+ }
+
+ /**
* Get the immutable map of custom data that is affiliated with this context.
*
* @return the custom data; never null but possibly empty
@@ -283,7 +299,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
// Don't supply the value factories or property factories, since they'll have to be recreated
// to reference the supplied namespace registry ...
return new ExecutionContext(this.getSecurityContext(), namespaceRegistry, null, null, this.getMimeTypeDetector(),
- this.getClassLoaderFactory(), this.getData());
+ this.getClassLoaderFactory(), this.getData(), getProcessId());
}
/**
@@ -299,7 +315,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
// Don't supply the value factories or property factories, since they'll have to be recreated
// to reference the supplied namespace registry ...
return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(),
- mimeTypeDetector, getClassLoaderFactory(), this.getData());
+ mimeTypeDetector, getClassLoaderFactory(), this.getData(), getProcessId());
}
/**
@@ -314,7 +330,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
// Don't supply the value factories or property factories, since they'll have to be recreated
// to reference the supplied namespace registry ...
return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(),
- getMimeTypeDetector(), classLoaderFactory, this.getData());
+ getMimeTypeDetector(), classLoaderFactory, this.getData(), getProcessId());
}
/**
@@ -348,7 +364,7 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
newData = Collections.unmodifiableMap(new HashMap(data));
}
return new ExecutionContext(this.getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(),
- getMimeTypeDetector(), getClassLoaderFactory(), newData);
+ getMimeTypeDetector(), getClassLoaderFactory(), newData, getProcessId());
}
/**
@@ -380,7 +396,20 @@ public class ExecutionContext implements ClassLoaderFactory, Cloneable {
newData = Collections.unmodifiableMap(newData);
}
return new ExecutionContext(getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(),
- getMimeTypeDetector(), getClassLoaderFactory(), newData);
+ getMimeTypeDetector(), getClassLoaderFactory(), newData, getProcessId());
+ }
+
+ /**
+ * Create a new execution context that mirrors this context but that contains the supplied process identifier.
+ *
+ * @param processId the identifier of the process
+ * @return the execution context that is identical with this execution context, but which uses the supplied process
+ * identifier; never null
+ * @since 2.1
+ */
+ public ExecutionContext with( String processId ) {
+ return new ExecutionContext(getSecurityContext(), getNamespaceRegistry(), getValueFactories(), getPropertyFactory(),
+ getMimeTypeDetector(), getClassLoaderFactory(), getData(), processId);
}
/**
Index: modeshape-graph/src/main/java/org/modeshape/graph/Graph.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/Graph.java (working copy)
@@ -7041,6 +7041,34 @@ public class Graph {
return getLocation().hashCode();
}
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals( Object obj ) {
+ if (obj instanceof SubgraphResults) {
+ SubgraphResults that = (SubgraphResults)obj;
+ return getLocation().equals(that.getLocation()) && request.equals(that.request);
+ } else if (obj instanceof Subgraph) {
+ Subgraph that = (Subgraph)obj;
+ if (!getLocation().equals(that.getLocation())) return false;
+ Iterator thisIter = this.iterator();
+ Iterator thatIter = that.iterator();
+ while (thisIter.hasNext() && thatIter.hasNext()) {
+ SubgraphNode thisNode = thisIter.next();
+ SubgraphNode thatNode = thatIter.next();
+ if (!thisNode.getLocation().equals(thatNode.getLocation())) return false;
+ if (!thisNode.getProperties().equals(thatNode.getProperties())) return false;
+ if (!thisNode.getChildren().equals(thatNode.getChildren())) return false;
+ }
+ if (thisIter.hasNext() || thatIter.hasNext()) return false;
+ return true;
+ }
+ return false;
+ }
+
@Override
public String toString() {
return "Subgraph\n" + getToString(getContext());
Index: modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/ModeShapeLexicon.java (working copy)
@@ -48,7 +48,12 @@ public class ModeShapeLexicon {
public static final Name RESOURCE = new BasicName(Namespace.URI, "resource");
public static final Name ROOT = new BasicName(Namespace.URI, "root");
public static final Name TIME_TO_EXPIRE = new BasicName(Namespace.URI, "timeToExpire");
- public static final Name NAMESPACE_URI = new BasicName(Namespace.URI, "uri");
+ public static final Name URI = new BasicName(Namespace.URI, "uri");
+ /**
+ * @deprecated Use {@link #URI} instead.
+ */
+ @Deprecated
+ public static final Name NAMESPACE_URI = URI;
public static final Name WORKSPACES = new BasicName(Namespace.URI, "workspaces");
public static final Name SOURCE_NAME = new BasicName(Namespace.URI, "source");
Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/observe/LocalObservationBus.java (working copy)
@@ -0,0 +1,103 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.graph.observe;
+
+import net.jcip.annotations.ThreadSafe;
+import org.modeshape.common.util.Logger;
+import org.modeshape.graph.GraphI18n;
+
+/**
+ * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single
+ * Observable to disseminate each event to multiple Observers.
+ */
+@ThreadSafe
+public class LocalObservationBus implements ObservationBus {
+ private final ChangeObservers observers = new ChangeObservers();
+ private static final Logger LOGGER = Logger.getLogger(LocalObservationBus.class);
+
+ public LocalObservationBus() {
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#start()
+ */
+ public void start() {
+ // nothing to do
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
+ */
+ public boolean register( Observer observer ) {
+ if (observer == null) return false;
+ return observers.register(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
+ */
+ public boolean unregister( Observer observer ) {
+ return observers.unregister(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
+ */
+ public void notify( Changes changes ) {
+ if (changes != null) {
+ // Broadcast the changes to the registered observers ...
+ try {
+ observers.broadcast(changes);
+ } catch (RuntimeException t) {
+ LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage());
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#hasObservers()
+ */
+ public boolean hasObservers() {
+ return !observers.isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#shutdown()
+ */
+ public void shutdown() {
+ observers.shutdown();
+ }
+}
Index: modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/observe/ObservationBus.java (working copy)
@@ -24,69 +24,28 @@
package org.modeshape.graph.observe;
import net.jcip.annotations.ThreadSafe;
-import org.modeshape.common.util.Logger;
-import org.modeshape.graph.GraphI18n;
/**
- * A simple {@link Observer} that is itself {@link Observable}. This class essentially multiplexes the events from a single
+ * A simple {@link Observer} that is itself {@link Observable}. This interface essentially multiplexes the events from a single
* Observable to disseminate each event to multiple Observers.
*/
@ThreadSafe
-public class ObservationBus implements Observable, Observer {
- private final ChangeObservers observers = new ChangeObservers();
- private static final Logger LOGGER = Logger.getLogger(ObservationBus.class);
-
- public ObservationBus() {
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
- */
- public boolean register( Observer observer ) {
- if (observer == null) return false;
- return observers.register(observer);
- }
+public interface ObservationBus extends Observable, Observer {
/**
- * {@inheritDoc}
- *
- * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
- */
- public boolean unregister( Observer observer ) {
- return observers.unregister(observer);
- }
-
- /**
- * {@inheritDoc}
- *
- * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
+ * Prepare this bus for operation by starting any resources.
*/
- public void notify( Changes changes ) {
- if (changes != null) {
- // Broadcast the changes to the registered observers ...
- try {
- observers.broadcast(changes);
- } catch (RuntimeException t) {
- LOGGER.error(t, GraphI18n.errorNotifyingObserver, t.getLocalizedMessage());
- }
- }
- }
+ public void start();
/**
* Determine whether this particular bus currently has any observers.
*
* @return true if there is at least one observer, or false otherwise
*/
- public boolean hasObservers() {
- return !observers.isEmpty();
- }
+ public boolean hasObservers();
/**
* Unregister all registered observers, and mark this as no longer accepting new registered observers.
*/
- public void shutdown() {
- observers.shutdown();
- }
+ public void shutdown();
}
Index: modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/property/basic/GraphNamespaceRegistry.java (working copy)
@@ -53,7 +53,7 @@ import org.modeshape.graph.property.ValueFactory;
@NotThreadSafe
public class GraphNamespaceRegistry implements NamespaceRegistry {
- public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.NAMESPACE_URI;
+ public static final Name DEFAULT_URI_PROPERTY_NAME = ModeShapeLexicon.URI;
public static final String GENERATED_PREFIX = "ns";
private SimpleNamespaceRegistry cache;
Index: modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java
===================================================================
--- modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java (revision 1989)
+++ modeshape-graph/src/main/java/org/modeshape/graph/request/processor/RequestProcessor.java (working copy)
@@ -1026,7 +1026,7 @@ public abstract class RequestProcessor {
String userName = context.getSecurityContext() != null ? context.getSecurityContext().getUserName() : null;
if (userName == null) userName = "";
String contextId = context.getId();
- String processId = null;
+ String processId = context.getProcessId();
Map userData = context.getData();
Changes changes = new Changes(processId, contextId, userName, getSourceName(), getNowInUtc(), this.changes, userData);
observer.notify(changes);
Index: modeshape-integration-tests/pom.xml
===================================================================
--- modeshape-integration-tests/pom.xml (revision 1989)
+++ modeshape-integration-tests/pom.xml (working copy)
@@ -41,7 +41,7 @@
+ -->
junit
junit
@@ -75,6 +75,12 @@
test
+
+ org.modeshape
+ modeshape-clustering
+ ${project.version}
+ test
+
org.modeshape
modeshape-connector-jbosscache
Index: modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ modeshape-integration-tests/src/test/java/org/modeshape/test/integration/ClusteringTest.java (working copy)
@@ -0,0 +1,317 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.test.integration;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jcr.ImportUUIDBehavior;
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.UnsupportedRepositoryOperationException;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.modeshape.common.util.FileUtil;
+import org.modeshape.connector.store.jpa.JpaSource;
+import org.modeshape.jcr.JcrConfiguration;
+import org.modeshape.jcr.JcrEngine;
+import org.modeshape.jcr.ModeShapeRoles;
+import org.modeshape.jcr.JcrRepository.Option;
+
+public class ClusteringTest {
+
+ private static JcrConfiguration configuration;
+ private static JcrEngine engine1;
+ private static JcrEngine engine2;
+ private static JcrEngine engine3;
+ private static List sessions = new ArrayList();
+
+ @BeforeClass
+ public static void beforeEach() throws Exception {
+ // Delete the database files, if there are any ...
+ FileUtil.delete("target/db");
+
+ // Set up the database and keep this connection open ...
+ String url = "jdbc:hsqldb:file:target/db/ClusteredObservationBusTest";
+ String username = "sa";
+ String password = "";
+
+ configuration = new JcrConfiguration();
+ configuration.repositorySource("car-source")
+ .usingClass(JpaSource.class)
+ .setDescription("The automobile content")
+ .setProperty("dialect", "org.hibernate.dialect.HSQLDialect")
+ .setProperty("driverClassName", "org.hsqldb.jdbcDriver")
+ .setProperty("url", url)
+ .setProperty("username", username)
+ .setProperty("password", password)
+ .setProperty("maximumConnectionsInPool", "2")
+ .setProperty("minimumConnectionsInPool", "1")
+ .setProperty("numberOfConnectionsToAcquireAsNeeded", "1")
+ .setProperty("maximumSizeOfStatementCache", "100")
+ .setProperty("maximumConnectionIdleTimeInSeconds", "10")
+ .setProperty("referentialIntegrityEnforced", "true")
+ .setProperty("largeValueSizeInBytes", "150")
+ .setProperty("autoGenerateSchema", "update")
+ .setProperty("retryLimit", "3")
+ .setProperty("showSql", "false");
+ configuration.repository("cars")
+ .setSource("car-source")
+ .registerNamespace("car", "http://www.modeshape.org/examples/cars/1.0")
+ .addNodeTypes(resourceUrl("cars.cnd"))
+ .setOption(Option.ANONYMOUS_USER_ROLES, ModeShapeRoles.ADMIN);
+ configuration.clustering().setProperty("clusterName", "MyCluster");// .setProperty("configuration", "");
+
+ // Create an engine and use it to populate the source ...
+ engine1 = configuration.build();
+ engine1.start();
+
+ Repository repository = engine1.getRepository("cars");
+
+ // Use a session to load the contents ...
+ Session session = repository.login();
+ try {
+ InputStream stream = resourceStream("io/cars-system-view.xml");
+ try {
+ session.getWorkspace().importXML("/", stream, ImportUUIDBehavior.IMPORT_UUID_CREATE_NEW);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ stream.close();
+ }
+ } finally {
+ session.logout();
+ }
+
+ // Make sure the data was imported ...
+ session = repository.login();
+ try {
+ Node node = session.getRootNode().getNode("Cars/Hybrid/Toyota Highlander");
+ assertThat(node, is(notNullValue()));
+ assertThat(session.getRootNode().getNodes().getSize(), is(2L)); // "Cars" and "jcr:system"
+ } finally {
+ session.logout();
+ }
+
+ // Start the other engines ...
+ engine2 = configuration.build();
+ engine2.start();
+
+ engine3 = configuration.build();
+ engine3.start();
+ }
+
+ @AfterClass
+ public static void afterAll() throws Exception {
+ // Close all of the sessions ...
+ for (Session session : sessions) {
+ if (session.isLive()) session.logout();
+ }
+ sessions.clear();
+
+ // Shut down the engines ...
+ if (engine1 != null) engine1.shutdown();
+ if (engine2 != null) engine2.shutdown();
+ if (engine3 != null) engine3.shutdown();
+ engine1 = null;
+ engine2 = null;
+ engine3 = null;
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Tests
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @Test
+ public void shouldAllowMultipleEnginesToAccessSameDatabase() throws Exception {
+ Session session1 = sessionFrom(engine1);
+ Node node1 = session1.getRootNode().getNode("Cars/Hybrid/Toyota Highlander");
+ assertThat(node1, is(notNullValue()));
+
+ Session session2 = sessionFrom(engine2);
+ Node node = session2.getRootNode().getNode("Cars/Hybrid/Toyota Highlander");
+ assertThat(node, is(notNullValue()));
+ }
+
+ @Test
+ public void shouldReceiveNotificationsFromAllEnginesWhenChangingContentInOne() throws Exception {
+ Session session1 = sessionFrom(engine1);
+ Session session2 = sessionFrom(engine2);
+ Session session3 = sessionFrom(engine3);
+
+ int eventTypes = Event.NODE_ADDED | Event.NODE_REMOVED; // |Event.PROPERTY_ADDED|Event.PROPERTY_CHANGED|Event.PROPERTY_REMOVED
+ CustomListener listener1 = addListenerTo(session1, eventTypes, 1);
+ CustomListener listener2 = addListenerTo(session2, eventTypes, 1);
+ CustomListener listener3 = addListenerTo(session3, eventTypes, 1);
+ CustomListener remoteListener1 = addRemoteListenerTo(session1, eventTypes, 0);
+ CustomListener remoteListener2 = addRemoteListenerTo(session2, eventTypes, 1);
+ CustomListener remoteListener3 = addRemoteListenerTo(session2, eventTypes, 1);
+
+ // Make some changes ...
+ session1.getRootNode().addNode("SomeNewNode");
+ session1.save();
+
+ // Wait for all the listeners ...
+ listener1.await();
+ listener2.await();
+ listener3.await();
+ remoteListener1.await();
+ remoteListener2.await();
+ remoteListener3.await();
+
+ // Disconnect the listeners ...
+ listener1.disconnect();
+ listener2.disconnect();
+ listener3.disconnect();
+ remoteListener1.disconnect();
+ remoteListener2.disconnect();
+ remoteListener3.disconnect();
+
+ // Now check the events ...
+ listener1.checkObservedEvents();
+ listener2.checkObservedEvents();
+ listener3.checkObservedEvents();
+ remoteListener1.checkObservedEvents();
+ remoteListener2.checkObservedEvents();
+ remoteListener3.checkObservedEvents();
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Utility Methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ protected Session sessionFrom( JcrEngine engine ) throws RepositoryException {
+ Repository repository = engine.getRepository("cars");
+ Session session = repository.login();
+ sessions.add(session);
+ return session;
+ }
+
+ /**
+ * Add a listener for only remote events.
+ *
+ * @param session the session
+ * @param eventTypes the type of events
+ * @param expectedEventCount the number of expected events
+ * @return the listener
+ * @throws UnsupportedRepositoryOperationException
+ * @throws RepositoryException
+ */
+ protected CustomListener addRemoteListenerTo( Session session,
+ int eventTypes,
+ int expectedEventCount )
+ throws UnsupportedRepositoryOperationException, RepositoryException {
+ CustomListener listener = new CustomListener(session, expectedEventCount);
+ session.getWorkspace().getObservationManager().addEventListener(listener, eventTypes, null, true, null, null, true);
+ return listener;
+ }
+
+ /**
+ * Add a listener for local and remote events.
+ *
+ * @param session the session
+ * @param eventTypes the type of events
+ * @param expectedEventCount the number of expected events
+ * @return the listener
+ * @throws UnsupportedRepositoryOperationException
+ * @throws RepositoryException
+ */
+ protected CustomListener addListenerTo( Session session,
+ int eventTypes,
+ int expectedEventCount )
+ throws UnsupportedRepositoryOperationException, RepositoryException {
+ CustomListener listener = new CustomListener(session, expectedEventCount);
+ session.getWorkspace().getObservationManager().addEventListener(listener, eventTypes, null, true, null, null, false);
+ return listener;
+ }
+
+ protected static URL resourceUrl( String name ) {
+ return ClusteringTest.class.getClassLoader().getResource(name);
+ }
+
+ protected static InputStream resourceStream( String name ) {
+ return ClusteringTest.class.getClassLoader().getResourceAsStream(name);
+ }
+
+ protected static class CustomListener implements EventListener {
+ private final List receivedEvents = new ArrayList();
+ private final int expectedEventCount;
+ private final CountDownLatch latch;
+ private final Session session;
+
+ protected CustomListener( Session session,
+ int expectedEventCount ) {
+ this.latch = new CountDownLatch(expectedEventCount);
+ this.expectedEventCount = expectedEventCount;
+ this.session = session;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ @Override
+ public void onEvent( EventIterator events ) {
+ while (events.hasNext()) {
+ receivedEvents.add(events.nextEvent());
+ latch.countDown();
+ }
+ }
+
+ public void await() throws InterruptedException {
+ latch.await(3, TimeUnit.SECONDS);
+ }
+
+ public List getObservedEvents() {
+ return receivedEvents;
+ }
+
+ public void checkObservedEvents() {
+ StringBuilder msg = new StringBuilder("Expected ");
+ msg.append(expectedEventCount);
+ msg.append(" events but received ");
+ msg.append(receivedEvents.size());
+ msg.append(": ");
+ msg.append(receivedEvents);
+ assertThat(msg.toString(), receivedEvents.size(), is(expectedEventCount));
+ }
+
+ public void disconnect() throws RepositoryException {
+ this.session.getWorkspace().getObservationManager().removeEventListener(this);
+ }
+ }
+}
Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java
===================================================================
--- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (revision 1989)
+++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrEngine.java (working copy)
@@ -118,9 +118,9 @@ public class JcrEngine extends ModeShapeEngine implements Repositories {
}
@Override
- public void shutdown() {
+ protected void preShutdown() {
scheduler.shutdown();
- super.shutdown();
+ super.preShutdown();
try {
this.repositoriesLock.lock();
@@ -253,7 +253,7 @@ public class JcrEngine extends ModeShapeEngine implements Repositories {
Node namespacesNode = subgraph.getNode(ModeShapeLexicon.NAMESPACES);
if (namespacesNode != null) {
GraphNamespaceRegistry registry = new GraphNamespaceRegistry(configuration, namespacesNode.getLocation().getPath(),
- ModeShapeLexicon.NAMESPACE_URI);
+ ModeShapeLexicon.URI);
context = context.with(registry);
}
Index: modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java
===================================================================
--- modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (revision 1989)
+++ modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java (working copy)
@@ -540,7 +540,7 @@ public class JcrRepository implements Repository {
// Create the namespace registry and corresponding execution context.
// Note that this persistent registry has direct access to the system workspace.
- Name uriProperty = ModeShapeLexicon.NAMESPACE_URI;
+ Name uriProperty = ModeShapeLexicon.URI;
PathFactory pathFactory = executionContext.getValueFactories().getPathFactory();
Path systemPath = pathFactory.create(JcrLexicon.SYSTEM);
Path namespacesPath = pathFactory.create(systemPath, ModeShapeLexicon.NAMESPACES);
@@ -1612,6 +1612,8 @@ public class JcrRepository implements Repository {
private final Observable repositoryObservable;
private final String sourceName;
private final String systemSourceName;
+ private final String repositorySourceName;
+ private final String processId;
/**
* @param repositoryObservable the repository library observable this observer should register with
@@ -1621,6 +1623,8 @@ public class JcrRepository implements Repository {
this.repositoryObservable.register(this);
this.sourceName = getObservableSourceName();
this.systemSourceName = getSystemSourceName();
+ this.repositorySourceName = getRepositorySourceName();
+ this.processId = getExecutionContext().getProcessId();
}
/**
@@ -1628,10 +1632,9 @@ public class JcrRepository implements Repository {
*
* @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
*/
- public void notify( final Changes changes ) {
- // We only care about events that come from the repository source or the system source ...
- String changedSourceName = changes.getSourceName();
- if (sourceName.equals(changedSourceName) || systemSourceName.equals(changedSourceName)) {
+ public void notify( Changes changes ) {
+ final Changes acceptableChanges = filter(changes);
+ if (acceptableChanges != null) {
// We're still in the thread where the connector published its changes,
// so we need to create a runnable that will send these changes to all
@@ -1647,7 +1650,7 @@ public class JcrRepository implements Repository {
while (observerIterator.hasNext()) {
Observer observer = observerIterator.next();
assert observer != null;
- observer.notify(changes);
+ observer.notify(acceptableChanges);
}
}
};
@@ -1657,6 +1660,33 @@ public class JcrRepository implements Repository {
}
}
+ private Changes filter( Changes changes ) {
+ // We only care about events that come from the repository source, the system source,
+ // or the repository source name (for remote events only) ...
+ String changedSourceName = changes.getSourceName();
+ if (sourceName.equals(changedSourceName)) {
+ // These are changes made locally by this repository ...
+ return changes;
+ }
+ if (systemSourceName.equals(changedSourceName)) {
+ // These are changes made locally by this repository ...
+ return changes;
+ }
+ if (repositorySourceName.equals(changedSourceName)) {
+ // These may be events generated locally or from a remote engine in the cluster ...
+ if (this.processId.equals(changes.getProcessId())) {
+ // These events were made locally and are being handled above, so we can ignore these ...
+ return null;
+ }
+ // Otherwise, the changes were recieved from another engine in the cluster and
+ // we do want to respond to these changes. However, the source name of the changes
+ // needs to be altered to match the 'sourceName' ...
+ return new Changes(changes.getProcessId(), changes.getContextId(), changes.getUserName(), sourceName,
+ changes.getTimestamp(), changes.getChangeRequests(), changes.getData());
+ }
+ return null;
+ }
+
/**
* {@inheritDoc}
*
Index: modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java
===================================================================
--- modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (revision 1989)
+++ modeshape-jcr/src/main/java/org/modeshape/jcr/ModeShapeLexicon.java (working copy)
@@ -36,16 +36,15 @@ public class ModeShapeLexicon extends org.modeshape.repository.ModeShapeLexicon
public static final Name BASE = new BasicName(Namespace.URI, "base");
public static final Name EXPIRATION_DATE = new BasicName(Namespace.URI, "expirationDate");
public static final Name IS_HELD_BY_SESSION = new BasicName(Namespace.URI, "isHeldBySession");
- public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped");
- public static final Name LOCK = new BasicName(Namespace.URI, "lock");
+ public static final Name IS_SESSION_SCOPED = new BasicName(Namespace.URI, "isSessionScoped");
+ public static final Name LOCK = new BasicName(Namespace.URI, "lock");
public static final Name LOCKED_UUID = new BasicName(Namespace.URI, "lockedUuid");
public static final Name LOCKING_SESSION = new BasicName(Namespace.URI, "lockingSession");
- public static final Name LOCKS = new BasicName(Namespace.URI, "locks");
+ public static final Name LOCKS = new BasicName(Namespace.URI, "locks");
public static final Name NAMESPACE = new BasicName(Namespace.URI, "namespace");
public static final Name NODE_TYPES = new BasicName(Namespace.URI, "nodeTypes");
public static final Name REPOSITORIES = new BasicName(Namespace.URI, "repositories");
public static final Name SYSTEM = new BasicName(Namespace.URI, "system");
- public static final Name URI = new BasicName(Namespace.URI, "uri");
public static final Name VERSION_STORAGE = new BasicName(Namespace.URI, "versionStorage");
- public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace");
+ public static final Name WORKSPACE = new BasicName(Namespace.URI, "workspace");
}
Index: modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java
===================================================================
--- modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (revision 1989)
+++ modeshape-jcr/src/test/java/org/modeshape/jcr/JcrConfigurationTest.java (working copy)
@@ -383,7 +383,7 @@ public class JcrConfigurationTest {
assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces").getChildren(),
hasChild(segment("modetest")));
assertThat(subgraph.getNode("/mode:repositories/Car Repository/mode:namespaces/modetest"),
- hasProperty(ModeShapeLexicon.NAMESPACE_URI, "http://www.modeshape.org/test/1.0"));
+ hasProperty(ModeShapeLexicon.URI, "http://www.modeshape.org/test/1.0"));
// Initialize IDTrust and a policy file (which defines the "modeshape-jcr" login config name)
String configFile = "security/jaas.conf.xml";
Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeConfiguration.java (working copy)
@@ -56,6 +56,7 @@ import org.modeshape.graph.Workspace;
import org.modeshape.graph.connector.RepositorySource;
import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource;
import org.modeshape.graph.mimetype.MimeTypeDetector;
+import org.modeshape.graph.observe.ObservationBus;
import org.modeshape.graph.property.Name;
import org.modeshape.graph.property.NamespaceRegistry;
import org.modeshape.graph.property.Path;
@@ -101,6 +102,7 @@ public class ModeShapeConfiguration {
private final Map> sequencerDefinitions = new HashMap>();
private final Map> repositorySourceDefinitions = new HashMap>();
private final Map> mimeTypeDetectorDefinitions = new HashMap>();
+ private ClusterDefinition extends ModeShapeConfiguration> clusterDefinition;
/**
* Create a new configuration, using a default-constructed {@link ExecutionContext}.
@@ -283,8 +285,9 @@ public class ModeShapeConfiguration {
graph.importXmlFrom(configurationFileInputStream).skippingRootElement(true).into(pathToParent);
// The file was imported successfully, so now create the content information ...
- configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, null, pathToParent, context,
- null);
+ configurationContent = configurationContent.with(pathToParent)
+ .with(source)
+ .withWorkspace(source.getDefaultWorkspaceName());
return this;
}
@@ -348,6 +351,8 @@ public class ModeShapeConfiguration {
workspace = graph.createWorkspace().named(workspaceName);
}
assert workspace.getRoot() != null;
+ } else {
+ workspaceName = graph.getCurrentWorkspaceName(); // will be the default
}
// Verify the path ...
@@ -356,8 +361,7 @@ public class ModeShapeConfiguration {
assert parent != null;
// Now create the content information ...
- configurationContent = new ConfigurationDefinition(configurationContent.getName(), source, workspaceName, path, context,
- null);
+ configurationContent = configurationContent.with(source).withWorkspace(workspaceName).with(path);
return this;
}
@@ -675,6 +679,15 @@ public class ModeShapeConfiguration {
}
/**
+ * Obtain the definition for this engine's clustering. If no clustering definition exists, one will be created.
+ *
+ * @return the clustering definition; never null
+ */
+ public ClusterDefinition extends ModeShapeConfiguration> clustering() {
+ return clusterDefinition(this);
+ }
+
+ /**
* Convenience method to make the code that sets up this configuration easier to read. This method simply returns this object.
*
* @return this configuration component; never null
@@ -950,6 +963,15 @@ public class ModeShapeConfiguration {
}
/**
+ * Interface used to set up and define the cluster configuration.
+ *
+ * @param the type of the configuration component that owns this definition object
+ */
+ public interface ClusterDefinition
+ extends Returnable, SetProperties>, Removable {
+ }
+
+ /**
* Interface used to set up and define a MIME type detector instance.
*
* @param the type of the configuration component that owns this definition object
@@ -1094,6 +1116,21 @@ public class ModeShapeConfiguration {
return definition;
}
+ /**
+ * Utility method to construct a definition object for the clustering with the supplied name and return type.
+ *
+ * @param the type of the return object
+ * @param returnObject the return object
+ * @return the definition for the clustering; never null
+ */
+ @SuppressWarnings( "unchecked" )
+ protected ClusterDefinition clusterDefinition( ReturnType returnObject ) {
+ if (clusterDefinition == null) {
+ clusterDefinition = new ClusterBuilder(returnObject, changes(), path(), ModeShapeLexicon.CLUSTERING);
+ }
+ return (ClusterDefinition)clusterDefinition;
+ }
+
protected static class BaseReturnable implements Returnable {
protected final ReturnType returnObject;
@@ -1463,6 +1500,23 @@ public class ModeShapeConfiguration {
}
}
+ protected static class ClusterBuilder
+ extends GraphComponentBuilder, ObservationBus>
+ implements ClusterDefinition {
+
+ protected ClusterBuilder( ReturnType returnObject,
+ Graph.Batch batch,
+ Path path,
+ Name... names ) {
+ super(returnObject, batch, path, names);
+ }
+
+ @Override
+ protected ClusterDefinition thisType() {
+ return this;
+ }
+ }
+
/**
* Representation of the current configuration content.
*/
@@ -1483,6 +1537,7 @@ public class ModeShapeConfiguration {
ExecutionContext context,
ClassLoaderFactory classLoaderFactory ) {
assert configurationName != null;
+ assert source != null;
this.name = configurationName;
this.source = source;
this.path = path != null ? path : RootPath.INSTANCE;
@@ -1581,6 +1636,18 @@ public class ModeShapeConfiguration {
* @return the new configuration
*/
public ConfigurationDefinition with( ClassLoaderFactory classLoaderFactory ) {
+ CheckArg.isNotNull(source, "source");
+ return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory);
+ }
+
+ /**
+ * Return a copy of this configuration that uses the supplied repository source instead of this object's
+ * {@link #getRepositorySource() repository source}.
+ *
+ * @param source the repository source containing the configuration
+ * @return the new configuration
+ */
+ public ConfigurationDefinition with( RepositorySource source ) {
return new ConfigurationDefinition(name, source, workspace, path, context, classLoaderFactory);
}
Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeEngine.java (working copy)
@@ -21,6 +21,17 @@
*/
package org.modeshape.repository;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import net.jcip.annotations.Immutable;
import org.modeshape.common.collection.Problem;
import org.modeshape.common.collection.Problems;
@@ -44,28 +55,19 @@ import org.modeshape.graph.mimetype.ExtensionBasedMimeTypeDetector;
import org.modeshape.graph.mimetype.MimeTypeDetector;
import org.modeshape.graph.mimetype.MimeTypeDetectorConfig;
import org.modeshape.graph.mimetype.MimeTypeDetectors;
-import org.modeshape.graph.observe.ObservationBus;
import org.modeshape.graph.property.Name;
import org.modeshape.graph.property.Path;
import org.modeshape.graph.property.PathExpression;
import org.modeshape.graph.property.PathNotFoundException;
import org.modeshape.graph.property.Property;
+import org.modeshape.repository.cluster.ClusteringConfig;
+import org.modeshape.repository.cluster.ClusteringService;
import org.modeshape.repository.sequencer.SequencerConfig;
import org.modeshape.repository.sequencer.SequencingService;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
/**
- * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build() configuration}.
+ * A single instance of the ModeShape services, which is obtained after setting up the {@link ModeShapeConfiguration#build()
+ * configuration}.
*
* @see ModeShapeConfiguration
*/
@@ -82,16 +84,18 @@ public class ModeShapeEngine {
private final RepositoryService repositoryService;
private final SequencingService sequencingService;
private final ExecutorService executorService;
+ private final ClusteringService clusteringService;
private final MimeTypeDetectors detectors;
+ private final String engineId = UUID.randomUUID().toString();
private static final Logger LOGGER = Logger.getLogger(ModeShapeEngine.class);
protected ModeShapeEngine( ExecutionContext context,
- ModeShapeConfiguration.ConfigurationDefinition configuration ) {
+ ModeShapeConfiguration.ConfigurationDefinition configuration ) {
this.problems = new SimpleProblems();
// Use the configuration's context ...
this.detectors = new MimeTypeDetectors();
- this.context = context.with(detectors);
+ this.context = context.with(detectors).with(engineId);
// And set up the scanner ...
this.configuration = configuration;
@@ -105,23 +109,24 @@ public class ModeShapeEngine {
detectors.addDetector(new MimeTypeDetectorConfig("ExtensionDetector", "Extension-based MIME type detector",
ExtensionBasedMimeTypeDetector.class));
- // Create the RepositoryContext that the configuration repository source should use ...
- ObservationBus configurationChangeBus = new ObservationBus();
- RepositoryContext configContext = new SimpleRepositoryContext(context, configurationChangeBus, null);
- final RepositorySource configSource = this.configuration.getRepositorySource();
- configSource.initialize(configContext);
+ // Create the clustering service ...
+ ClusteringConfig clusterConfig = scanner.getClusteringConfiguration();
+ clusteringService = new ClusteringService();
+ clusteringService.setExecutionContext(context);
+ clusteringService.setClusteringConfig(clusterConfig);
// Create the RepositoryService, pointing it to the configuration repository ...
Path pathToConfigurationRoot = this.configuration.getPath();
String configWorkspaceName = this.configuration.getWorkspace();
- repositoryService = new RepositoryService(configSource, configWorkspaceName, pathToConfigurationRoot, context, problems);
+ RepositorySource configSource = this.configuration.getRepositorySource();
+ repositoryService = new RepositoryService(configSource, configWorkspaceName, pathToConfigurationRoot, context,
+ clusteringService, problems);
- // Now register the repository service to be notified of changes to the configuration ...
- configurationChangeBus.register(repositoryService);
+ // Create the executor service (which starts out with 0 threads, so it's okay to do here) ...
+ ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName());
+ executorService = Executors.newCachedThreadPool(threadPoolFactory);
// Create the sequencing service ...
- ThreadFactory threadPoolFactory = new NamedThreadFactory(configuration.getName());
- executorService = Executors.newScheduledThreadPool(10, threadPoolFactory);
sequencingService = new SequencingService();
sequencingService.setExecutionContext(context);
sequencingService.setExecutorService(executorService);
@@ -129,8 +134,9 @@ public class ModeShapeEngine {
for (SequencerConfig sequencerConfig : scanner.getSequencingConfigurations()) {
sequencingService.addSequencer(sequencerConfig);
}
- }
+ // The rest of the instantiation/configuration will be done in start()
+ }
/**
* Get the problems that were encountered when setting up this engine from the configuration.
@@ -270,8 +276,18 @@ public class ModeShapeEngine {
// Then throw an exception ...
throw new IllegalStateException(RepositoryI18n.errorsPreventStarting.text());
}
+
+ // Create the RepositoryContext that the configuration repository source should use ...
+ RepositoryContext configContext = new SimpleRepositoryContext(context, clusteringService, null);
+ configuration.getRepositorySource().initialize(configContext);
+
+ // Start the various services ...
+ clusteringService.getAdministrator().start();
repositoryService.getAdministrator().start();
sequencingService.getAdministrator().start();
+
+ // Now register the repository service to be notified of changes to the configuration ...
+ clusteringService.register(repositoryService);
}
/**
@@ -282,17 +298,27 @@ public class ModeShapeEngine {
* @see #start()
*/
public void shutdown() {
- // Then terminate the executor service, which may be running background jobs that are not yet completed
+ preShutdown();
+ postShutdown();
+ }
+
+ protected void preShutdown() {
+ // Terminate the executor service, which may be running background jobs that are not yet completed
// and which will prevent new jobs being submitted (to the sequencing service) ...
executorService.shutdown();
- // First, shutdown the sequencing service, which will prevent any additional jobs from going through ...
+ // Next, shutdown the sequencing service, which will prevent any additional jobs from going through ...
sequencingService.getAdministrator().shutdown();
- // Finally shut down the repository source, which closes all connections ...
+ // Shut down the repository source, which closes all connections ...
repositoryService.getAdministrator().shutdown();
}
+ protected void postShutdown() {
+ // Finally shut down the clustering service ...
+ clusteringService.shutdown();
+ }
+
/**
* Blocks until the shutdown has completed, or the timeout occurs, or the current thread is interrupted, whichever happens
* first.
@@ -387,6 +413,49 @@ public class ModeShapeEngine {
return detectors;
}
+ public ClusteringConfig getClusteringConfiguration() {
+ Graph graph = Graph.create(configurationRepository.getRepositorySource(), context);
+ Path pathToClusteringNode = context.getValueFactories().getPathFactory().create(configurationRepository.getPath(),
+ ModeShapeLexicon.CLUSTERING);
+ try {
+ Subgraph subgraph = graph.getSubgraphOfDepth(2).at(pathToClusteringNode);
+
+ Set skipProperties = new HashSet();
+ skipProperties.add(ModeShapeLexicon.DESCRIPTION);
+ skipProperties.add(ModeShapeLexicon.CLASSNAME);
+ skipProperties.add(ModeShapeLexicon.CLASSPATH);
+ Set skipNamespaces = new HashSet();
+ skipNamespaces.add(JcrLexicon.Namespace.URI);
+ skipNamespaces.add(JcrNtLexicon.Namespace.URI);
+ skipNamespaces.add(JcrMixLexicon.Namespace.URI);
+
+ Node clusterNode = subgraph.getRoot();
+ String name = stringValueOf(clusterNode);
+ String desc = stringValueOf(clusterNode, ModeShapeLexicon.DESCRIPTION);
+ String classname = stringValueOf(clusterNode, ModeShapeLexicon.CLASSNAME);
+ String[] classpath = stringValuesOf(clusterNode, ModeShapeLexicon.CLASSPATH);
+ if (classname == null || classname.trim().length() == 0) {
+ classname = "org.modeshape.clustering.ClusteredObservationBus";
+ }
+
+ Map properties = new HashMap();
+ for (Property property : clusterNode.getProperties()) {
+ Name propertyName = property.getName();
+ if (skipNamespaces.contains(propertyName.getNamespaceUri())) continue;
+ if (skipProperties.contains(propertyName)) continue;
+ if (property.isSingle()) {
+ properties.put(propertyName.getLocalName(), property.getFirstValue());
+ } else {
+ properties.put(propertyName.getLocalName(), property.getValuesAsArray());
+ }
+ }
+ return new ClusteringConfig(name, desc, properties, classname, classpath);
+ } catch (PathNotFoundException e) {
+ // no detectors registered ...
+ }
+ return null;
+ }
+
public List getSequencingConfigurations() {
List configs = new ArrayList();
Graph graph = Graph.create(configurationRepository.getRepositorySource(), context);
Index: modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/ModeShapeLexicon.java (working copy)
@@ -48,4 +48,7 @@ public class ModeShapeLexicon extends org.modeshape.graph.ModeShapeLexicon {
public static final Name VALUE = new BasicName(Namespace.URI, "value");
public static final Name RETRY_LIMIT = new BasicName(Namespace.URI, "retryLimit");
public static final Name DEFAULT_CACHE_POLICY = new BasicName(Namespace.URI, "defaultCachePolicy");
+ public static final Name CLUSTERING = new BasicName(Namespace.URI, "clustering");
+ public static final Name CONFIGURATION = new BasicName(Namespace.URI, "configuration");
+ public static final Name CLUSTER_NAME = new BasicName(Namespace.URI, "clusterName");
}
Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryI18n.java (working copy)
@@ -55,27 +55,14 @@ public final class RepositoryI18n {
public static I18n errorFindingPropertyNameInPropertyChangedEvent;
public static I18n errorFindingPropertyNameInPropertyRemovedEvent;
- // Rules
- public static I18n unableToObtainJsr94RuleAdministrator;
- public static I18n errorUsingJsr94RuleAdministrator;
- public static I18n unableToObtainJsr94ServiceProvider;
- public static I18n errorAddingOrUpdatingRuleSet;
- public static I18n errorRollingBackRuleSetAfterUpdateFailed;
- public static I18n errorReadingRulesAndProperties;
- public static I18n errorDeregisteringRuleSetBeforeUpdatingIt;
- public static I18n errorRecreatingRuleSet;
- public static I18n errorRemovingRuleSet;
- public static I18n errorRemovingRuleSetUponShutdown;
- public static I18n unableToFindRuleSet;
- public static I18n errorExecutingRuleSetWithGlobalsAndFacts;
- public static I18n unableToBuildRuleSetRegularExpressionPattern;
-
- public static I18n errorObtainingSessionToRepositoryWorkspace;
- public static I18n errorWritingProblemsOnRuleSet;
-
- public static I18n federationServiceName;
- public static I18n observationServiceName;
- public static I18n ruleServiceName;
+ // Repository service ...
+ public static I18n repositoryServiceName;
+
+ // Clustering service ...
+ public static I18n clusteringServiceName;
+ public static I18n unableToRegisterObserverOnUnstartedClusteringService;
+ public static I18n unableToUnregisterObserverOnUnstartedClusteringService;
+ public static I18n unableToNotifyObserversOnUnstartedClusteringService;
// Sequencing
public static I18n sequencingServiceName;
Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryLibrary.java (working copy)
@@ -65,7 +65,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl
protected class Administrator extends AbstractServiceAdministrator {
protected Administrator() {
- super(RepositoryI18n.federationServiceName, State.STARTED);
+ super(RepositoryI18n.repositoryServiceName, State.STARTED);
}
/**
@@ -109,7 +109,7 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl
private final Map pools = new HashMap();
private RepositoryConnectionFactory delegate;
private final ExecutionContext executionContext;
- private final ObservationBus observationBus = new ObservationBus();
+ private final ObservationBus observationBus;
private final RepositorySource configurationSource;
private final String configurationWorkspaceName;
private final Path pathToConfigurationRoot;
@@ -121,21 +121,27 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl
* @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration
* repository, or null if the default workspace of the source should be used (if there is one)
* @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by
- * this service as the root of the service's configuration; if null, then "/dna:system" is used
+ * this service as the root of the service's configuration
* @param context the execution context in which this service should run
- * @throws IllegalArgumentException if the executionContextFactory
reference is null
+ * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources
+ * @throws IllegalArgumentException if any of the configurationSource
,
+ * pathToSourcesConfigurationRoot
, observationBus
, or context
references are
+ * null
*/
public RepositoryLibrary( RepositorySource configurationSource,
String configurationWorkspaceName,
Path pathToSourcesConfigurationRoot,
- final ExecutionContext context ) {
+ final ExecutionContext context,
+ ObservationBus observationBus ) {
CheckArg.isNotNull(configurationSource, "configurationSource");
CheckArg.isNotNull(context, "context");
CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
+ CheckArg.isNotNull(observationBus, "observationBus");
this.executionContext = context;
this.configurationSource = configurationSource;
this.configurationWorkspaceName = configurationWorkspaceName;
this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;
+ this.observationBus = observationBus;
}
/**
@@ -225,8 +231,6 @@ public class RepositoryLibrary implements RepositoryConnectionFactory, Observabl
} finally {
this.sourcesLock.readLock().unlock();
}
- // Remove all listeners ...
- this.observationBus.shutdown();
}
/**
Index: modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/RepositoryService.java (working copy)
@@ -23,6 +23,11 @@
*/
package org.modeshape.repository;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import net.jcip.annotations.ThreadSafe;
import org.modeshape.common.collection.Problems;
import org.modeshape.common.collection.SimpleProblems;
@@ -38,6 +43,7 @@ import org.modeshape.graph.Subgraph;
import org.modeshape.graph.connector.RepositorySource;
import org.modeshape.graph.observe.Changes;
import org.modeshape.graph.observe.NetChangeObserver;
+import org.modeshape.graph.observe.ObservationBus;
import org.modeshape.graph.observe.Observer;
import org.modeshape.graph.property.Name;
import org.modeshape.graph.property.Path;
@@ -52,12 +58,6 @@ import org.modeshape.repository.service.AbstractServiceAdministrator;
import org.modeshape.repository.service.AdministeredService;
import org.modeshape.repository.service.ServiceAdministrator;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* A service that manages the {@link RepositorySource}es defined within a configuration repository.
*/
@@ -72,7 +72,7 @@ public class RepositoryService implements AdministeredService, Observer {
protected class Administrator extends AbstractServiceAdministrator {
protected Administrator() {
- super(RepositoryI18n.federationServiceName, State.PAUSED);
+ super(RepositoryI18n.repositoryServiceName, State.PAUSED);
}
/**
@@ -135,6 +135,7 @@ public class RepositoryService implements AdministeredService, Observer {
* @param pathToConfigurationRoot the path of the node in the configuration source repository that should be treated by this
* service as the root of the service's configuration; if null, then "/dna:system" is used
* @param context the execution context in which this service should run
+ * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources
* @param problems the {@link Problems} instance that this service should use to report problems starting repositories
* @throws IllegalArgumentException if the bootstrap source is null or the execution context is null
*/
@@ -142,15 +143,18 @@ public class RepositoryService implements AdministeredService, Observer {
String configurationWorkspaceName,
Path pathToConfigurationRoot,
ExecutionContext context,
+ ObservationBus observationBus,
Problems problems ) {
CheckArg.isNotNull(configurationSource, "configurationSource");
CheckArg.isNotNull(context, "context");
+ CheckArg.isNotNull(observationBus, "observationBus");
PathFactory pathFactory = context.getValueFactories().getPathFactory();
if (pathToConfigurationRoot == null) pathToConfigurationRoot = pathFactory.create("/dna:system");
if (problems == null) problems = new SimpleProblems();
Path sourcesPath = pathFactory.create(pathToConfigurationRoot, ModeShapeLexicon.SOURCES);
- this.sources = new RepositoryLibrary(configurationSource, configurationWorkspaceName, sourcesPath, context);
+ this.sources = new RepositoryLibrary(configurationSource, configurationWorkspaceName, sourcesPath, context,
+ observationBus);
this.sources.addSource(configurationSource);
this.pathToConfigurationRoot = pathToConfigurationRoot;
this.configurationSourceName = configurationSource.getName();
@@ -535,6 +539,10 @@ public class RepositoryService implements AdministeredService, Observer {
*/
@Override
protected void notify( NetChanges netChanges ) {
+ if (getConfigurationWorkspaceName() == null) {
+ // This was a transient configuration source, so it should never change ...
+ return;
+ }
if (!getConfigurationSourceName().equals(netChanges.getSourceName())) return;
for (NetChange change : netChanges.getNetChanges()) {
if (!getConfigurationWorkspaceName().equals(change.getRepositoryWorkspaceName())) return;
Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringConfig.java (working copy)
@@ -0,0 +1,59 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.repository.cluster;
+
+import java.util.Map;
+import net.jcip.annotations.Immutable;
+import org.modeshape.common.component.ComponentConfig;
+
+/**
+ * A configuration for a cluster.
+ */
+@Immutable
+public class ClusteringConfig extends ComponentConfig {
+
+ public ClusteringConfig( String name,
+ String description,
+ String classname,
+ String[] classpath ) {
+ this(name, description, System.currentTimeMillis(), null, classname, classpath);
+ }
+
+ public ClusteringConfig( String name,
+ String description,
+ Map properties,
+ String classname,
+ String[] classpath ) {
+ this(name, description, System.currentTimeMillis(), properties, classname, classpath);
+ }
+
+ public ClusteringConfig( String name,
+ String description,
+ long timestamp,
+ Map properties,
+ String classname,
+ String[] classpath ) {
+ super(name, description, timestamp, properties, classname, classpath);
+ }
+}
Index: modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java
new file mode 100644
===================================================================
--- /dev/null (revision 1989)
+++ modeshape-repository/src/main/java/org/modeshape/repository/cluster/ClusteringService.java (working copy)
@@ -0,0 +1,238 @@
+/*
+ * ModeShape (http://www.modeshape.org)
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * See the AUTHORS.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
+ * is licensed to you under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * ModeShape is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.modeshape.repository.cluster;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.modeshape.common.component.ComponentLibrary;
+import org.modeshape.common.util.CheckArg;
+import org.modeshape.graph.ExecutionContext;
+import org.modeshape.graph.observe.Changes;
+import org.modeshape.graph.observe.LocalObservationBus;
+import org.modeshape.graph.observe.ObservationBus;
+import org.modeshape.graph.observe.Observer;
+import org.modeshape.repository.RepositoryI18n;
+import org.modeshape.repository.service.AbstractServiceAdministrator;
+import org.modeshape.repository.service.AdministeredService;
+import org.modeshape.repository.service.ServiceAdministrator;
+
+/**
+ * The service that provides the observation bus for a clustered (or unclustered) environment.
+ */
+public class ClusteringService implements AdministeredService, ObservationBus {
+
+ /**
+ * The administrative component for this service.
+ */
+ protected class Administrator extends AbstractServiceAdministrator {
+
+ protected Administrator() {
+ super(RepositoryI18n.clusteringServiceName, State.PAUSED);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void doStart( State fromState ) {
+ super.doStart(fromState);
+ startService();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void doShutdown( State fromState ) {
+ super.doShutdown(fromState);
+ shutdownService();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected boolean doCheckIsTerminated() {
+ return isServiceTerminated();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean awaitTermination( long timeout,
+ TimeUnit unit ) {
+ return true; // nothing to wait for
+ }
+
+ }
+
+ private ExecutionContext executionContext;
+ private ObservationBus bus;
+ private final ComponentLibrary busLibrary = new ComponentLibrary();
+
+ /**
+ * @return executionContext
+ */
+ public ExecutionContext getExecutionContext() {
+ return this.executionContext;
+ }
+
+ /**
+ * @param executionContext Sets executionContext to the specified value.
+ */
+ public void setExecutionContext( ExecutionContext executionContext ) {
+ CheckArg.isNotNull(executionContext, "execution context");
+ if (this.getAdministrator().isStarted()) {
+ throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
+ }
+ this.executionContext = executionContext;
+ this.busLibrary.setClassLoaderFactory(executionContext);
+ }
+
+ /**
+ * Set the configuration for the clustering. This method will replace any existing configuration.
+ *
+ * @param config the new configuration, or null if the default configuration should be used
+ * @return true if the configuration was set, or false otherwise
+ */
+ public boolean setClusteringConfig( ClusteringConfig config ) {
+ if (config == null) config = createDefaultConfiguration();
+ return this.busLibrary.removeAllAndAdd(config);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#hasObservers()
+ */
+ @Override
+ public boolean hasObservers() {
+ return bus != null && bus.hasObservers();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
+ */
+ @Override
+ public boolean register( Observer observer ) {
+ if (bus == null) {
+ throw new IllegalStateException(RepositoryI18n.unableToRegisterObserverOnUnstartedClusteringService.text());
+ }
+ return bus.register(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
+ */
+ @Override
+ public boolean unregister( Observer observer ) {
+ if (bus == null) {
+ throw new IllegalStateException(RepositoryI18n.unableToUnregisterObserverOnUnstartedClusteringService.text());
+ }
+ return bus.unregister(observer);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
+ */
+ @Override
+ public void notify( Changes changes ) {
+ if (bus == null) {
+ throw new IllegalStateException(RepositoryI18n.unableToNotifyObserversOnUnstartedClusteringService.text());
+ }
+ bus.notify(changes);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.modeshape.repository.service.AdministeredService#getAdministrator()
+ */
+ @Override
+ public ServiceAdministrator getAdministrator() {
+ return new Administrator();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This is equivalent to calling getAdminstrator().start()
and can be called multiple times.
+ *
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#start()
+ */
+ @Override
+ public void start() {
+ getAdministrator().start();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This is equivalent to calling getAdminstrator().shutdown()
.
+ *
+ *
+ * @see org.modeshape.graph.observe.ObservationBus#shutdown()
+ */
+ @Override
+ public void shutdown() {
+ getAdministrator().shutdown();
+ }
+
+ protected void startService() {
+ List instances = busLibrary.getInstances();
+ if (instances.isEmpty()) {
+ setClusteringConfig(null);
+ instances = busLibrary.getInstances();
+ assert instances.size() > 0;
+ }
+ this.bus = instances.get(0);
+ this.bus.start();
+ }
+
+ protected void shutdownService() {
+ // Unregister our observer ...
+ try {
+ if (this.bus != null) {
+ this.bus.shutdown();
+ }
+ } finally {
+ this.bus = null;
+ }
+ }
+
+ protected boolean isServiceTerminated() {
+ return this.bus != null;
+ }
+
+ protected ClusteringConfig createDefaultConfiguration() {
+ return new ClusteringConfig("bus", "Local observation bus", LocalObservationBus.class.getName(), null);
+ }
+}
Index: modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties
===================================================================
--- modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (revision 1989)
+++ modeshape-repository/src/main/resources/org/modeshape/repository/RepositoryI18n.properties (working copy)
@@ -40,26 +40,12 @@ errorFindingPropertyNameInPropertyAddedEvent = Error finding the name of the add
errorFindingPropertyNameInPropertyChangedEvent = Error finding the name of the changed property in the event path {0}
errorFindingPropertyNameInPropertyRemovedEvent = Error finding the name of the removed property in the event path {0}
-unableToObtainJsr94RuleAdministrator = Unable to obtain the rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2}
-errorUsingJsr94RuleAdministrator = Error using rule administrator for JSR-94 service provider {0} ({1}) while adding/updating rule set {2}
-unableToObtainJsr94ServiceProvider = Error using rule administrator for JSR-94 service provider {0} ({1})
-errorAddingOrUpdatingRuleSet = Error adding/updating rule set "{0}"
-errorRollingBackRuleSetAfterUpdateFailed = Error rolling back rule set "{0}" after new rule set failed
-errorReadingRulesAndProperties = Error reading the rules and properties while adding/updating rule set "{0}"
-errorDeregisteringRuleSetBeforeUpdatingIt = Error deregistering rule set "{0}" before updating it
-errorRecreatingRuleSet = Error (re)creating the rule set "{0}"
-errorRemovingRuleSet = Error removing rule set "{0}"
-errorRemovingRuleSetUponShutdown = Error removing rule set "{0}" upon shutdown
-unableToFindRuleSet = Unable to find rule set with name "{0}"
-errorExecutingRuleSetWithGlobalsAndFacts = Error executing rule set "{0}" and with globals {1} and facts {2}
-unableToBuildRuleSetRegularExpressionPattern = Unable to build the rule set name pattern "{0}" using the supplied absolute path ("{1}"): {2}
+repositoryServiceName = Repository Service
-errorObtainingSessionToRepositoryWorkspace = Error obtaining JCR session to repository workspace {0}
-errorWritingProblemsOnRuleSet = Error while writing problems on rule set node {0}
-
-federationServiceName = Federation Service
-observationServiceName = Observation Service
-ruleServiceName = Rule Service
+clusteringServiceName = Clustering Service
+unableToRegisterObserverOnUnstartedClusteringService = Unable to register an observer until the clustering service is started
+unableToUnregisterObserverOnUnstartedClusteringService = Unable to unregister an observer until the clustering service is started
+unableToNotifyObserversOnUnstartedClusteringService = Unable to notify observers until the clustering service is started
sequencingServiceName = Sequencing Service
unableToChangeExecutionContextWhileRunning = Unable to change the execution context while running
Index: modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java
===================================================================
--- modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java (revision 1989)
+++ modeshape-repository/src/test/java/org/modeshape/repository/RepositoryServiceTest.java (working copy)
@@ -29,25 +29,27 @@ import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
import static org.hamcrest.core.IsSame.sameInstance;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.modeshape.common.collection.Problems;
import org.modeshape.common.collection.SimpleProblems;
import org.modeshape.common.util.Logger;
-import org.modeshape.graph.ModeShapeLexicon;
import org.modeshape.graph.ExecutionContext;
import org.modeshape.graph.Graph;
+import org.modeshape.graph.ModeShapeLexicon;
import org.modeshape.graph.connector.RepositoryConnection;
import org.modeshape.graph.connector.RepositorySource;
import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource;
+import org.modeshape.graph.observe.LocalObservationBus;
+import org.modeshape.graph.observe.ObservationBus;
import org.modeshape.graph.property.Path;
import org.modeshape.repository.service.ServiceAdministrator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockitoAnnotations;
-import org.mockito.Mock;
/**
* @author Randall Hauch
@@ -62,6 +64,7 @@ public class RepositoryServiceTest {
private ExecutionContext context;
private Path root;
private Problems problems;
+ private ObservationBus bus;
@Mock
private RepositoryLibrary sources;
@@ -80,7 +83,8 @@ public class RepositoryServiceTest {
when(sources.createConnection(configSourceName)).thenReturn(configRepositoryConnection);
root = context.getValueFactories().getPathFactory().createRootPath();
problems = new SimpleProblems();
- service = new RepositoryService(configRepositorySource, configWorkspaceName, root, context, problems);
+ bus = new LocalObservationBus();
+ service = new RepositoryService(configRepositorySource, configWorkspaceName, root, context, bus, problems);
}
@After
@@ -193,7 +197,7 @@ public class RepositoryServiceTest {
@Test
public void shouldConfigureRepositorySourceWithSetterThatTakesArrayButWithSingleValues() {
Path configPath = context.getValueFactories().getPathFactory().create("/mode:system");
- service = new RepositoryService(configRepositorySource, configWorkspaceName, configPath, context, problems);
+ service = new RepositoryService(configRepositorySource, configWorkspaceName, configPath, context, bus, problems);
// Set up the configuration repository ...
configRepository.useWorkspace("default");
Index: modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java
===================================================================
--- modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java (revision 1989)
+++ modeshape-repository/src/test/java/org/modeshape/repository/sequencer/SequencingServiceTest.java (working copy)
@@ -39,6 +39,7 @@ import org.junit.Test;
import org.modeshape.graph.ExecutionContext;
import org.modeshape.graph.Graph;
import org.modeshape.graph.connector.inmemory.InMemoryRepositorySource;
+import org.modeshape.graph.observe.LocalObservationBus;
import org.modeshape.graph.property.Path;
import org.modeshape.repository.RepositoryLibrary;
import org.modeshape.repository.service.ServiceAdministrator;
@@ -62,7 +63,7 @@ public class SequencingServiceTest {
configSource.setDefaultWorkspaceName("default");
Path configPath = context.getValueFactories().getPathFactory().create("/");
- sources = new RepositoryLibrary(configSource, "default", configPath, context);
+ sources = new RepositoryLibrary(configSource, "default", configPath, context, new LocalObservationBus());
InMemoryRepositorySource source = new InMemoryRepositorySource();
source.setName(REPOSITORY_SOURCE_NAME);
sources.addSource(source);
Index: pom.xml
===================================================================
--- pom.xml (revision 1989)
+++ pom.xml (working copy)
@@ -131,6 +131,7 @@
modeshape-repository
modeshape-cnd
extensions/modeshape-search-lucene
+ extensions/modeshape-clustering
modeshape-jcr-api
modeshape-jcr
extensions/modeshape-classloader-maven