package de.psi.pjf.wcc.sample.jmsclient;

import java.util.Properties;

/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * A simple example that shows a JMS Topic clustered across two nodes of a cluster. Messages are sent on one
 * node and received by consumers on both nodes.
 * <p>
 * The program expects the host names of the two nodes as its first two command line arguments. The example
 * is reported as failed when an exception occurs, when any expected message is not received within the
 * receive timeout, or when a JMS/JNDI resource cannot be closed.
 */
public class ClusteredTopicTest
{

    private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory";
    private static final String TOPIC_JNDI_NAME = "jms/topic/ClusterStateTopic";
    /** Number of messages that are published and then expected on each consumer. */
    private static final int ROUNDS = 10;
    /** User name used for both the JNDI lookup and the JMS connections. */
    private static final String USER = "pjf";
    /** Password used for both the JNDI lookup and the JMS connections. */
    private static final String PASSWORD = "pjf";
    /** Maximum time in milliseconds to wait for a single message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000;

    private boolean failure = false;
    private String[] args;

    /**
     * Program entry point.
     *
     * @param args host names of the cluster nodes; index 0 is server 0, index 1 is server 1
     */
    public static void main( final String[] args )
    {
        new ClusteredTopicTest().run( args );
    }

    /**
     * Runs the example and reports the outcome.
     *
     * @param args1 host names of the cluster nodes (see {@link #main(String[])})
     * @throws RuntimeException if the example failed
     */
    protected void run( final String[] args1 )
    {
        args = args1;
        // if we have a cluster of servers wait a while for the cluster to form properly
        if( args != null && args.length > 1 )
        {
            System.out.println( "****pausing to allow cluster to form****" );
            try
            {
                Thread.sleep( 2000 );
            }
            catch( InterruptedException e )
            {
                // Restore the interrupt flag so that later blocking calls can honour the request.
                Thread.currentThread().interrupt();
            }
        }
        try
        {
            if( !runExample() )
            {
                failure = true;
            }
            System.out.println( "example complete" );
        }
        catch( Throwable e )
        {
            failure = true;
            e.printStackTrace();
        }

        reportResultAndExit();
    }

    /**
     * Publishes {@link #ROUNDS} messages through server 0 and consumes them on both servers.
     *
     * @return {@code true} if both consumers received every message and all resources were closed
     *         without error, {@code false} otherwise
     * @throws Exception if a JNDI lookup or a JMS operation fails
     */
    private boolean runExample() throws Exception
    {

        Connection connection0 = null;
        Connection connection1 = null;
        InitialContext ic0 = null;
        InitialContext ic1 = null;
        boolean allReceived = false;
        boolean closedCleanly = true;
        try
        {
            // Step 1. Get an initial context for looking up JNDI from server 0
            ic0 = getContext( 0 );

            // Step 2. Look-up the JMS Topic object from JNDI
            Topic topic = (Topic)ic0.lookup( TOPIC_JNDI_NAME );

            // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
            ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup( DEFAULT_CONNECTION_FACTORY );

            // Step 4. Get an initial context for looking up JNDI from server 1
            ic1 = getContext( 1 );

            // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
            ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup( DEFAULT_CONNECTION_FACTORY );

            // Step 6. We create a JMS Connection connection0 which is a connection to server 0
            connection0 = cf0.createConnection( USER, PASSWORD );

            // Step 7. We create a JMS Connection connection1 which is a connection to server 1
            connection1 = cf1.createConnection( USER, PASSWORD );

            // Step 8. We create a JMS Session on server 0
            Session session0 = connection0.createSession( false, Session.AUTO_ACKNOWLEDGE );

            // Step 9. We create a JMS Session on server 1
            Session session1 = connection1.createSession( false, Session.AUTO_ACKNOWLEDGE );

            // Step 10. We start the connections to ensure delivery occurs on them
            connection0.start();
            connection1.start();

            // Step 11. We create JMS MessageConsumer objects on server 0 and server 1
            MessageConsumer consumer0 = session0.createConsumer( topic );
            MessageConsumer consumer1 = session1.createConsumer( topic );
            // NOTE(review): presumably gives the cluster time to register both subscriptions before
            // the first message is published - confirm.
            Thread.sleep( 1000 );

            // Step 12. We create a JMS MessageProducer object on server 0
            MessageProducer producer = session0.createProducer( topic );

            // Step 13. We send some messages to server 0
            final int numMessages = ROUNDS;
            for( int i = 0; i < numMessages; i++ )
            {
                TextMessage message = session0.createTextMessage( "This is text message " + i );
                producer.send( message );
                System.out.println( "Sent message: " + message.getText() );
            }
            // Step 14. We now consume those messages on *both* server 0 and server 1.
            // We note that all messages have been consumed by *both* consumers.
            // JMS Topics implement *publish-subscribe* messaging where all consumers get a copy of all
            // messages
            final boolean received0 = receiveAll( consumer0, 0, numMessages );
            final boolean received1 = receiveAll( consumer1, 1, numMessages );
            allReceived = received0 && received1;
        }
        finally
        {
            // Step 15. Be sure to close our JMS resources!
            // Every resource is closed independently so that one failing close() cannot leak the others.
            closedCleanly &= closeQuietly( connection0 );
            closedCleanly &= closeQuietly( connection1 );
            closedCleanly &= closeQuietly( ic0 );
            closedCleanly &= closeQuietly( ic1 );
        }
        return allReceived && closedCleanly;
    }

    /**
     * Tries to receive {@code expected} messages from the given consumer, waiting at most
     * {@link #RECEIVE_TIMEOUT_MILLIS} for each one. A missing message is reported and the loop continues
     * with the next attempt.
     *
     * @param consumer consumer to read from
     * @param nodeId   index of the server the consumer is attached to, used for output only
     * @param expected number of messages to wait for
     * @return {@code true} if every receive attempt returned a message
     * @throws Exception if receiving or reading a message fails
     */
    private boolean receiveAll( final MessageConsumer consumer, final int nodeId, final int expected )
        throws Exception
    {
        boolean complete = true;
        for( int i = 0; i < expected; i++ )
        {
            TextMessage message = (TextMessage)consumer.receive( RECEIVE_TIMEOUT_MILLIS );
            if( message != null )
            {
                System.out.println( i + " Recieved message: " + message.getText() + " from node " + nodeId
                    + " from " + TOPIC_JNDI_NAME );
            }
            else
            {
                // receive() returned null: nothing arrived within the timeout
                complete = false;
                System.out.println( i + " error receiving message from node " + nodeId );
            }
        }
        return complete;
    }

    /**
     * Closes a JMS connection without propagating errors.
     *
     * @param connection connection to close, may be {@code null}
     * @return {@code false} if closing threw an exception, {@code true} otherwise
     */
    private static boolean closeQuietly( final Connection connection )
    {
        if( connection == null )
        {
            return true;
        }
        try
        {
            connection.close();
            return true;
        }
        catch( Exception e )
        {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Closes a JNDI context without propagating errors.
     *
     * @param context context to close, may be {@code null}
     * @return {@code false} if closing threw an exception, {@code true} otherwise
     */
    private static boolean closeQuietly( final Context context )
    {
        if( context == null )
        {
            return true;
        }
        try
        {
            context.close();
            return true;
        }
        catch( Exception e )
        {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Creates a JNDI initial context for the host given as command line argument {@code serverId}.
     *
     * @param serverId index into the command line arguments selecting the host
     * @return a new initial context; the caller is responsible for closing it
     * @throws IllegalArgumentException if no host argument exists for {@code serverId}
     * @throws Exception                if the context cannot be created
     */
    private InitialContext getContext( final int serverId ) throws Exception
    {
        if( args == null || serverId < 0 || serverId >= args.length )
        {
            throw new IllegalArgumentException( "no host argument given for server " + serverId
                + "; expected at least " + ( serverId + 1 ) + " host name(s) on the command line" );
        }
        final String host = args[ serverId ];
        AbstractHornetQTest.log.info( "using " + host + " for jndi" );
        Properties props = new Properties();

        props.put( Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory" );
        props.put( Context.PROVIDER_URL, "http-remoting://" + host + ":8080" );
        props.put( Context.SECURITY_PRINCIPAL, USER );
        props.put( Context.SECURITY_CREDENTIALS, PASSWORD );
        props.put( "jboss.naming.client.connect.options.org.xnio.Options.SASL_POLICY_NOPLAINTEXT", "false" );
        props.put( "jboss.naming.client.connect.options.org.xnio.Options.SASL_POLICY_NOANONYMOUS", "false" );
        props.put( "jboss.naming.client.connect.options.org.xnio.Options.SASL_DISALLOWED_MECHANISMS",
            "JBOSS-LOCAL-USER" );
        props.put( "jboss.naming.client.connectionprovider.create.options.org.xnio.Options.SSL_ENABLED",
            "false" );

        return new InitialContext( props );
    }

    /**
     * Prints a success or failure banner. On failure the banner goes to standard error and a
     * {@link RuntimeException} is thrown; the method does not call {@code System.exit} itself.
     *
     * @throws RuntimeException if the example failed
     */
    private void reportResultAndExit()
    {
        if( failure )
        {
            System.err.println();
            System.err.println( "#####################" );
            System.err.println( "### FAILURE! ###" );
            System.err.println( "#####################" );
            throw new RuntimeException( "failure in running example" );
        }
        else
        {
            System.out.println();
            System.out.println( "#####################" );
            System.out.println( "### SUCCESS! ###" );
            System.out.println( "#####################" );
        }
    }

}
