import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.junit.Test;

public class RemoveCacheInfinispanCacheTest {
    // PASS
    @Test
    public void testRemoveCache_InvalidationAsync_Passes() throws Exception {
        DefaultCacheManager cacheManager0 = getClusteredCacheManager(0);
        DefaultCacheManager cacheManager1 = getClusteredCacheManager(1);
        waitForClusterFormation(cacheManager0, 2);

        try {
            Cache<Object, Object> cache0_0 = cacheManager0.getCache("cache0", CacheMode.INVALIDATION_ASYNC.toString());
            Cache<Object, Object> cache0_1 = cacheManager1.getCache("cache0", CacheMode.INVALIDATION_ASYNC.toString());
            cache0_0.put("key0_0", "value0_0");
            cache0_1.put("key0_1", "value0_1");

            assertTrue(cacheManager0.getCacheNames().contains("cache0"));
            assertTrue(cacheManager1.getCacheNames().contains("cache0"));

            // Removal of a cache using a cluster cache mode causes removing in the entire cluster.
            cacheManager0.removeCache("cache0");
            assertFalse(cacheManager0.getCacheNames().contains("cache0"));
            assertFalse(cacheManager1.getCacheNames().contains("cache0"));
        } finally {
            cacheManager0.stop();
            cacheManager1.stop();
        }
    }

    // FAIL
    @Test
    public void testRemoveCache_Local_Fails() throws Exception {
        DefaultCacheManager cacheManager0 = getClusteredCacheManager(0);
        DefaultCacheManager cacheManager1 = getClusteredCacheManager(1);
        waitForClusterFormation(cacheManager0, 2);

        try {
            Cache<Object, Object> cache0_0 = cacheManager0.getCache("cache0", CacheMode.LOCAL.toString());
            Cache<Object, Object> cache0_1 = cacheManager1.getCache("cache0", CacheMode.LOCAL.toString());
            cache0_0.put("key0_0", "value0_0");
            cache0_1.put("key0_1", "value0_1");

            assertTrue(cacheManager0.getCacheNames().contains("cache0"));
            assertTrue(cacheManager1.getCacheNames().contains("cache0"));

            // A local cache should be removable.  It isn't a cluster aware cache mode, so I wouldn't expect the remove to propagate
            // through the cluster, it should affect the local node only.  However, documentation is not unclear.
            // REGARDLESS, even trying to remove it throws an NPE!
            cacheManager0.removeCache("cache0");
            assertFalse(cacheManager0.getCacheNames().contains("cache0"));
            assertTrue(cacheManager1.getCacheNames().contains("cache0"));

            cacheManager1.removeCache("cache0");
            assertFalse(cacheManager0.getCacheNames().contains("cache0"));
            assertFalse(cacheManager1.getCacheNames().contains("cache0"));
        } finally {
            cacheManager0.stop();
            cacheManager1.stop();
        }
    }

    private void waitForClusterFormation(final DefaultCacheManager cacheManager, final int clusterSize)
            throws InterruptedException {
        while (((JGroupsTransport) cacheManager.getTransport()).getMembers().size() != clusterSize) {
            Thread.sleep(100);
        }
    }

    private DefaultCacheManager getClusteredCacheManager(final int node) {
        //@formatter:off
        String configTemplate = "TCP(receive_on_all_interfaces = false; enable_diagnostics = true; sock_conn_timeout = 5000; bind_addr = localhost; bind_port = THE_PORT) : "
                + "TCPPING(initial_hosts = localhost[7800],localhost[7801]; port_range = 0; max_dynamic_hosts = 0; async_discovery = true) : "
                + "MERGE3(min_interval = 100; max_interval = 1000) : "
                + "FD(max_tries = 10; timeout = 1000) : "
                + "FD_SOCK() : "
                + "VERIFY_SUSPECT(timeout = 1000) : "
                + "pbcast.NAKACK2(use_mcast_xmit = false) : "
                + "UNICAST3() : "
                + "pbcast.STABLE(desired_avg_gossip = 120000) : "
                + "pbcast.GMS(join_timeout = 250; max_join_attempts = 0; print_local_addr = true)";
        //@formatter:on

        String nodeConfig = configTemplate.replace("THE_PORT", String.valueOf(7800 + node));

        GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        globalBuilder.globalJmxStatistics().allowDuplicateDomains(true);
        globalBuilder.clusteredDefault().transport().addProperty(JGroupsTransport.CONFIGURATION_STRING, nodeConfig);

        DefaultCacheManager cacheManager = new DefaultCacheManager(globalBuilder.build());
        {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            builder.clustering().cacheMode(CacheMode.LOCAL);
            cacheManager.defineConfiguration(CacheMode.LOCAL.toString(), builder.build());
        }
        {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            builder.clustering().cacheMode(CacheMode.INVALIDATION_ASYNC);
            cacheManager.defineConfiguration(CacheMode.INVALIDATION_ASYNC.toString(), builder.build());
        }
        return cacheManager;
    }
}
