Index: core/src/main/java/org/infinispan/container/impl/InternalDataContainer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/java/org/infinispan/container/impl/InternalDataContainer.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/main/java/org/infinispan/container/impl/InternalDataContainer.java (date 1633706384285) @@ -203,6 +203,11 @@ return Flowable.fromIterable(() -> iterator(IntSets.immutableSet(segment))); } + default Publisher> publisher(IntSet segments) { + return Flowable.fromIterable(segments) + .flatMap(this::publisher); + } + /** * Performs the given action for each element of the container that maps to the given set of segments * until all elements have been processed or the action throws an exception. Unless otherwise specified by the Index: core/src/test/java/org/infinispan/reactive/publisher/impl/RehashClusterPublisherManagerTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/java/org/infinispan/reactive/publisher/impl/RehashClusterPublisherManagerTest.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/test/java/org/infinispan/reactive/publisher/impl/RehashClusterPublisherManagerTest.java (date 1633706384314) @@ -19,6 +19,8 @@ import java.util.stream.Stream; import org.infinispan.Cache; +import org.infinispan.commons.test.ExceptionRunnable; +import org.infinispan.commons.util.IntSets; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.container.impl.InternalDataContainer; @@ -27,7 +29,6 @@ import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand; import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.transport.Address; -import org.infinispan.commons.test.ExceptionRunnable; import org.infinispan.test.Mocks; import org.infinispan.test.MultipleCacheManagersTest; import org.infinispan.test.TestDataSCI; @@ -35,6 +36,7 @@ import org.infinispan.test.fwk.CheckPoint; import org.infinispan.util.ControlledConsistentHashFactory; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.reactivestreams.Publisher; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -78,7 +80,7 @@ return Arrays.stream(DeliveryGuarantee.values()) .flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE) .flatMap(parallel -> Stream.of(Boolean.TRUE, Boolean.FALSE) - .map(entry -> new Object[]{dg, parallel, entry }))) + .map(entry -> new Object[]{dg, parallel, entry}))) .toArray(Object[][]::new); } @@ -116,9 +118,12 @@ CheckPoint checkPoint = new CheckPoint(); // Always let it finish once released checkPoint.triggerForever(Mocks.AFTER_RELEASE); - // Block on the checkpoint when it is requesting segment 2 from node 2 + // Block on the checkpoint when it is requesting segment 2 from node 2 (need both as different methods are invoked + // if the invocation is parallel) Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache2, (stub, m) -> stub.when(m).publisher(Mockito.eq(2))); + Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache2, + (stub, m) -> stub.when(m).publisher(Mockito.eq(IntSets.immutableSet(2)))); int expectedAmount = caches().size(); // If it is at most once, we don't retry the segment so the count will be off by 1 @@ -198,10 +203,13 @@ InternalDataContainer internalDataContainer = TestingUtil.extractComponent(cache2, InternalDataContainer.class); InternalDataContainer spy = spy(internalDataContainer); - doAnswer(invocation -> { + Answer blockingAnswer = invocation -> { Publisher result = (Publisher) invocation.callRealMethod(); return Mocks.blockingPublisher(result, checkPoint); - }).when(spy).publisher(eq(2)); + }; + // Depending upon if it is parallel or not, it can invoke either method + doAnswer(blockingAnswer).when(spy).publisher(eq(2)); + doAnswer(blockingAnswer).when(spy).publisher(eq(IntSets.immutableSet(2))); TestingUtil.replaceComponent(cache2, InternalDataContainer.class, spy, true); Index: core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreFunctionalTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreFunctionalTest.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreFunctionalTest.java (date 1633705826372) @@ -1,5 +1,10 @@ package org.infinispan.expiration.impl; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; + +import java.util.concurrent.TimeUnit; + import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder; @@ -9,25 +14,77 @@ @Test(groups = "functional", testName = "expiration.impl.ExpirationStoreFunctionalTest") public class ExpirationStoreFunctionalTest extends ExpirationFunctionalTest { + private boolean passivationEnabled; + + private final int MAX_IN_MEMORY = 5; + + ExpirationStoreFunctionalTest passivation(boolean enable) { + this.passivationEnabled = enable; + return this; + } + @Factory @Override public Object[] factory() { return new Object[]{ // Test is for dummy store and we don't care about memory storage types - new ExpirationStoreFunctionalTest().cacheMode(CacheMode.LOCAL), + new ExpirationStoreFunctionalTest().passivation(true).cacheMode(CacheMode.LOCAL), + new ExpirationStoreFunctionalTest().passivation(false).cacheMode(CacheMode.LOCAL), }; } @Override protected String parameters() { - return null; + return "[passivation= " + passivationEnabled + "]"; } @Override protected void configure(ConfigurationBuilder config) { config - // Prevent the reaper from running, reaperEnabled(false) doesn't work when a store is present - .expiration().wakeUpInterval(Long.MAX_VALUE) - .persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class); + .memory().maxCount(passivationEnabled ? MAX_IN_MEMORY : -1) + // Prevent the reaper from running, reaperEnabled(false) doesn't work when a store is present + .expiration().wakeUpInterval(Long.MAX_VALUE) + .persistence().passivation(passivationEnabled) + .addStore(DummyInMemoryStoreConfigurationBuilder.class); + } + + @Override + protected int maxInMemory() { + return passivationEnabled ? MAX_IN_MEMORY : super.maxInMemory(); + } + + public void testMaxIdleWithPassivation() { + cache.put("will-expire", "uh oh", -1, null, 1, TimeUnit.MILLISECONDS); + // Approximately half of these will be in memory with passivation, with rest in the store + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i, -1, null, 10, TimeUnit.MILLISECONDS); + } + + assertEquals(SIZE + 1, cache.size()); + + timeService.advance(6); + + assertEquals(SIZE, cache.size()); + + // Now we read just a few of them + assertNotNull(cache.get("key-" + 1)); + assertNotNull(cache.get("key-" + 6)); + assertNotNull(cache.get("key-" + 3)); + + processExpiration(); + + // This will expire all but the 3 we touched above + timeService.advance(6); + + assertEquals(3, cache.size()); + + // This will expire the rest + timeService.advance(6); + + assertEquals(0, cache.size()); + + processExpiration(); + + assertEquals(0, cache.getAdvancedCache().getDataContainer().sizeIncludingExpired()); } } Index: core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java (date 1633705826360) @@ -2,7 +2,9 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -113,17 +115,22 @@ assertEquals(0, cache.size()); } + protected int maxInMemory() { + return SIZE; + } + public void testSimpleExpirationMaxIdle() throws Exception { for (int i = 0; i < SIZE; i++) { - cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); + cache.put("key-" + i, "value-" + i, -1, null, 1, TimeUnit.MILLISECONDS); } timeService.advance(2); assertEquals(0, cache.size()); // Only processExpiration actually removes the entries - assertEquals(SIZE, cache.getAdvancedCache().getDataContainer().sizeIncludingExpired()); + assertEquals(maxInMemory(), cache.getAdvancedCache().getDataContainer().sizeIncludingExpired()); processExpiration(); + assertEquals(0, cache.size()); assertEquals(0, cache.getAdvancedCache().getDataContainer().sizeIncludingExpired()); } @@ -148,27 +155,35 @@ public void testExpirationLifespanInOps() throws Exception { for (int i = 0; i < SIZE; i++) { - cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS); + long expirationTime = i % 2 == 0 ? 1 : 1000; + cache.put("key-" + i, "value-" + i, expirationTime, TimeUnit.MILLISECONDS); } timeService.advance(2); - for (int i = 0; i < SIZE; i++) { - assertFalse(cache.containsKey("key-" + i)); - assertNull(cache.get("key-" + i)); - assertNull(cache.remove("key-" + i)); - } + checkOddExist(SIZE); } public void testExpirationMaxIdleInOps() throws Exception { for (int i = 0; i < SIZE; i++) { - cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); + long expirationTime = i % 2 == 0 ? 1 : 1000; + cache.put("key-" + i, "value-" + i, -1, null, expirationTime, TimeUnit.MILLISECONDS); } timeService.advance(2); + checkOddExist(SIZE); + } + + protected void checkOddExist(int SIZE) { for (int i = 0; i < SIZE; i++) { - assertFalse(cache.containsKey("key-" + i)); - assertNull(cache.get("key-" + i)); - assertNull(cache.remove("key-" + i)); + if (i % 2 == 0) { + assertFalse(cache.containsKey("key-" + i)); + assertNull(cache.get("key-" + i)); + assertNull(cache.remove("key-" + i)); + } else { + assertTrue(cache.containsKey("key-" + i)); + assertNotNull(cache.get("key-" + i)); + assertNotNull(cache.remove("key-" + i)); + } } } @@ -179,10 +194,10 @@ timeService.advance(2); cache.getAdvancedCache().getDataContainer() - .forEach(ice -> { - throw new RuntimeException( - "No task should be executed on expired entry"); - }); + .forEach(ice -> { + throw new RuntimeException( + "No task should be executed on expired entry"); + }); } public void testExpirationMaxIdleDataContainerIterator() throws Exception { @@ -203,7 +218,7 @@ AtomicInteger invocationCount = new AtomicInteger(); cache.getAdvancedCache().getDataContainer().iteratorIncludingExpired().forEachRemaining(ice -> invocationCount.incrementAndGet()); - assertEquals(SIZE, invocationCount.get()); + assertEquals(maxInMemory(), invocationCount.get()); processExpiration(); cache.getAdvancedCache().getDataContainer() Index: core/src/main/java/org/infinispan/configuration/cache/ExpirationConfigurationBuilder.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/java/org/infinispan/configuration/cache/ExpirationConfigurationBuilder.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/main/java/org/infinispan/configuration/cache/ExpirationConfigurationBuilder.java (date 1633705826319) @@ -64,13 +64,23 @@ /** * Maximum idle time a cache entry will be maintained in the cache, in milliseconds. If the idle * time is exceeded, the entry will be expired cluster-wide. -1 means the entries never expire. - * + *

* Note that this can be overridden on a per-entry basis by using the Cache API. */ public ExpirationConfigurationBuilder maxIdle(long l, TimeUnit unit) { return maxIdle(unit.toMillis(l)); } + /** + * Maximum idle time a cache entry will be maintained in the cache, in milliseconds. If the idle + * time is exceeded, the entry will be expired cluster-wide. -1 means the entries never expire. + * + * @return the max idle setting, default is -1 for disabled + */ + public long maxIdle() { + return attributes.attribute(MAX_IDLE).get(); + } + /** * Enable the background reaper to test entries for expiration. * Regardless of whether a reaper is used, entries are tested for expiration lazily when they are Index: core/src/test/java/org/infinispan/persistence/PassivationOptionsTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/java/org/infinispan/persistence/PassivationOptionsTest.java (date 1633705826385) +++ core/src/test/java/org/infinispan/persistence/PassivationOptionsTest.java (date 1633705826385) @@ -0,0 +1,59 @@ +package org.infinispan.persistence; + +import java.util.stream.Stream; + +import org.infinispan.commons.CacheConfigurationException; +import org.infinispan.commons.test.Exceptions; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.expiration.impl.CustomLoaderNonNullWithExpirationTest; +import org.infinispan.manager.EmbeddedCacheManagerStartupException; +import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder; +import org.infinispan.test.AbstractInfinispanTest; +import org.infinispan.test.fwk.TestCacheManagerFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "functional", testName = "persistence.PassivationOptionsTest") +public class PassivationOptionsTest extends AbstractInfinispanTest { + public void testPassivationWithLoader() { + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.persistence() + .passivation(true) + .addStore(CustomLoaderNonNullWithExpirationTest.SimpleLoaderConfigurationBuilder.class) + .segmented(false); + + TestCacheManagerFactory.createCacheManager(builder); + } + + @DataProvider(name = "passivationEnabled") + public Object[][] maxIdlePassivationParams() { + return Stream.of(Boolean.TRUE, Boolean.FALSE) + .flatMap(passivationEnabled -> + Stream.of(Boolean.TRUE, Boolean.FALSE) + .map(maxIdleEnabled -> new Object[]{passivationEnabled, maxIdleEnabled})) + .toArray(Object[][]::new); + } + + + @Test(dataProvider = "passivationEnabled") + public void testPassivationWithMaxIdle(boolean passivationEnabled, boolean maxIdleEnabled) { + ConfigurationBuilder builder = new ConfigurationBuilder(); + if (maxIdleEnabled) { + builder.expiration() + .maxIdle(10); + } + builder.persistence() + .passivation(passivationEnabled) + .addStore(DummyInMemoryStoreConfigurationBuilder.class); + + try { + TestCacheManagerFactory.createCacheManager(builder); + } catch (Throwable t) { + if (!passivationEnabled && maxIdleEnabled) { + Exceptions.assertException(EmbeddedCacheManagerStartupException.class, CacheConfigurationException.class, ".*Max idle is not allowed.*", t); + } else { + throw t; + } + } + } +} Index: core/src/main/java/org/infinispan/util/logging/Log.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/java/org/infinispan/util/logging/Log.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/main/java/org/infinispan/util/logging/Log.java (date 1633705826352) @@ -2212,4 +2212,11 @@ @Message(value = "Read invalid data in SingleFileStore file %s, please remove the file and retry", id = 650) PersistenceException invalidSingleFileStoreData(String path); + + @Message(value = "Max idle is not allowed while using a store without passivation", id = 651) + CacheConfigurationException maxIdleNotAllowedWithoutPassivation(); + + @LogMessage(level = WARN) + @Message(value = "Max idle is not supported when using a store", id = 652) + void maxIdleNotTestedWithPassivation(); } Index: core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java (revision dfcf9e47c6633265df9a06f9991e9a7c015bb6fc) +++ core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java (date 1633705826341) @@ -2,6 +2,7 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.infinispan.util.logging.Log.CONFIG; import static org.infinispan.util.logging.Log.PERSISTENCE; import java.lang.invoke.MethodHandles; @@ -201,6 +202,16 @@ // Blocks here waiting for stores and availability task to start if needed storeStartup.blockingAwait(); + // If a store is not writeable, then max idle works fine as it only expires in memory, thus refreshing + // the value that can be read from the store + // Max idle is not currently supported with stores, it sorta works with passivation though + if (configuration.expiration().maxIdle() > 0 && + stores.stream().anyMatch(status -> !status.characteristics.contains(Characteristic.READ_ONLY))) { + if (!configuration.persistence().passivation()) { + throw CONFIG.maxIdleNotAllowedWithoutPassivation(); + } + CONFIG.maxIdleNotTestedWithPassivation(); + } allSegmentedOrShared = allStoresSegmentedOrShared(); } catch (Throwable t) { log.debug("PersistenceManagerImpl encountered an exception during startup of stores", t);