Index: src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java (revision 1553)
+++ src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java (working copy)
@@ -1,6 +1,9 @@
package org.infinispan.loaders.cloud;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashSet;
@@ -11,6 +14,8 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
@@ -37,10 +42,11 @@
/**
* The CloudCacheStore implementation that utilizes JClouds to communicate with cloud storage providers
- * such as Amazon's S3, Rackspace's Cloudfiles, or
- * any other such provider supported by JClouds.
+ * href="http://code.google.com/p/jclouds">JClouds to communicate with cloud
+ * storage providers such as Amazon's S3,
+ * Rackspace's
+ * Cloudfiles, or any other such provider supported by JClouds.
*
* This file store stores stuff in the following format:
* http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket
@@ -51,250 +57,317 @@
* @since 4.0
*/
public class CloudCacheStore extends BucketBasedCacheStore {
- private static final Log log = LogFactory.getLog(CloudCacheStore.class);
- private final ThreadLocal>> asyncCommandFutures = new ThreadLocal>>();
- private CloudCacheStoreConfig cfg;
- private String containerName;
- private BlobStoreContext ctx;
- private BlobStore blobStore;
- private AsyncBlobStore asyncBlobStore;
- private boolean pollFutures = false;
- private boolean constructInternalBlobstores = true;
+ private static final Log log = LogFactory.getLog(CloudCacheStore.class);
+ private final ThreadLocal>> asyncCommandFutures = new ThreadLocal>>();
+ private CloudCacheStoreConfig cfg;
+ private String containerName;
+ private BlobStoreContext ctx;
+ private BlobStore blobStore;
+ private AsyncBlobStore asyncBlobStore;
+ private boolean pollFutures = false;
+ private boolean constructInternalBlobstores = true;
+
+ @Override
+ public Class extends CacheStoreConfig> getConfigurationClass() {
+ return CloudCacheStoreConfig.class;
+ }
+
+ private String getThisContainerName() {
+ return cfg.getBucketPrefix()
+ + "-"
+ + cache.getName().toLowerCase().replace("_", "").replace(".",
+ "");
+ }
+
+ @Override
+ protected boolean supportsMultiThreadedPurge() {
+ return true;
+ }
- @Override
- public Class extends CacheStoreConfig> getConfigurationClass() {
- return CloudCacheStoreConfig.class;
- }
+ @Override
+ public void init(CacheLoaderConfig cfg, Cache, ?> cache, Marshaller m)
+ throws CacheLoaderException {
+ this.cfg = (CloudCacheStoreConfig) cfg;
+ init(cfg, cache, m, null, null, null, true);
+ }
- private String getThisContainerName() {
- return cfg.getBucketPrefix() + "-"
- + cache.getName().toLowerCase().replace("_", "").replace(".", "");
- }
+ public void init(CacheLoaderConfig cfg, Cache, ?> cache, Marshaller m,
+ BlobStoreContext ctx, BlobStore blobStore,
+ AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
+ throws CacheLoaderException {
+ super.init(cfg, cache, m);
+ this.cfg = (CloudCacheStoreConfig) cfg;
+ this.cache = cache;
+ this.marshaller = m;
+ this.ctx = ctx;
+ this.blobStore = blobStore;
+ this.asyncBlobStore = asyncBlobStore;
+ this.constructInternalBlobstores = constructInternalBlobstores;
+ }
- @Override
- protected boolean supportsMultiThreadedPurge() {
- return true;
- }
+ @Override
+ public void start() throws CacheLoaderException {
+ super.start();
+ if (constructInternalBlobstores) {
+ if (cfg.getCloudService() == null)
+ throw new ConfigurationException("CloudService must be set!");
+ if (cfg.getIdentity() == null)
+ throw new ConfigurationException("Identity must be set");
+ if (cfg.getPassword() == null)
+ throw new ConfigurationException("Password must be set");
+ }
+ if (cfg.getBucketPrefix() == null)
+ throw new ConfigurationException("CloudBucket must be set");
+ containerName = getThisContainerName();
+ try {
+ if (constructInternalBlobstores) {
+ // add an executor as a constructor param to
+ // EnterpriseConfigurationModule, pass
+ // property overrides instead of Properties()
+ ctx = new BlobStoreContextFactory().createContext(cfg
+ .getCloudService(), cfg.getIdentity(), cfg
+ .getPassword(), ImmutableSet.of(
+ new EnterpriseConfigurationModule(),
+ new Log4JLoggingModule()), new Properties());
+ blobStore = ctx.getBlobStore();
+ asyncBlobStore = ctx.getAsyncBlobStore();
+ }
- @Override
- public void init(CacheLoaderConfig cfg, Cache, ?> cache, Marshaller m)
- throws CacheLoaderException {
- this.cfg = (CloudCacheStoreConfig) cfg;
- init(cfg, cache, m, null, null, null, true);
- }
+ // the "location" is not currently used.
+ if (!blobStore.containerExists(containerName))
+ blobStore.createContainerInLocation(cfg
+ .getCloudServiceLocation(), containerName);
+ pollFutures = !cfg.getAsyncStoreConfig().isEnabled();
+ } catch (IOException ioe) {
+ throw new CacheLoaderException("Unable to create context", ioe);
+ }
+ }
- public void init(CacheLoaderConfig cfg, Cache, ?> cache, Marshaller m, BlobStoreContext ctx,
- BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
- throws CacheLoaderException {
- super.init(cfg, cache, m);
- this.cfg = (CloudCacheStoreConfig) cfg;
- this.cache = cache;
- this.marshaller = m;
- this.ctx = ctx;
- this.blobStore = blobStore;
- this.asyncBlobStore = asyncBlobStore;
- this.constructInternalBlobstores = constructInternalBlobstores;
- }
+ @Override
+ protected Set loadAllLockSafe()
+ throws CacheLoaderException {
+ Set result = new HashSet();
- @Override
- public void start() throws CacheLoaderException {
- super.start();
- if (constructInternalBlobstores) {
- if (cfg.getCloudService() == null)
- throw new ConfigurationException("CloudService must be set!");
- if (cfg.getIdentity() == null)
- throw new ConfigurationException("Identity must be set");
- if (cfg.getPassword() == null)
- throw new ConfigurationException("Password must be set");
- }
- if (cfg.getBucketPrefix() == null)
- throw new ConfigurationException("CloudBucket must be set");
- containerName = getThisContainerName();
- try {
- if (constructInternalBlobstores) {
- // add an executor as a constructor param to EnterpriseConfigurationModule, pass
- // property overrides instead of Properties()
- ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
- .getIdentity(), cfg.getPassword(), ImmutableSet.of(
- new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
- new Properties());
- blobStore = ctx.getBlobStore();
- asyncBlobStore = ctx.getAsyncBlobStore();
- }
+ for (Map.Entry entry : ctx.createBlobMap(containerName)
+ .entrySet()) {
+ Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+ if (bucket.removeExpiredEntries())
+ updateBucket(bucket);
+ result.addAll(bucket.getStoredEntries());
+ }
+ return result;
+ }
- // the "location" is not currently used.
- if (!blobStore.containerExists(containerName))
- blobStore.createContainerInLocation(cfg.getCloudServiceLocation(), containerName);
- pollFutures = !cfg.getAsyncStoreConfig().isEnabled();
- } catch (IOException ioe) {
- throw new CacheLoaderException("Unable to create context", ioe);
- }
- }
+ @Override
+ protected void fromStreamLockSafe(ObjectInput objectInput)
+ throws CacheLoaderException {
+ String source;
+ try {
+ source = (String) objectInput.readObject();
+ } catch (Exception e) {
+ throw convertToCacheLoaderException(
+ "Error while reading from stream", e);
+ }
+ if (containerName.equals(source)) {
+ log.info("Attempt to load the same cloud bucket ({0}) ignored",
+ source);
+ } else {
+ // TODO implement stream handling. What's the JClouds API to "copy"
+ // one bucket to another?
+ }
+ }
- protected Set loadAllLockSafe() throws CacheLoaderException {
- Set result = new HashSet();
+ @Override
+ protected void toStreamLockSafe(ObjectOutput objectOutput)
+ throws CacheLoaderException {
+ try {
+ objectOutput.writeObject(containerName);
+ } catch (Exception e) {
+ throw convertToCacheLoaderException(
+ "Error while writing to stream", e);
+ }
+ }
- for (Map.Entry entry : ctx.createBlobMap(containerName).entrySet()) {
- Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
- if (bucket.removeExpiredEntries())
- updateBucket(bucket);
- result.addAll(bucket.getStoredEntries());
- }
- return result;
- }
+ @Override
+ protected void clearLockSafe() {
+ List> futures = asyncCommandFutures.get();
+ if (futures == null) {
+ // is a sync call
+ blobStore.clearContainer(containerName);
+ } else {
+ // is an async call - invoke clear() on the container asynchronously
+ // and store the future
+ // in the 'futures' collection
+ futures.add(asyncBlobStore.clearContainer(containerName));
+ }
+ }
- protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
- String source;
- try {
- source = (String) objectInput.readObject();
- } catch (Exception e) {
- throw convertToCacheLoaderException("Error while reading from stream", e);
- }
- if (containerName.equals(source)) {
- log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
- } else {
- // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
- }
- }
+ private CacheLoaderException convertToCacheLoaderException(String m,
+ Throwable c) {
+ if (c instanceof CacheLoaderException) {
+ return (CacheLoaderException) c;
+ } else {
+ return new CacheLoaderException(m, c);
+ }
+ }
- protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
- try {
- objectOutput.writeObject(containerName);
- } catch (Exception e) {
- throw convertToCacheLoaderException("Error while writing to stream", e);
- }
- }
+ @Override
+ protected Bucket loadBucket(String hash) throws CacheLoaderException {
+ return readFromBlob(blobStore.getBlob(containerName,
+ encodeBucketName(hash)), hash);
+ }
- protected void clearLockSafe() {
- List> futures = asyncCommandFutures.get();
- if (futures == null) {
- // is a sync call
- blobStore.clearContainer(containerName);
- } else {
- // is an async call - invoke clear() on the container asynchronously and store the future
- // in the 'futures' collection
- futures.add(asyncBlobStore.clearContainer(containerName));
- }
- }
+ private void purge(BlobMap blobMap) throws CacheLoaderException {
+ for (Map.Entry entry : blobMap.entrySet()) {
+ Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+ if (bucket.removeExpiredEntries())
+ updateBucket(bucket);
+ }
+ }
- private CacheLoaderException convertToCacheLoaderException(String m, Throwable c) {
- if (c instanceof CacheLoaderException) {
- return (CacheLoaderException) c;
- } else {
- return new CacheLoaderException(m, c);
- }
- }
+ @Override
+ protected void purgeInternal() throws CacheLoaderException {
+ // TODO can expiry data be stored in a blob's metadata? More efficient
+ // purging that way. See
+ // https://jira.jboss.org/jira/browse/ISPN-334
+ if (!cfg.isLazyPurgingOnly()) {
+ acquireGlobalLock(false);
+ try {
+ final BlobMap blobMap = ctx.createBlobMap(containerName);
+ if (multiThreadedPurge) {
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ purge(blobMap);
+ } catch (Exception e) {
+ log.warn("Problems purging", e);
+ }
+ }
+ });
+ } else {
+ purge(blobMap);
+ }
+ } finally {
+ releaseGlobalLock(false);
+ }
+ }
+ }
- protected Bucket loadBucket(String hash) throws CacheLoaderException {
- return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
- }
+ @Override
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName()));
+ writeToBlob(blob, bucket);
- private void purge(BlobMap blobMap) throws CacheLoaderException {
- for (Map.Entry entry : blobMap.entrySet()) {
- Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
- if (bucket.removeExpiredEntries())
- updateBucket(bucket);
- }
- }
+ List> futures = asyncCommandFutures.get();
+ if (futures == null) {
+ // is a sync call
+ blobStore.putBlob(containerName, blob);
+ } else {
+ // is an async call - invoke clear() on the container asynchronously
+ // and store the future
+ // in the 'futures' collection
+ futures.add(asyncBlobStore.putBlob(containerName, blob));
+ }
+ }
- protected void purgeInternal() throws CacheLoaderException {
- // TODO can expiry data be stored in a blob's metadata? More efficient purging that way. See
- // https://jira.jboss.org/jira/browse/ISPN-334
- if (!cfg.isLazyPurgingOnly()) {
- acquireGlobalLock(false);
- try {
- final BlobMap blobMap = ctx.createBlobMap(containerName);
- if (multiThreadedPurge) {
- purgerService.execute(new Runnable() {
- public void run() {
- try {
- purge(blobMap);
- } catch (Exception e) {
- log.warn("Problems purging", e);
- }
- }
- });
- } else {
- purge(blobMap);
- }
- } finally {
- releaseGlobalLock(false);
- }
- }
- }
+ @Override
+ public void applyModifications(List extends Modification> modifications)
+ throws CacheLoaderException {
+ List> futures = new LinkedList>();
+ asyncCommandFutures.set(futures);
- protected void insertBucket(Bucket bucket) throws CacheLoaderException {
- Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName()));
- writeToBlob(blob, bucket);
+ try {
+ super.applyModifications(modifications);
+ if (pollFutures) {
+ CacheLoaderException exception = null;
+ try {
+ futures = asyncCommandFutures.get();
+ if (log.isTraceEnabled())
+ log.trace("Futures, in order: {0}", futures);
+ for (Future> f : futures) {
+ Object o = f.get();
+ if (log.isTraceEnabled())
+ log.trace("Future {0} returned {1}", f, o);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ exception = convertToCacheLoaderException(
+ "Caught exception in async process", ee.getCause());
+ }
+ if (exception != null)
+ throw exception;
+ }
+ } finally {
+ asyncCommandFutures.remove();
+ }
+ }
- List> futures = asyncCommandFutures.get();
- if (futures == null) {
- // is a sync call
- blobStore.putBlob(containerName, blob);
- } else {
- // is an async call - invoke clear() on the container asynchronously and store the future
- // in the 'futures' collection
- futures.add(asyncBlobStore.putBlob(containerName, blob));
- }
- }
+ @Override
+ protected void updateBucket(Bucket bucket) throws CacheLoaderException {
+ insertBucket(bucket);
+ }
- @Override
- public void applyModifications(List extends Modification> modifications)
- throws CacheLoaderException {
- List> futures = new LinkedList>();
- asyncCommandFutures.set(futures);
+ private void writeToBlob(Blob blob, Bucket bucket)
+ throws CacheLoaderException {
- try {
- super.applyModifications(modifications);
- if (pollFutures) {
- CacheLoaderException exception = null;
- try {
- futures = asyncCommandFutures.get();
- if (log.isTraceEnabled()) log.trace("Futures, in order: {0}", futures);
- for (Future> f : futures) {
- Object o = f.get();
- if (log.isTraceEnabled()) log.trace("Future {0} returned {1}", f, o);
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException ee) {
- exception = convertToCacheLoaderException("Caught exception in async process", ee
- .getCause());
- }
- if (exception != null)
- throw exception;
- }
- } finally {
- asyncCommandFutures.remove();
- }
- }
+ try {
+ final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
+ if (cfg.isCompress())
+ blob.setPayload(compress(payloadBuffer));
+ else
+ blob.setPayload(payloadBuffer);
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
- protected void updateBucket(Bucket bucket) throws CacheLoaderException {
- insertBucket(bucket);
- }
+ private Bucket readFromBlob(Blob blob, String bucketName)
+ throws CacheLoaderException {
+ if (blob == null)
+ return null;
+ try {
+ Bucket bucket;
+ if (cfg.isCompress())
+ bucket = (Bucket) marshaller
+ .objectFromInputStream(new GZIPInputStream(blob
+ .getContent()));
+ else
+ bucket = (Bucket) marshaller.objectFromInputStream(blob
+ .getContent());
- private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
- try {
- blob.setPayload(marshaller.objectToByteBuffer(bucket));
- } catch (IOException e) {
- throw new CacheLoaderException(e);
- }
- }
+ if (bucket != null)
+ bucket.setBucketName(bucketName);
+ return bucket;
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Unable to read blob", e);
+ }
+ }
- private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
- if (blob == null)
- return null;
- try {
- Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
- if (bucket != null)
- bucket.setBucketName(bucketName);
- return bucket;
- } catch (Exception e) {
- throw convertToCacheLoaderException("Unable to read blob", e);
- }
- }
+ private byte[] compress(final byte[] payloadBuffer) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream input = new ByteArrayInputStream(payloadBuffer);
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ byte[] buf = new byte[1024];
- private String encodeBucketName(String decodedName) {
- return (decodedName.startsWith("-")) ? decodedName.replace('-', 'A') : decodedName;
- }
+ int bytesRead = input.read(buf);
+ while (bytesRead != -1) {
+ output.write(buf, 0, bytesRead);
+ bytesRead = input.read(buf);
+ }
+ input.close();
+ output.flush();
+ output.finish();
+ output.close();
+ final byte[] byteArray = baos.toByteArray();
+ return byteArray;
+ }
+
+ private String encodeBucketName(String decodedName) {
+ final String name = (decodedName.startsWith("-")) ? decodedName
+ .replace('-', 'A') : decodedName;
+ if (cfg.isCompress())
+ return name + ".gz";
+ return name;
+ }
}
Index: src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java (revision 1553)
+++ src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java (working copy)
@@ -34,7 +34,17 @@
private String cloudService;
private int maxConnections = 10000;
private boolean secure = true;
- private String cloudServiceLocation = "DEFAULT";
+ private boolean compress = true;
+
+ public boolean isCompress() {
+ return compress;
+ }
+
+ public void setCompress(boolean compress) {
+ this.compress = compress;
+ }
+
+ private final String cloudServiceLocation = "DEFAULT";
private static final long serialVersionUID = -9011054600279256849L;
public CloudCacheStoreConfig() {
@@ -133,6 +143,8 @@
if (maxConnections != that.maxConnections) return false;
if (requestTimeout != that.requestTimeout) return false;
if (secure != that.secure) return false;
+ if (compress != that.compress)
+ return false;
if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix) : that.bucketPrefix != null) return false;
if (cloudService != null ? !cloudService.equals(that.cloudService) : that.cloudService != null) return false;
if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation) : that.cloudServiceLocation != null)
@@ -158,6 +170,7 @@
result = 31 * result + (cloudService != null ? cloudService.hashCode() : 0);
result = 31 * result + maxConnections;
result = 31 * result + (secure ? 1 : 0);
+ result = 31 * result + (compress ? 1 : 0);
result = 31 * result + (cloudServiceLocation != null ? cloudServiceLocation.hashCode() : 0);
return result;
}
@@ -175,6 +188,7 @@
", cloudService='" + cloudService + '\'' +
", maxConnections=" + maxConnections +
", secure=" + secure +
+ ", compress=" + compress +
", cloudServiceLocation='" + cloudServiceLocation + '\'' +
'}';
}