package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.blocks.*;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Bela Ban
 * @version $Id$
 */
public class bla2 extends ReceiverAdapter {
    JChannel      c1;
    RpcDispatcher disp1, disp2, disp3;

    static final short ID=1500;



    public void start() throws ChannelException {
        ClassConfigurator.add(ID, ScopeHeader.class);


        c1=new JChannel("/home/bela/fast.xml");

        disp1=new MyRpcDispatcher((short)100, c1, null, null, new Server("foo1()"));
        disp2=new MyRpcDispatcher((short)0,   c1, null, null, new Server("foo2()"));
        disp3=new MyRpcDispatcher((short)150, c1, null, null, new Server("foo3()"));

        MyUpHandler handler=new MyUpHandler();
        c1.setUpHandler(handler);

        handler.put((short)100, disp1.getProtocolAdapter());
        handler.put((short)0,   disp2.getProtocolAdapter());
        handler.put((short)150, disp3.getProtocolAdapter());

        c1.connect("bla");
        while(true) {
            Util.keyPress("enter: ");
            RspList rsps=disp1.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC.setTimeout(0));
            System.out.println("rsps:\n" + rsps);

            rsps=disp2.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC);
            System.out.println("rsps:\n" + rsps);

            rsps=disp3.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC);
            System.out.println("rsps:\n" + rsps);
        }
    }



    public static void main(String[] args) throws ChannelException {
        new bla2().start();
    }


    /**
     * Subclass of RpcDispatcher with a scope. Different instances should have different scopes, or else dispatching to
     * the correct dispatcher won't work !<p/>
     * MyRpcDispatcher creates a special RequestCorrelator to be used by MessageDispatcher
     */
    static class MyRpcDispatcher extends RpcDispatcher {
        short scope;
        
        public MyRpcDispatcher() {
            super();
        }

        public MyRpcDispatcher(short scope, Channel channel, MessageListener l, MembershipListener l2, Object server_obj) {
            super(channel, l, l2, server_obj);
            this.scope=scope;
            if(corr instanceof MyRequestCorrelator)
                ((MyRequestCorrelator)corr).setScope(scope);
        }

        protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address local_addr) {
            return new MyRequestCorrelator(scope, transport, handler, local_addr);
        }

    }

    /**
     * Subclass of RequestCorrelator which adds the scope to each request and response. The UpHandler (see below) uses
     * the scope to dispatch to the correct MessageDispatcher.
     */
    static class MyRequestCorrelator extends RequestCorrelator {
        short scope;

        public MyRequestCorrelator(short scope, Object transport, RequestHandler handler, Address local_addr) {
            super(ClassConfigurator.getProtocolId(RequestCorrelator.class), transport, handler, local_addr);
            this.scope=scope;
        }

        public short getScope() {
            return scope;
        }

        public void setScope(short scope) {
            this.scope=scope;
        }

        public void sendRequest(long id, Collection<Address> dest_mbrs, Message msg, RspCollector coll,
                                boolean use_anycasting) throws Exception {
            ScopeHeader hdr=new ScopeHeader(scope);
            msg.putHeader(ID, hdr);
            super.sendRequest(id, dest_mbrs, msg, coll, use_anycasting);
        }

        protected void prepareResponse(Message rsp) {
            ScopeHeader hdr=new ScopeHeader(scope);
            rsp.putHeader(ID, hdr);
            super.prepareResponse(rsp);
        }
    }

    /**
     * Instance of UpHandler which de-multiplexes to the correct MessageDispatcher, based on the scope.
     */
    static class MyUpHandler implements UpHandler {
        final Map<Short,UpHandler> handlers=new ConcurrentHashMap<Short,UpHandler>();

        public Object up(Event evt) {
            switch(evt.getType()) {
                case Event.MSG:

                    Message msg=(Message)evt.getArg();
                    ScopeHeader hdr=(ScopeHeader)msg.getHeader(ID);
                    short scope=hdr == null? 0 : hdr.scope;
                    UpHandler handler=handlers.get(scope);
                    if(handler == null)
                        throw new IllegalArgumentException("no up-handler found for scope=" + scope);
                    return handler.up(evt);

                case Event.VIEW_CHANGE:
                    for(UpHandler tmp: handlers.values())
                        tmp.up(evt);
                    return null;
            }
            UpHandler default_handler=handlers.get((short)0);
            if(default_handler == null)
                throw new IllegalArgumentException("no default handler found");
            return default_handler.up(evt);
        }

        public void put(short scope, UpHandler handler) {
            handlers.put(scope, handler);
        }

        public void remove(short scope) {
            handlers.remove(scope);
        }
    }


    public static class Server {
        final String name;

        public Server(String name) {
            this.name=name;
        }

        public int foo() {
            System.out.println(name);
            return (int)Util.random(100);
        }
    }

    public static class ScopeHeader extends Header {
        private short scope;

        public ScopeHeader() {}
        public ScopeHeader(short scope) {this.scope=scope;}

        public int size() {return Global.SHORT_SIZE;}
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeShort(scope);
        }
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            scope=in.readShort();
        }
    }
}
