Index: src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java =================================================================== --- src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java (revision 1553) +++ src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java (working copy) @@ -1,6 +1,9 @@ package org.infinispan.loaders.cloud; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.HashSet; @@ -11,6 +14,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import org.infinispan.Cache; import org.infinispan.config.ConfigurationException; @@ -37,10 +42,11 @@ /** * The CloudCacheStore implementation that utilizes JClouds to communicate with cloud storage providers - * such as Amazon's S3, Rackspace's Cloudfiles, or - * any other such provider supported by JClouds. + * href="http://code.google.com/p/jclouds">JClouds to communicate with cloud + * storage providers such as Amazon's S3, + * Rackspace's + * Cloudfiles, or any other such provider supported by JClouds. *

* This file store stores stuff in the following format: * http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket @@ -51,250 +57,317 @@ * @since 4.0 */ public class CloudCacheStore extends BucketBasedCacheStore { - private static final Log log = LogFactory.getLog(CloudCacheStore.class); - private final ThreadLocal>> asyncCommandFutures = new ThreadLocal>>(); - private CloudCacheStoreConfig cfg; - private String containerName; - private BlobStoreContext ctx; - private BlobStore blobStore; - private AsyncBlobStore asyncBlobStore; - private boolean pollFutures = false; - private boolean constructInternalBlobstores = true; + private static final Log log = LogFactory.getLog(CloudCacheStore.class); + private final ThreadLocal>> asyncCommandFutures = new ThreadLocal>>(); + private CloudCacheStoreConfig cfg; + private String containerName; + private BlobStoreContext ctx; + private BlobStore blobStore; + private AsyncBlobStore asyncBlobStore; + private boolean pollFutures = false; + private boolean constructInternalBlobstores = true; + + @Override + public Class getConfigurationClass() { + return CloudCacheStoreConfig.class; + } + + private String getThisContainerName() { + return cfg.getBucketPrefix() + + "-" + + cache.getName().toLowerCase().replace("_", "").replace(".", + ""); + } + + @Override + protected boolean supportsMultiThreadedPurge() { + return true; + } - @Override - public Class getConfigurationClass() { - return CloudCacheStoreConfig.class; - } + @Override + public void init(CacheLoaderConfig cfg, Cache cache, Marshaller m) + throws CacheLoaderException { + this.cfg = (CloudCacheStoreConfig) cfg; + init(cfg, cache, m, null, null, null, true); + } - private String getThisContainerName() { - return cfg.getBucketPrefix() + "-" - + cache.getName().toLowerCase().replace("_", "").replace(".", ""); - } + public void init(CacheLoaderConfig cfg, Cache cache, Marshaller m, + BlobStoreContext ctx, BlobStore blobStore, + AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores) + throws CacheLoaderException { + super.init(cfg, cache, m); + this.cfg = (CloudCacheStoreConfig) cfg; + this.cache = cache; + this.marshaller = m; + this.ctx = ctx; + this.blobStore = blobStore; + this.asyncBlobStore = asyncBlobStore; + this.constructInternalBlobstores = constructInternalBlobstores; + } - @Override - protected boolean supportsMultiThreadedPurge() { - return true; - } + @Override + public void start() throws CacheLoaderException { + super.start(); + if (constructInternalBlobstores) { + if (cfg.getCloudService() == null) + throw new ConfigurationException("CloudService must be set!"); + if (cfg.getIdentity() == null) + throw new ConfigurationException("Identity must be set"); + if (cfg.getPassword() == null) + throw new ConfigurationException("Password must be set"); + } + if (cfg.getBucketPrefix() == null) + throw new ConfigurationException("CloudBucket must be set"); + containerName = getThisContainerName(); + try { + if (constructInternalBlobstores) { + // add an executor as a constructor param to + // EnterpriseConfigurationModule, pass + // property overrides instead of Properties() + ctx = new BlobStoreContextFactory().createContext(cfg + .getCloudService(), cfg.getIdentity(), cfg + .getPassword(), ImmutableSet.of( + new EnterpriseConfigurationModule(), + new Log4JLoggingModule()), new Properties()); + blobStore = ctx.getBlobStore(); + asyncBlobStore = ctx.getAsyncBlobStore(); + } - @Override - public void init(CacheLoaderConfig cfg, Cache cache, Marshaller m) - throws CacheLoaderException { - this.cfg = (CloudCacheStoreConfig) cfg; - init(cfg, cache, m, null, null, null, true); - } + // the "location" is not currently used. + if (!blobStore.containerExists(containerName)) + blobStore.createContainerInLocation(cfg + .getCloudServiceLocation(), containerName); + pollFutures = !cfg.getAsyncStoreConfig().isEnabled(); + } catch (IOException ioe) { + throw new CacheLoaderException("Unable to create context", ioe); + } + } - public void init(CacheLoaderConfig cfg, Cache cache, Marshaller m, BlobStoreContext ctx, - BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores) - throws CacheLoaderException { - super.init(cfg, cache, m); - this.cfg = (CloudCacheStoreConfig) cfg; - this.cache = cache; - this.marshaller = m; - this.ctx = ctx; - this.blobStore = blobStore; - this.asyncBlobStore = asyncBlobStore; - this.constructInternalBlobstores = constructInternalBlobstores; - } + @Override + protected Set loadAllLockSafe() + throws CacheLoaderException { + Set result = new HashSet(); - @Override - public void start() throws CacheLoaderException { - super.start(); - if (constructInternalBlobstores) { - if (cfg.getCloudService() == null) - throw new ConfigurationException("CloudService must be set!"); - if (cfg.getIdentity() == null) - throw new ConfigurationException("Identity must be set"); - if (cfg.getPassword() == null) - throw new ConfigurationException("Password must be set"); - } - if (cfg.getBucketPrefix() == null) - throw new ConfigurationException("CloudBucket must be set"); - containerName = getThisContainerName(); - try { - if (constructInternalBlobstores) { - // add an executor as a constructor param to EnterpriseConfigurationModule, pass - // property overrides instead of Properties() - ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg - .getIdentity(), cfg.getPassword(), ImmutableSet.of( - new EnterpriseConfigurationModule(), new Log4JLoggingModule()), - new Properties()); - blobStore = ctx.getBlobStore(); - asyncBlobStore = ctx.getAsyncBlobStore(); - } + for (Map.Entry entry : ctx.createBlobMap(containerName) + .entrySet()) { + Bucket bucket = readFromBlob(entry.getValue(), entry.getKey()); + if (bucket.removeExpiredEntries()) + updateBucket(bucket); + result.addAll(bucket.getStoredEntries()); + } + return result; + } - // the "location" is not currently used. - if (!blobStore.containerExists(containerName)) - blobStore.createContainerInLocation(cfg.getCloudServiceLocation(), containerName); - pollFutures = !cfg.getAsyncStoreConfig().isEnabled(); - } catch (IOException ioe) { - throw new CacheLoaderException("Unable to create context", ioe); - } - } + @Override + protected void fromStreamLockSafe(ObjectInput objectInput) + throws CacheLoaderException { + String source; + try { + source = (String) objectInput.readObject(); + } catch (Exception e) { + throw convertToCacheLoaderException( + "Error while reading from stream", e); + } + if (containerName.equals(source)) { + log.info("Attempt to load the same cloud bucket ({0}) ignored", + source); + } else { + // TODO implement stream handling. What's the JClouds API to "copy" + // one bucket to another? + } + } - protected Set loadAllLockSafe() throws CacheLoaderException { - Set result = new HashSet(); + @Override + protected void toStreamLockSafe(ObjectOutput objectOutput) + throws CacheLoaderException { + try { + objectOutput.writeObject(containerName); + } catch (Exception e) { + throw convertToCacheLoaderException( + "Error while writing to stream", e); + } + } - for (Map.Entry entry : ctx.createBlobMap(containerName).entrySet()) { - Bucket bucket = readFromBlob(entry.getValue(), entry.getKey()); - if (bucket.removeExpiredEntries()) - updateBucket(bucket); - result.addAll(bucket.getStoredEntries()); - } - return result; - } + @Override + protected void clearLockSafe() { + List> futures = asyncCommandFutures.get(); + if (futures == null) { + // is a sync call + blobStore.clearContainer(containerName); + } else { + // is an async call - invoke clear() on the container asynchronously + // and store the future + // in the 'futures' collection + futures.add(asyncBlobStore.clearContainer(containerName)); + } + } - protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException { - String source; - try { - source = (String) objectInput.readObject(); - } catch (Exception e) { - throw convertToCacheLoaderException("Error while reading from stream", e); - } - if (containerName.equals(source)) { - log.info("Attempt to load the same cloud bucket ({0}) ignored", source); - } else { - // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another? - } - } + private CacheLoaderException convertToCacheLoaderException(String m, + Throwable c) { + if (c instanceof CacheLoaderException) { + return (CacheLoaderException) c; + } else { + return new CacheLoaderException(m, c); + } + } - protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException { - try { - objectOutput.writeObject(containerName); - } catch (Exception e) { - throw convertToCacheLoaderException("Error while writing to stream", e); - } - } + @Override + protected Bucket loadBucket(String hash) throws CacheLoaderException { + return readFromBlob(blobStore.getBlob(containerName, + encodeBucketName(hash)), hash); + } - protected void clearLockSafe() { - List> futures = asyncCommandFutures.get(); - if (futures == null) { - // is a sync call - blobStore.clearContainer(containerName); - } else { - // is an async call - invoke clear() on the container asynchronously and store the future - // in the 'futures' collection - futures.add(asyncBlobStore.clearContainer(containerName)); - } - } + private void purge(BlobMap blobMap) throws CacheLoaderException { + for (Map.Entry entry : blobMap.entrySet()) { + Bucket bucket = readFromBlob(entry.getValue(), entry.getKey()); + if (bucket.removeExpiredEntries()) + updateBucket(bucket); + } + } - private CacheLoaderException convertToCacheLoaderException(String m, Throwable c) { - if (c instanceof CacheLoaderException) { - return (CacheLoaderException) c; - } else { - return new CacheLoaderException(m, c); - } - } + @Override + protected void purgeInternal() throws CacheLoaderException { + // TODO can expiry data be stored in a blob's metadata? More efficient + // purging that way. See + // https://jira.jboss.org/jira/browse/ISPN-334 + if (!cfg.isLazyPurgingOnly()) { + acquireGlobalLock(false); + try { + final BlobMap blobMap = ctx.createBlobMap(containerName); + if (multiThreadedPurge) { + purgerService.execute(new Runnable() { + public void run() { + try { + purge(blobMap); + } catch (Exception e) { + log.warn("Problems purging", e); + } + } + }); + } else { + purge(blobMap); + } + } finally { + releaseGlobalLock(false); + } + } + } - protected Bucket loadBucket(String hash) throws CacheLoaderException { - return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash); - } + @Override + protected void insertBucket(Bucket bucket) throws CacheLoaderException { + Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName())); + writeToBlob(blob, bucket); - private void purge(BlobMap blobMap) throws CacheLoaderException { - for (Map.Entry entry : blobMap.entrySet()) { - Bucket bucket = readFromBlob(entry.getValue(), entry.getKey()); - if (bucket.removeExpiredEntries()) - updateBucket(bucket); - } - } + List> futures = asyncCommandFutures.get(); + if (futures == null) { + // is a sync call + blobStore.putBlob(containerName, blob); + } else { + // is an async call - invoke clear() on the container asynchronously + // and store the future + // in the 'futures' collection + futures.add(asyncBlobStore.putBlob(containerName, blob)); + } + } - protected void purgeInternal() throws CacheLoaderException { - // TODO can expiry data be stored in a blob's metadata? More efficient purging that way. See - // https://jira.jboss.org/jira/browse/ISPN-334 - if (!cfg.isLazyPurgingOnly()) { - acquireGlobalLock(false); - try { - final BlobMap blobMap = ctx.createBlobMap(containerName); - if (multiThreadedPurge) { - purgerService.execute(new Runnable() { - public void run() { - try { - purge(blobMap); - } catch (Exception e) { - log.warn("Problems purging", e); - } - } - }); - } else { - purge(blobMap); - } - } finally { - releaseGlobalLock(false); - } - } - } + @Override + public void applyModifications(List modifications) + throws CacheLoaderException { + List> futures = new LinkedList>(); + asyncCommandFutures.set(futures); - protected void insertBucket(Bucket bucket) throws CacheLoaderException { - Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName())); - writeToBlob(blob, bucket); + try { + super.applyModifications(modifications); + if (pollFutures) { + CacheLoaderException exception = null; + try { + futures = asyncCommandFutures.get(); + if (log.isTraceEnabled()) + log.trace("Futures, in order: {0}", futures); + for (Future f : futures) { + Object o = f.get(); + if (log.isTraceEnabled()) + log.trace("Future {0} returned {1}", f, o); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + exception = convertToCacheLoaderException( + "Caught exception in async process", ee.getCause()); + } + if (exception != null) + throw exception; + } + } finally { + asyncCommandFutures.remove(); + } + } - List> futures = asyncCommandFutures.get(); - if (futures == null) { - // is a sync call - blobStore.putBlob(containerName, blob); - } else { - // is an async call - invoke clear() on the container asynchronously and store the future - // in the 'futures' collection - futures.add(asyncBlobStore.putBlob(containerName, blob)); - } - } + @Override + protected void updateBucket(Bucket bucket) throws CacheLoaderException { + insertBucket(bucket); + } - @Override - public void applyModifications(List modifications) - throws CacheLoaderException { - List> futures = new LinkedList>(); - asyncCommandFutures.set(futures); + private void writeToBlob(Blob blob, Bucket bucket) + throws CacheLoaderException { - try { - super.applyModifications(modifications); - if (pollFutures) { - CacheLoaderException exception = null; - try { - futures = asyncCommandFutures.get(); - if (log.isTraceEnabled()) log.trace("Futures, in order: {0}", futures); - for (Future f : futures) { - Object o = f.get(); - if (log.isTraceEnabled()) log.trace("Future {0} returned {1}", f, o); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (ExecutionException ee) { - exception = convertToCacheLoaderException("Caught exception in async process", ee - .getCause()); - } - if (exception != null) - throw exception; - } - } finally { - asyncCommandFutures.remove(); - } - } + try { + final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket); + if (cfg.isCompress()) + blob.setPayload(compress(payloadBuffer)); + else + blob.setPayload(payloadBuffer); + } catch (IOException e) { + throw new CacheLoaderException(e); + } + } - protected void updateBucket(Bucket bucket) throws CacheLoaderException { - insertBucket(bucket); - } + private Bucket readFromBlob(Blob blob, String bucketName) + throws CacheLoaderException { + if (blob == null) + return null; + try { + Bucket bucket; + if (cfg.isCompress()) + bucket = (Bucket) marshaller + .objectFromInputStream(new GZIPInputStream(blob + .getContent())); + else + bucket = (Bucket) marshaller.objectFromInputStream(blob + .getContent()); - private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException { - try { - blob.setPayload(marshaller.objectToByteBuffer(bucket)); - } catch (IOException e) { - throw new CacheLoaderException(e); - } - } + if (bucket != null) + bucket.setBucketName(bucketName); + return bucket; + } catch (Exception e) { + throw convertToCacheLoaderException("Unable to read blob", e); + } + } - private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException { - if (blob == null) - return null; - try { - Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent()); - if (bucket != null) - bucket.setBucketName(bucketName); - return bucket; - } catch (Exception e) { - throw convertToCacheLoaderException("Unable to read blob", e); - } - } + private byte[] compress(final byte[] payloadBuffer) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStream input = new ByteArrayInputStream(payloadBuffer); + GZIPOutputStream output = new GZIPOutputStream(baos); + byte[] buf = new byte[1024]; - private String encodeBucketName(String decodedName) { - return (decodedName.startsWith("-")) ? decodedName.replace('-', 'A') : decodedName; - } + int bytesRead = input.read(buf); + while (bytesRead != -1) { + output.write(buf, 0, bytesRead); + bytesRead = input.read(buf); + } + input.close(); + output.flush(); + output.finish(); + output.close(); + final byte[] byteArray = baos.toByteArray(); + return byteArray; + } + + private String encodeBucketName(String decodedName) { + final String name = (decodedName.startsWith("-")) ? decodedName + .replace('-', 'A') : decodedName; + if (cfg.isCompress()) + return name + ".gz"; + return name; + } } Index: src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java =================================================================== --- src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java (revision 1553) +++ src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java (working copy) @@ -34,7 +34,17 @@ private String cloudService; private int maxConnections = 10000; private boolean secure = true; - private String cloudServiceLocation = "DEFAULT"; + private boolean compress = true; + + public boolean isCompress() { + return compress; + } + + public void setCompress(boolean compress) { + this.compress = compress; + } + + private final String cloudServiceLocation = "DEFAULT"; private static final long serialVersionUID = -9011054600279256849L; public CloudCacheStoreConfig() { @@ -133,6 +143,8 @@ if (maxConnections != that.maxConnections) return false; if (requestTimeout != that.requestTimeout) return false; if (secure != that.secure) return false; + if (compress != that.compress) + return false; if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix) : that.bucketPrefix != null) return false; if (cloudService != null ? !cloudService.equals(that.cloudService) : that.cloudService != null) return false; if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation) : that.cloudServiceLocation != null) @@ -158,6 +170,7 @@ result = 31 * result + (cloudService != null ? cloudService.hashCode() : 0); result = 31 * result + maxConnections; result = 31 * result + (secure ? 1 : 0); + result = 31 * result + (compress ? 1 : 0); result = 31 * result + (cloudServiceLocation != null ? cloudServiceLocation.hashCode() : 0); return result; } @@ -175,6 +188,7 @@ ", cloudService='" + cloudService + '\'' + ", maxConnections=" + maxConnections + ", secure=" + secure + + ", compress=" + compress + ", cloudServiceLocation='" + cloudServiceLocation + '\'' + '}'; }