-
Bug
-
Resolution: Cannot Reproduce
-
Major
-
None
-
7.0.0.CR1, 7.0.0.CR2, 7.0.0.Final
-
None
Two async listeners are created
{InvalidationListener, DistributionListener} to check whether async events are fired on the cache.
- InvalidationListener
/** * Custom listener for handling notification events. */ @Listener(sync=false) public class InvalidationListener { public AtomicInteger keyId = new AtomicInteger(); public String nodeName; public Thread threadRunningNotif; public volatile boolean isInvalidated = false; public InvalidationListener(final String nodeName) { this.nodeName = nodeName; } /** * Handles the entry created event. * @param event */ @CacheEntryCreated public void handleCreationAndModification(Event event) { if(event.isPre()) { threadRunningNotif = Thread.currentThread(); keyId.getAndIncrement(); } } /** * Handles the entry invalidated event. * @param event */ @CacheEntryInvalidated public void handleInvalidation(CacheEntryInvalidatedEvent event) { if (event.isPre()) { System.out.println("Data is invalidated."); isInvalidated = true; threadRunningNotif = Thread.currentThread(); keyId.getAndIncrement(); } }
- DistributionListener
/**
 * Custom listener for verifying all notifications.
 *
 * <p>The listener is declared with {@code sync=false}. Its public fields are
 * written by the notification callbacks and read by whoever holds the listener
 * instance, so the single-value fields are {@code volatile} and each callback
 * raises its flag only after the thread that ran it has been recorded.
 */
@Listener(sync=false)
public class DistributionListener {

    /** Number of pre-phase entry-created notifications handled so far. */
    public AtomicInteger keyId = new AtomicInteger();

    /** Node name passed to the constructor; included in the console output. */
    public String nodeName;

    /**
     * Thread that ran the most recent post-phase topology-changed notification;
     * {@code null} until one arrives.
     */
    public volatile Thread topologyThreadRunningNotif = null;

    /**
     * Thread that ran the most recent pre-phase entry-created or data-rehashed
     * notification; {@code null} until one arrives.
     */
    public volatile Thread threadRunningNotif;

    /** Becomes {@code true} once a pre-phase data-rehashed notification has been handled. */
    public volatile boolean isRehashed = false;

    /** Becomes {@code true} once a post-phase topology-changed notification has been handled. */
    public volatile boolean isTopologyChanged = false;

    /**
     * Creates a listener tagged with the given node name.
     *
     * @param nodeName name stored in {@link #nodeName}
     */
    public DistributionListener(final String nodeName) {
        this.nodeName = nodeName;
    }

    /**
     * Handling data creation event.
     *
     * @param event the notification; only its pre-phase is recorded
     */
    @CacheEntryCreated
    public void handleCreationAndModification(Event event) {
        if (event.isPre()) {
            threadRunningNotif = Thread.currentThread();
            keyId.getAndIncrement();
        }
    }

    /**
     * Handles data rehashed event.
     *
     * @param evn the notification; only its pre-phase is recorded
     */
    @DataRehashed
    public void handleDataRehash(Event evn) {
        if (evn.isPre()) {
            System.out.println("Data Rehashed on " + nodeName);
            threadRunningNotif = Thread.currentThread();
            // Raise the flag last so a reader that sees isRehashed == true
            // also sees the thread recorded above.
            isRehashed = true;
        }
    }

    /**
     * Handles cluster topology changed event.
     *
     * @param event the notification; only its post-phase is recorded
     */
    @TopologyChanged
    public void handleTopologyChange(TopologyChangedEvent event) {
        if (!event.isPre()) {
            System.out.println("Topology changed." + nodeName);
            topologyThreadRunningNotif = Thread.currentThread();
            // Raise the flag last so a reader that sees isTopologyChanged == true
            // also sees the thread recorded above.
            isTopologyChanged = true;
        }
    }
}
No CacheEntryInvalidated event is fired:
/**
 * Waits until the listener on node1 reports an invalidation, then checks that
 * the notification was recorded from a thread other than the test thread and
 * that the listener's notification counter does not exceed 50.
 *
 * @throws Exception propagated from the {@code eventually} helper or the polled condition
 */
@Test
@OperateOnDeployment("node1")
@InSequence(3)
public void testNode1Invalidation() throws Exception {
    // Condition polled by eventually(): satisfied once the listener flag is set.
    final Condition invalidationSeen = new Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return listenerNode1.isInvalidated;
        }
    };
    eventually("The elements should be invalidated.", invalidationSeen, 15000, 15);

    final boolean ranOnOtherThread = listenerNode1.threadRunningNotif != Thread.currentThread();
    assertTrue("The invalidation should be done asynchronously", ranOnOtherThread);

    // The counter is read once for the message and once for the check, as before.
    final String limitMessage =
            "The number of called invalidation method should be <= 50, but is " + listenerNode1.keyId.get();
    final boolean withinLimit = listenerNode1.keyId.get() <= 50;
    assertTrue(limitMessage, withinLimit);
}
No DataRehashed event is fired
/**
 * Waits until the listener on node1 reports a data rehash, then checks that
 * the rehash notification was recorded from a thread other than the test
 * thread, that a topology change has been reported, and that the topology
 * notification was likewise recorded from a different thread.
 *
 * @throws Exception propagated from the {@code eventually} helper or the polled condition
 */
@Test
@OperateOnDeployment("node1")
@InSequence(3)
public void testDataRehashEventNode1Verifying() throws Exception {
    // Condition polled by eventually(): satisfied once the rehash flag is set.
    final Condition rehashSeen = new Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return listenerNode1.isRehashed;
        }
    };
    eventually("The rehash should happen.", rehashSeen, 15000, 15);

    final boolean rehashOnOtherThread = Thread.currentThread() != listenerNode1.threadRunningNotif;
    assertTrue("The rehash listener thread should be different rather than the current.",
            rehashOnOtherThread);

    final boolean topologyChanged = listenerNode1.isTopologyChanged;
    assertTrue("The topology should be changed so far on node1.", topologyChanged);

    final boolean topologyOnOtherThread =
            listenerNode1.topologyThreadRunningNotif != Thread.currentThread();
    assertTrue("The topology should be changed with different thread rather than this one on node1.",
            topologyOnOtherThread);
}