/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts {@link #numBrokers} networked brokers, attaches consumers on composite
 * {@code S.OUT.<n>} queues to every broker, produces {@link #numMessages} rounds of
 * messages to all {@link #numDests} destinations and waits until every consumer tally
 * reaches its expected count.
 */
public class NetworkOfXBrokersWithNDestsTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkOfXBrokersWithNDestsTest.class);

    /** Prefix of every queue used by the test; the destination index is appended. */
    private static final String DEST_PREFIX = "S.OUT.";

    /** Text appended to every produced message: 10 KiB of zero bytes decoded as a String. */
    static final String payload = new String(new byte[10*1024]);

    /** Comma separated list of {@code tcp://localhost:<port>} urls, one per broker. */
    StringBuilder brokersUrl = new StringBuilder();
    final int portBase = 61600;
    final int numBrokers = 2;
    final int numConsumers = 30;
    final int numDests = 300;

    final int numMessages = 1000;
    /** Number of destinations combined into one composite destination per send. */
    final int compositeProducerBatch = 10;

    /** Failures raised by the producer threads; written from those threads, read by the test thread. */
    private final List<Throwable> exceptions = new CopyOnWriteArrayList<Throwable>();

    /** Consumer connections opened by the test, closed again in {@link #tearDown()}. */
    private final List<QueueConnection> consumerConnections = new ArrayList<QueueConnection>();

    /** One counter per composite queue; consumers on the same composite queue share it. */
    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();

    /** Per consumer bookkeeping that the final assertion inspects. */
    class ConsumerState {
        AtomicInteger accumulator;
        String brokerName;
        QueueReceiver receiver;
        ActiveMQDestination destination;
    }

    /**
     * Rebuilds {@link #brokersUrl} with one {@code tcp://localhost:<port>} entry per broker.
     * The builder is reset first so that a repeated call does not duplicate entries.
     */
    protected void buildUrlList() throws Exception {
        brokersUrl.setLength(0);
        for (int i = 0; i < numBrokers; i++) {
            if (i > 0) {
                brokersUrl.append(',');
            }
            brokersUrl.append("tcp://localhost:").append(portBase + i);
        }
    }

    /**
     * Creates and configures (but does not start) broker {@code "B" + brokerid}, listening on
     * {@code portBase + brokerid}, and registers it in the {@code brokers} map.
     *
     * @param brokerid index of the broker, used for its name and port offset
     * @return the configured broker
     */
    protected BrokerService createBroker(int brokerid) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(true);
        broker.setBrokerName("B" + brokerid);
        broker.addConnector(new URI("nio://localhost:" + (portBase + brokerid)));

        addNetworkConnector(broker);
        broker.setSchedulePeriodForDestinationPurge(0);
        broker.getSystemUsage().setSendFailIfNoSpace(true);
        // NOTE(review): this int expression overflows to Integer.MIN_VALUE before it is widened
        // for the call. Left untouched on purpose: switching to a long literal (or to a limit
        // setter, which may be what was intended) changes broker memory accounting - confirm first.
        broker.getSystemUsage().getMemoryUsage().setUsage(2 * 1024 * 1024 * 1024);

        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMemoryLimit(5 * 1024 * 1024);
        policyEntry.setExpireMessagesPeriod(0);
        policyEntry.setQueuePrefetch(100);
        policyEntry.setOptimizedDispatch(true);
        policyEntry.setProducerFlowControl(false);
        policyEntry.setEnableAudit(false);
        // NOTE(review): the policy is keyed on STORE.OUT.> while the test uses S.OUT.<n> queues;
        // verify that this key is intended.
        policyMap.put(new ActiveMQQueue("STORE.OUT.>"), policyEntry);
        broker.setDestinationPolicy(policyMap);

        brokers.put(broker.getBrokerName(), new BrokerItem(broker));

        return broker;
    }

    /**
     * Adds two identically configured network connectors ("Bridge-0" and "Bridge-1") that point
     * at the static list of all broker urls in {@link #brokersUrl}.
     */
    private void addNetworkConnector(BrokerService broker) throws Exception {
        String networkConnectorUrl = "static:(" + brokersUrl + ")";

        for (int i = 0; i < 2; i++) {
            NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl));
            nc.setName("Bridge-" + i);
            nc.setNetworkTTL(1);
            nc.setDecreaseNetworkConsumerPriority(true);
            nc.setDynamicOnly(true);
            nc.setDestinationFilter("Queue.S.OUT.>,ActiveMQ.Advisory.Consumer.Queue.S.OUT.>");
            broker.addNetworkConnector(nc);
        }
    }

    /**
     * End to end scenario: start the brokers, wait for the bridges, create consumer demand on
     * every broker, produce, then wait until each consumer tally equals
     * {@code numMessages * <number of destinations in its composite queue>}.
     */
    public void testBrokers() throws Exception {

        buildUrlList();

        for (int i = 0; i < numBrokers; i++) {
            createBroker(i);
        }

        startAllBrokers();
        waitForBridgeFormation(numBrokers - 1);

        verifyPeerBrokerInfos(numBrokers - 1);

        // demand
        final List<ConsumerState> consumerStates = consume_all_brokers_all_dests(numConsumers, numDests);

        LOG.info("Waiting for percolation of consumers..");
        TimeUnit.SECONDS.sleep(5);

        LOG.info("Produce mesages..");

        // produce (asynchronously; the wait below observes the outcome)
        produce(numMessages, numDests);

        assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                for (ConsumerState tally : consumerStates) {
                    final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
                    LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
                    if (tally.accumulator.get() != expected) {
                        LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected );
                        return false;
                    }
                    LOG.info("got tally on " + tally.brokerName);
                }
                return true;
            }
        }, TimeUnit.MINUTES.toMillis(1000)));

        assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());

        LOG.info("got all messages. closing consumers");
        LOG.info("done");
    }

    /**
     * Builds the comma separated composite queue name
     * {@code S.OUT.<from>,S.OUT.<from+1>,...,S.OUT.<to-1>}.
     *
     * @param from first destination index (inclusive)
     * @param to   last destination index (exclusive)
     * @return the composite name, empty when {@code from >= to}
     */
    private static String compositeDestinationName(int from, int to) {
        StringBuilder name = new StringBuilder();
        for (int k = from; k < to; k++) {
            if (k > from) {
                name.append(',');
            }
            name.append(DEST_PREFIX).append(k);
        }
        return name.toString();
    }

    /**
     * Opens and starts a queue connection to {@code url} with topic advisories disabled and
     * remembers it so that {@link #tearDown()} can close it.
     */
    private QueueConnection openConsumerConnection(String url) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setWatchTopicAdvisories(false);
        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        // register before start() so a failing start still gets cleaned up
        consumerConnections.add(queueConnection);
        queueConnection.start();
        return queueConnection;
    }

    /**
     * Creates a receiver on {@code compositeQ} whose listener increments the accumulator that
     * is shared by all consumers of that same composite queue.
     *
     * @return the populated state for the new consumer
     */
    private ConsumerState attachConsumer(QueueConnection queueConnection, QueueSession queueSession,
                                         ActiveMQQueue compositeQ) throws Exception {
        final ConsumerState consumerState = new ConsumerState();
        QueueReceiver receiver = queueSession.createReceiver(compositeQ);
        consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
        consumerState.receiver = receiver;
        consumerState.destination = compositeQ;

        AtomicInteger counter = accumulators.get(compositeQ);
        if (counter == null) {
            counter = new AtomicInteger(0);
            accumulators.put(compositeQ, counter);
        }
        consumerState.accumulator = counter;

        receiver.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                consumerState.accumulator.incrementAndGet();
            }
        });
        return consumerState;
    }

    /**
     * Creates {@code numConsumers} consumers, each on its own failover connection over all
     * broker urls, splitting the destinations into composite batches. Not invoked by
     * {@link #testBrokers()}.
     *
     * @param numConsumers    number of consumers (and connections) to create
     * @param numDestinations number of {@code S.OUT.<n>} destinations to spread over them
     * @return the state of every consumer created
     */
    private List<ConsumerState> consume_random_distribution_all_dests(final int numConsumers, int numDestinations) throws Exception {
        List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();

        int batchSize = numDestinations > numConsumers ? numDestinations / numConsumers : 1;
        int remainder = numDestinations > numConsumers ? numDestinations % numConsumers : 0;
        LOG.info("Consumer CompositeDest batch size:" + batchSize);
        int batchStart = 0;
        for (int i = 0; i < numConsumers; i++, batchStart += batchSize) {
            QueueConnection queueConnection = openConsumerConnection("failover:(" + brokersUrl + ")");
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // wrap around when there are more consumers than destinations
            if (batchStart >= numDestinations) {
                batchStart = 0;
            }
            int end = batchStart + batchSize;
            // tag any remainder onto the last consumer
            if (i == numConsumers - 1) {
                end += remainder;
            }

            ActiveMQQueue compositeQ = new ActiveMQQueue(
                    compositeDestinationName(batchStart, Math.min(end, numDestinations)));
            ConsumerState consumerState = attachConsumer(queueConnection, queueSession, compositeQ);
            consumerStates.add(consumerState);
            LOG.info("Consumer: " + i + " to " + consumerState.brokerName + ", on:" + compositeQ);
        }
        return consumerStates;
    }

    /**
     * Opens one connection per broker and, on each, creates {@code numConsumers / numBrokers}
     * consumers (at least one) that together cover the destinations in composite batches.
     *
     * @param numConsumers    total number of consumers to spread over the brokers
     * @param numDestinations number of {@code S.OUT.<n>} destinations to cover per broker
     * @return the state of every consumer created
     */
    private List<ConsumerState> consume_all_brokers_all_dests(final int numConsumers, int numDestinations) throws Exception {
        List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();

        int consumersPerBroker = numConsumers > numBrokers ? numConsumers/numBrokers : 1;
        int batchSize = numDestinations > consumersPerBroker ? numDestinations / consumersPerBroker : 1;
        int remainder = numDestinations > consumersPerBroker ? numDestinations % consumersPerBroker : 0;

        LOG.info("Consumer CompositeDest batch size:" + batchSize);
        for (int id = 0; id < numBrokers; id++) {
            QueueConnection queueConnection =
                    openConsumerConnection("failover:(tcp://localhost:" + (portBase + id) + ")");
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            int batchStart = 0;
            for (int i = 0; i < consumersPerBroker; i++, batchStart += batchSize) {
                // wrap around when there are more consumers than destinations
                if (batchStart >= numDestinations) {
                    batchStart = 0;
                }
                int end = batchStart + batchSize;

                // tag any remainder onto the last consumer
                if (i == consumersPerBroker - 1) {
                    end += remainder;
                }

                ActiveMQQueue compositeQ = new ActiveMQQueue(
                        compositeDestinationName(batchStart, Math.min(end, numDestinations)));
                ConsumerState consumerState = attachConsumer(queueConnection, queueSession, compositeQ);
                consumerStates.add(consumerState);
                LOG.info("Consumer: " + i + " to " + consumerState.brokerName + ", on:" + compositeQ);
            }
        }
        return consumerStates;
    }

    /**
     * Starts one producer thread per broker and returns without waiting for them. The threads
     * share a countdown of {@code numMessages} rounds; in each round a thread sends one message
     * to every destination, grouped into composite sends of {@link #compositeProducerBatch}.
     * Failures are logged and collected in {@link #exceptions}.
     *
     * @param numMessages number of rounds shared by all producer threads
     * @param numDests    number of {@code S.OUT.<n>} destinations addressed per round
     */
    private void produce(int numMessages, final int numDests) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(numBrokers);
        final AtomicInteger toSend = new AtomicInteger(numMessages);
        try {
            for (int i = 0; i < numBrokers; i++) {
                final int id = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        QueueConnection queueConnection = null;
                        try {
                            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
                            connectionFactory.setWatchTopicAdvisories(false);
                            queueConnection = connectionFactory.createQueueConnection();
                            queueConnection.start();
                            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                            // anonymous producer: the destination is supplied on each send
                            MessageProducer producer = queueSession.createProducer(null);
                            ActiveMQConnection amqConnection = (ActiveMQConnection) queueConnection;
                            int val;
                            while ((val = toSend.decrementAndGet()) >= 0) {
                                // batch the composite sends
                                for (int j = 0; j < numDests; j += compositeProducerBatch) {
                                    int batchEnd = Math.min(j + compositeProducerBatch, numDests);
                                    ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDestinationName(j, batchEnd));
                                    LOG.info("Send to: " + amqConnection.getBrokerName() + ", "  + val + ", dest:" + compositeQ);
                                    producer.send(compositeQ, queueSession.createTextMessage(amqConnection.getBrokerName() + "->" + val + " payload:" + payload));
                                }
                            }
                        } catch (Throwable throwable) {
                            LOG.error("Producer for broker " + id + " failed", throwable);
                            exceptions.add(throwable);
                        } finally {
                            // always release the connection, also when a send failed
                            if (queueConnection != null) {
                                try {
                                    queueConnection.close();
                                } catch (Exception e) {
                                    LOG.warn("Failed to close producer connection for broker " + id, e);
                                }
                            }
                        }
                    }
                });
            }
        } finally {
            // already submitted tasks still run to completion; the pool threads exit afterwards
            executorService.shutdown();
        }
    }

    /**
     * Waits until the broker reports {@code max} peer broker infos and asserts that count,
     * logging which other brokers were not reported when it does not match.
     *
     * @param brokerItem the broker to inspect
     * @param max        the expected number of peer broker infos
     */
    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
        final BrokerService broker = brokerItem.broker;
        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
        // the result is not checked here; the assertion below reports a mismatch
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
                return max == regionBroker.getPeerBrokerInfos().length;
            }
        });

        BrokerInfo[] peers = regionBroker.getPeerBrokerInfos();
        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + peers.length);
        if (max != peers.length) {
            // candidates are all brokers created by this test except the inspected one
            List<String> missing = new ArrayList<String>();
            for (int i = 0; i < numBrokers; i++) {
                String candidate = "B" + i;
                if (!candidate.equals(broker.getBrokerName())) {
                    missing.add(candidate);
                }
            }
            for (BrokerInfo info : peers) {
                LOG.info(info.getBrokerName());
                missing.remove(info.getBrokerName());
            }
            LOG.info("Broker infos off.." + missing);
        }
        assertEquals(broker.getBrokerName(), max, peers.length);
    }

    /** Runs {@link #verifyPeerBrokerInfo(BrokerItem, int)} for every registered broker. */
    private void verifyPeerBrokerInfos(final int max) throws Exception {
        Collection<BrokerItem> brokerList = brokers.values();
        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
            verifyPeerBrokerInfo(i.next(), max);
        }
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(false);
        super.setUp();
    }

    /**
     * Closes the consumer connections opened by the test before delegating to the parent
     * tear down.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            for (QueueConnection connection : consumerConnections) {
                try {
                    connection.close();
                } catch (Exception e) {
                    LOG.warn("Failed to close consumer connection", e);
                }
            }
            consumerConnections.clear();
        } finally {
            super.tearDown();
        }
    }
}
