//$Id: NakackTest.java,v 1.1 2007/07/04 07:29:33 belaban Exp $

package org.jgroups.tests;

import junit.framework.Test;

import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.*;
import org.jgroups.debug.Simulator;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;
import java.util.Enumeration ;

/**
 * Tests the reliable FIFO (NAKACK) protocol
 * 
 * Two sender peers send 1000 messages to the group, where each message contains
 * a long value, mirroring seqnos used. A receiver peer receives the messages 
 * from each sender and checks that seqnos are received in the correct order.
 *
 * This test makes use of Simulator to test the protocol in 
 * isolation from a JChannel. Each peer is wrapped in a Simulator instance and
 * the instances are linked together to form the group.
 *
 * An object all_msgs_recd is used to allow the main test thread to discover when 
 * all sent messages have been received.
 *
 * The test case passes if the expected number of messages is received, and messages
 * are received in order from each sender. This implies that:
 * (i) all messages from each peer were received (reliable) and 
 * (ii) all messages from each peer are received in order (FIFO)
 * 
 * @uathor Richard Achmatowicz 
 * @author Bela Ban
 */
public class NakackTest2 extends TestCase {

    final static int NUM_PEERS = 3; 
    final static int NUM_MSGS = 1000; 
    final static int WAIT_TIMEOUT = 10; // secs
    final static int MSGS_PER_STATUS_LINE = 100; 

    // convey assertion failure from thread to main framework
    static boolean notFIFO = false ;
    static boolean allMsgsReceived = false ;
    
    IpAddress[] addresses = new IpAddress[NUM_PEERS] ;
    Vector<Address> members;
    View view;
    Simulator[] simulators = new Simulator[NUM_PEERS] ;
    NAKACK[] nakacklayers = new NAKACK[NUM_PEERS] ;
    Protocol[][] stacks = new Protocol[NUM_PEERS][] ;
    Thread[] threads = new Thread[NUM_PEERS] ;

    //define senders and receivers
    boolean[] isSender = new boolean[NUM_PEERS] ;

    // used to wait for signal that all messages received
    static Object all_msgs_recd = new Object() ;

    public NakackTest2(String name) {
	super(name);
    }

    /**
     * Set up a number of simulator instances wrapping NAKACK 
     */
    public void setUp() throws Exception {

	super.setUp();

	// define the senders and the receivers
	isSender[0] = false ;
	isSender[1] = true ;
	isSender[2] = true ;

	// dummy IP addresses and ports 
	addresses[0] = new IpAddress(1111);
	addresses[1] = new IpAddress(2222);
	addresses[2] = new IpAddress(3333);

	// dummy set of members which works for all three simulators
	members = new Vector<Address>();
	for (int i = 0 ; i < NUM_PEERS; i++) {
	    members.add(addresses[i]);
	}

	// create a dummy View(creator, timestamp, member set) 
	view = new View(addresses[0], 1, members);

	// create new simulator instances
	for (int i = 0; i < NUM_PEERS; i++) {
	    createNAKACKSimulator(simulators, view, addresses, nakacklayers, stacks, i) ;
	}

	// describe the configuration of the three simulators
	for (int i = 0; i < NUM_PEERS; i++) {
	    for (int j = 0; j < NUM_PEERS; j++) {
		if (i == j) 
		    simulators[i].addMember(addresses[j]) ;
		else 
		    simulators[i].addMember(addresses[j], simulators[j]) ;
		}
	}
		
	// start the simulators
	for (int i = 0; i < NUM_PEERS; i++) 
	    simulators[i].start();
    }

    public void tearDown() throws Exception {
	super.tearDown();

	// stop the simulators
	for (int i = 0; i < NUM_PEERS; i++)
	    simulators[i].stop();
    }

    private void createNAKACKSimulator(Simulator[] simulators, View view, Address[] addresses, NAKACK[] nakacklayers, Protocol[][] stacks, int i) {
		
	// create the simulator instance
	simulators[i] = new Simulator();
	simulators[i].setLocalAddress(addresses[i]);
	simulators[i].setView(view);

	// set up the protocol under test
	nakacklayers[i] = new NAKACK();
		
	// set up its properties
	Properties props=new Properties();
	props.setProperty("use_mcast_xmit", "true");
	props.setProperty("exponential_backoff", "150");
	props.setProperty("retransmit_timeout", "50,300,600,1200");
		
	nakacklayers[i].setProperties(props);
		
	// our protocol stack under test consists of one protocol
	stacks[i] = new Protocol[]{nakacklayers[i]};
	simulators[i].setProtocolStack(stacks[i]);
    }
	
    /**
     * This test only involves one member!
     */
    public void testReceptionOfAllMessages() {

	// set up the receiver callbacks for each simulator
	Simulator.Receiver[] receivers = new Simulator.Receiver[NUM_PEERS] ;

	// set up the sender and the receiver callbacks, according to whether
	// the peer is a sender or a receiver
	for (int i = 0; i < NUM_PEERS; i++) {
	    
	    if (isSender[i]) 
		receivers[i] = new SenderPeer(simulators[i]) ;
	    else 
		receivers[i] = new ReceiverPeer(simulators[i]) ;
		
	    simulators[i].setReceiver(receivers[i]);
	    threads[i] = new MyNAKACKPeer(simulators[i], isSender[i]) ;
	}

	// start the NAKACK peers and let them exchange messages
	for (int i = 0; i < NUM_PEERS; i++) {
	    threads[i].start() ;
	}

	// wait for the receiver peer to signal that it has received messages, or timeout
	synchronized(all_msgs_recd) {
	    try {
		all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
	    }
	    catch(InterruptedException e) {
		System.out.println("main thread interrupted") ;
	    }
	}

	// wait for the threads to terminate
	try {
	    for (int i = 0; i < NUM_PEERS; i++) {
		threads[i].join() ;
	    }
	}
	catch(InterruptedException e) {
	}

	// the test fails if:
	// - a seqno is received out of order (not FIFO), or 
	// - not all messages were received (not reliable) or
	// - not all messages are received in time allotted (allMsgsReceived)
	assertTrue("Incorrect number of messages received by the receiver thread", allMsgsReceived) ;
	assertFalse("Sequnece numbers for a peer not in correct order", notFIFO) ;
    }

    /**
     * This is called by the Simulator when a message comes back up the stack.
     * Used by message senders to simply display messages received from other peers.
     */
    class SenderPeer implements Simulator.Receiver {
	Simulator simulator = null ;
	int num_mgs_received=0;

	SenderPeer(Simulator s) {
	    this.simulator = s ;
	}
	
	// keep track of how many messages were received
	public void receive(Event evt) {
	    if(evt.getType() == Event.MSG) {
		num_mgs_received++;
		if(num_mgs_received % MSGS_PER_STATUS_LINE == 0)
		    System.out.println("<" + simulator.getLocalAddress() + ">:" + "<== " + num_mgs_received);
	    }
	}

	public int getNumberOfReceivedMessages() {
	    return num_mgs_received;
	}
    }

    /**
     * This is called by the Simulator when a message comes back up the stack.
     * This method should do the following:
     * - receive messages from senders 
     * - check that sequence numbers for each sender are in order (with no gaps)
     * - terminate when correct number of messages have been received
     */
    class ReceiverPeer implements Simulator.Receiver {
	Simulator simulator = null ;
	int num_mgs_received=0;
	long starting_seqno = 1 ;
        long last_seqno = starting_seqno;
        
        Hashtable<Address, Long> senders = new Hashtable<Address, Long>() ;
        Message msg ;
        Address sender ;
        Long s ;
        long received_seqno ;

	ReceiverPeer(Simulator s) {
	    this.simulator = s ;
	}
	
	public synchronized void receive(Event evt) {
				
	    if (evt.getType() == Event.MSG) {
				
		// keep track of seqno ordering of messages received
		msg=(Message)evt.getArg();
		sender=msg.getSrc();

		// get the expected next seqno for this sender
		s = (Long)senders.get(sender);
		if(s == null) {
		    s = new Long(starting_seqno);
		    senders.put(sender, s);
		}
		last_seqno = s.longValue();
					
		try {
		    s=(Long)msg.getObject();
		    received_seqno=s.longValue();

		    // System.out.println("sender = " + sender + " last = " + last_seqno + " received = " + received_seqno) ;

		    num_mgs_received++;
		    // System.out.println("num_msgs_recd = " + num_mgs_received) ;

		    // 1. check if sequence numbers are in sequence
		    if(received_seqno == last_seqno) {
			// correct - update with next expected seqno
			senders.put(sender, new Long(last_seqno+1));
		    }
		    else {
			// error, terminate test
			notFIFO = true ; 
			fail("FAIL: received msg #" + received_seqno + ", expected " + last_seqno);
		    }

		    Address address = simulator.getLocalAddress() ;

		    if(received_seqno % MSGS_PER_STATUS_LINE == 0 && received_seqno > 0)
			System.out.println("<" + address + ">:" + "PASS: received msg #" + received_seqno + " from " + sender);


		    // 2. condition to terminate the test - all messages received (whether in 
		    // correct order or not)
		    if(num_mgs_received >= NakackTest2.NUM_MSGS * (NUM_PEERS-1)) {
			
			// indicate that we have received the required number of messages
			// to differentiate between timeout and notifyAll cases on monitor
			allMsgsReceived = true ;

			// signal that all messages have been received - this will allow the receiver
			// thread to terminate normally
			synchronized(all_msgs_recd) {
			    all_msgs_recd.notifyAll() ;
			}
		    }   
		}
		catch(Exception ex) {
		    System.out.println(ex.toString()) ;
		    // log.error("NakackTest.CheckNoGaps.up()", ex);
		}
	    }	
	}

	public int getNumberOfReceivedMessages() {
	    return num_mgs_received;
	}
    }


    static class MyNAKACKPeer extends Thread {

	Simulator s = null ;
	boolean sender = false ;
        
	public MyNAKACKPeer(Simulator s, boolean sender) {
	    this.s = s ;
	    this.sender = sender ;
	}

	public void run() {

	    // senders send NUM_MSGS messages to all peers, beginning with seqno 1
	    if (sender) {

		Address address = s.getLocalAddress() ;

		// send a collection of dummy messages by mcast to the stack under test
		for(int i=1; i <= NUM_MSGS; i++) {

		    Message msg=new Message(null, address, new Long(i));
		    Event evt=new Event(Event.MSG, msg);

		    // call Simulator.send() to introduce the event into the stack under test
		    s.send(evt);

		    // status indicator
		    if(i % MSGS_PER_STATUS_LINE == 0)
			System.out.println("<" + address + ">:" + " ==> " + i);
		}	   
	    }

	    if (!sender) {
		// wait for the receiver callback to signal that it has received messages, or timeout
		// this just causes this thread to block until its receiver has finished 
		synchronized(all_msgs_recd) {
		    try {
			all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
		    }
		    catch(InterruptedException e) {
			System.out.println("main thread interrupted") ;
		    }
		}
	    }
	}
    }


    public static Test suite() {
	return new TestSuite(NakackTest2.class);
    }

    public static void main(String[] args) {
	junit.textui.TestRunner.run(suite());
    }
}







