import static org.junit.Assert.assertTrue;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.lucene.FileCacheKey;
import org.infinispan.lucene.FileMetadata;
import org.infinispan.lucene.readlocks.DistributedSegmentReadLocker;
import org.infinispan.lucene.readlocks.SegmentReadLocker;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.junit.Test;

public class ReaderLockerTest {

  @Test
  public void test() throws Exception {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    Future<Boolean> server1Passed = threadPool.submit(newTester());
    Thread.sleep(1000);
    Future<Boolean> server2Passed = threadPool.submit(newTester());

    assertTrue(server1Passed.get(6000, TimeUnit.SECONDS));
    assertTrue(server2Passed.get(6000, TimeUnit.SECONDS));
  }


  private static Callable<Boolean> newTester() {
    return new Callable<Boolean>() {
      @Override
      public Boolean call() {
        try {
          return new ReadLockerTester().runTest();
        } catch (Exception e) {
          e.printStackTrace();
          return false;
        }
      }
    };
  }

  private static class ReadLockerTester {

    public static final int READ_DELAY = 50;

    final Cache<Object, Object> metadataCache;
    final SegmentReadLocker segmentReadLocker;

    public ReadLockerTester() {
      EmbeddedCacheManager  cacheManager = startCacheManager();
      metadataCache = cacheManager.getCache("metadata");
      Cache<Object, Object> chunks = cacheManager.getCache("chunks");
      Cache<Object, Integer> locks = cacheManager.getCache("locks");

      segmentReadLocker = new DistributedSegmentReadLocker(locks, chunks, metadataCache, "indexName");
    }

    private EmbeddedCacheManager startCacheManager() {
      final GlobalConfiguration globalConfiguration =
          new GlobalConfigurationBuilder()
            .transport()
              .clusterName("TestChannel")
              .transport(new JGroupsTransport())
             .globalJmxStatistics()
              .enabled(false)
              .allowDuplicateDomains(true)
          .build();

        ConfigurationBuilder builder = new ConfigurationBuilder();
        Configuration configuration = builder
          .clustering()
            .cacheMode(CacheMode.DIST_SYNC)
             .stateTransfer()
                .awaitInitialTransfer(true)
                .fetchInMemoryState(true)
                .timeout(120000)
          .build();


        return new DefaultCacheManager(globalConfiguration, configuration, false);
    }

    public boolean runTest() throws InterruptedException {
      metadataCache.put(new FileCacheKey("indexName", "fileName"), new FileMetadata(10));

      final AtomicBoolean stopSignal = new AtomicBoolean();
      Thread thread = startReadLockerThread(stopSignal, "fileName");

      // Keep checking every 100ms. The file should never be deleted.
      for(int i = 0; i < 50; i++) {
        Thread.sleep(100);
        if (metadataCache.get(new FileCacheKey("indexName", "fileName")) == null) {

          // Stop the thread
          stopSignal.set(true);
          while(thread.isAlive()) {
          }

          return false;
        }
      }

      stopSignal.set(true);
      while(thread.isAlive()) {
      }


      return true;
    }

    private Thread startReadLockerThread(final AtomicBoolean stopSignal, final String fileName) {
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            while (!stopSignal.get()) {
              segmentReadLocker.acquireReadLock(fileName);
              Thread.sleep(READ_DELAY);
              segmentReadLocker.deleteOrReleaseReadLock(fileName);
            }
          } catch (Throwable t) {
            t.printStackTrace();
          }
        }
      });

      thread.setDaemon(true);
      thread.start();
      return thread;
    }
  }
}
