Subject: [PATCH] ISPN-14860 Test massindexing on cache with sql store
---
Index: core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java b/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
--- a/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java (revision c4ad934c01f00908d8a314b2e5d7b6f28ea90cc5)
+++ b/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
@@ -84,7 +84,7 @@
@AfterMethod(alwaysRun=true)
protected void clearContent() {
- if (cleanupAfterTest()) clearCacheManager();
+ if (cleanupAfterMethod()) clearCacheManager();
}
protected ConfigurationBuilder getDefaultStandaloneCacheConfig(boolean transactional) {
Index: persistence/sql/pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/persistence/sql/pom.xml b/persistence/sql/pom.xml
--- a/persistence/sql/pom.xml (revision c4ad934c01f00908d8a314b2e5d7b6f28ea90cc5)
+++ b/persistence/sql/pom.xml (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
@@ -108,6 +108,17 @@
test
+
+ org.infinispan
+ infinispan-query
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
org.kohsuke.metainf-services
Index: persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/GamePlayer.java
===================================================================
diff --git a/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/GamePlayer.java b/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/GamePlayer.java
new file mode 100644
--- /dev/null (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
+++ b/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/GamePlayer.java (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
@@ -0,0 +1,56 @@
+package org.infinispan.persistence.sql.massindexer;
+
+import org.infinispan.api.annotations.indexing.Basic;
+import org.infinispan.api.annotations.indexing.Indexed;
+import org.infinispan.api.annotations.indexing.Keyword;
+import org.infinispan.api.annotations.indexing.Text;
+import org.infinispan.protostream.GeneratedSchema;
+import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
+import org.infinispan.protostream.annotations.ProtoFactory;
+import org.infinispan.protostream.annotations.ProtoField;
+
+@Indexed
+public class GamePlayer {
+
+ private String nick;
+ private Integer ranking;
+ private String game;
+ private String history;
+
+ @ProtoFactory
+ public GamePlayer(String nick, Integer ranking, String game, String history) {
+ this.nick = nick;
+ this.ranking = ranking;
+ this.game = game;
+ this.history = history;
+ }
+
+ @Keyword(normalizer = "lowercase")
+ @ProtoField(value = 1)
+ public String getNick() {
+ return nick;
+ }
+
+ @Basic
+ @ProtoField(value = 2)
+ public Integer getRanking() {
+ return ranking;
+ }
+
+ @Keyword(normalizer = "lowercase")
+ @ProtoField(value = 3)
+ public String getGame() {
+ return game;
+ }
+
+ @Text
+ @ProtoField(value = 4)
+ public String getHistory() {
+ return history;
+ }
+
+ @AutoProtoSchemaBuilder(schemaPackageName = "play", includeClasses = GamePlayer.class)
+ public interface GamePlayerSchema extends GeneratedSchema {
+ GamePlayerSchema INSTANCE = new GamePlayerSchemaImpl();
+ }
+}
Index: persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/PersistenceSQLMassIndexingTest.java
===================================================================
diff --git a/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/PersistenceSQLMassIndexingTest.java b/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/PersistenceSQLMassIndexingTest.java
new file mode 100644
--- /dev/null (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
+++ b/persistence/sql/src/test/java/org/infinispan/persistence/sql/massindexer/PersistenceSQLMassIndexingTest.java (revision 48d569f48a82dbc199aeba8353e640e019dc74e2)
@@ -0,0 +1,208 @@
+package org.infinispan.persistence.sql.massindexer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.infinispan.util.concurrent.CompletionStages.join;
+import static org.testng.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.infinispan.Cache;
+import org.infinispan.commons.dataconversion.MediaType;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.cache.IndexStorage;
+import org.infinispan.configuration.cache.IndexingMode;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.jdbc.common.DatabaseType;
+import org.infinispan.persistence.jdbc.common.SqlManager;
+import org.infinispan.persistence.jdbc.common.configuration.ConnectionFactoryConfiguration;
+import org.infinispan.persistence.jdbc.common.configuration.ConnectionFactoryConfigurationBuilder;
+import org.infinispan.persistence.jdbc.common.connectionfactory.ConnectionFactory;
+import org.infinispan.persistence.sql.configuration.QueriesJdbcStoreConfigurationBuilder;
+import org.infinispan.query.Indexer;
+import org.infinispan.query.Search;
+import org.infinispan.query.core.stats.IndexInfo;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.CleanupAfterTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", testName = "persistence.sql.massindexer.PersistenceSQLMassIndexingTest")
+@CleanupAfterTest
+public class PersistenceSQLMassIndexingTest extends SingleCacheManagerTest {
+
+ private static final String ANOTHER_CACHE = "anotherCache";
+
+ private static final String DESCRIPTION = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
+ "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Cum sociis natoque penatibus et magnis." +
+ " Vel pharetra vel turpis nunc eget. Ullamcorper eget nulla facilisi etiam dignissim diam. " +
+ "Bibendum arcu vitae elementum curabitur vitae nunc sed.";
+
+ private static final int NUM_ENTITIES = 10_000;
+
+ @Override
+ protected EmbeddedCacheManager createCacheManager() {
+ ConfigurationBuilder config = new ConfigurationBuilder();
+ QueriesJdbcStoreConfigurationBuilder jdbcStoreConfig = config
+ .clustering()
+ .cacheMode(CacheMode.REPL_SYNC)
+ .encoding()
+ .mediaType(MediaType.APPLICATION_PROTOSTREAM_TYPE)
+ .indexing()
+ .enable()
+ .storage(IndexStorage.LOCAL_HEAP)
+ .indexingMode(IndexingMode.MANUAL)
+ .addIndexedEntity(GamePlayer.class)
+ .persistence()
+ .passivation(false)
+ .addStore(QueriesJdbcStoreConfigurationBuilder.class)
+ .preload(false);
+
+ SqlManager sqlManager = SqlManager.fromDatabaseType(DatabaseType.H2, "GamePlayer", true);
+
+ List keyColumns = Collections.singletonList("id");
+ List allColumns = Arrays.asList("id", "nick", "ranking", "game", "history");
+
+ jdbcStoreConfig.queries()
+ .size("select count(*) from GamePlayer")
+ .select("select id, nick, ranking, game, history from GamePlayer where id = :id")
+ .selectAll("select id, nick, ranking, game, history from GamePlayer")
+ .upsert(sqlManager.getUpsertStatement(keyColumns, allColumns))
+ .delete("delete from GamePlayer where id = :id")
+ .deleteAll("delete from GamePlayer");
+
+ jdbcStoreConfig
+ .connectionPool()
+ .driverClass(org.h2.Driver.class)
+ .connectionUrl("jdbc:h2:mem:players;DB_CLOSE_DELAY=-1")
+ .username("sa");
+
+ jdbcStoreConfig.keyColumns("id");
+ jdbcStoreConfig.schema()
+ .packageName("play")
+ .messageName("GamePlayer")
+ .embeddedKey(false);
+
+ createTable(jdbcStoreConfig);
+
+ GlobalConfigurationBuilder globalBuilder = new GlobalConfigurationBuilder().clusteredDefault();
+ globalBuilder.serialization().addContextInitializer(GamePlayer.GamePlayerSchema.INSTANCE);
+
+ EmbeddedCacheManager manager = TestCacheManagerFactory.createClusteredCacheManager(globalBuilder, config);
+ manager.defineConfiguration(ANOTHER_CACHE, config.build());
+
+ return manager;
+ }
+
+ @Override
+ protected void setup() throws Exception {
+ super.setup();
+ populateTheCache();
+ }
+
+ @Test
+ public void tryLoop() {
+ AtomicInteger sink = new AtomicInteger();
+
+ long start = System.currentTimeMillis();
+ try (Stream> stream = cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
+ .cacheEntrySet().stream()) {
+ stream.forEach(
+ (val) -> {
+ GamePlayer player = (GamePlayer) val.getValue();
+ sink.addAndGet(player.getRanking());
+ }
+ );
+ }
+ long duration = System.currentTimeMillis() - start;
+ log.infov("try-loop duration -> " + duration);
+
+ assertEquals(sink.get(), 45_000);
+ }
+
+ @Test
+ public void testMassIndexer() {
+ populateTheCache();
+
+ IndexInfo indexInfo = getIndexInfo(cache);
+ assertThat(indexInfo.count()).isZero();
+
+ Indexer massIndexer = Search.getIndexer(cache);
+ runIndexer(massIndexer);
+
+ indexInfo = getIndexInfo(cache);
+ assertThat(indexInfo.count()).isEqualTo(NUM_ENTITIES);
+
+ // reindex another cache using the same cache store
+ Cache anotherCache = cacheManager.getCache(ANOTHER_CACHE);
+
+ indexInfo = getIndexInfo(anotherCache);
+ assertThat(indexInfo.count()).isZero();
+
+ massIndexer = Search.getIndexer(anotherCache);
+ runIndexer(massIndexer);
+
+ indexInfo = getIndexInfo(anotherCache);
+ assertThat(indexInfo.count()).isEqualTo(NUM_ENTITIES);
+ }
+
+ private void populateTheCache() {
+ HashMap players = new HashMap<>();
+ for (int k = 0; k < NUM_ENTITIES; k++) {
+ players.put(k, new GamePlayer("nick-" + k, k % 10, "Dark Age Of Camelot", DESCRIPTION));
+ }
+ cache.putAll(players);
+ }
+
+ private void createTable(QueriesJdbcStoreConfigurationBuilder jdbcStoreConfig) {
+ ConnectionFactoryConfigurationBuilder connectionFactory =
+ jdbcStoreConfig.getConnectionFactory();
+ String createTable = "create table GamePlayer (" +
+ "id int not null, " +
+ "nick varchar(63) not null, " +
+ "ranking int, " +
+ "game varchar(63), " +
+ "history varchar(511)," +
+ "primary key (id))";
+
+ ConnectionFactoryConfiguration cfc = connectionFactory.create();
+ ConnectionFactory factory = ConnectionFactory.getConnectionFactory(cfc.connectionFactoryClass());
+ factory.start(cfc, getClass().getClassLoader());
+
+ Connection connection = null;
+ try {
+ connection = factory.getConnection();
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(createTable);
+ }
+ } catch (SQLException t) {
+ throw new AssertionError(t);
+ } finally {
+ factory.releaseConnection(connection);
+ factory.stop();
+ }
+ }
+
+ private IndexInfo getIndexInfo(Cache, ?> cache) {
+ return join(Search.getSearchStatistics(cache).getIndexStatistics().computeIndexInfos()).get(GamePlayer.class.getName());
+ }
+
+ private static void runIndexer(Indexer massIndexer) {
+ long start = System.currentTimeMillis();
+ join(massIndexer.runLocal());
+ long end = System.currentTimeMillis();
+ long duration = end - start;
+ log.infov("runIndexer duration -> " + duration);
+ }
+}