/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;

/**
 * Checks the {@code InFlightCount} reported by the local broker's queues when
 * messages are forwarded over a network bridge to a remote broker whose queues
 * start out with a small memory limit.
 *
 * <p>Both tests follow the same outline: create {@code broker0} (local) and
 * {@code broker1} (remote), cap the memory of the remote queue(s), bridge
 * {@code broker0} to {@code broker1}, send messages to {@code broker0}, attach
 * remote consumers that never receive, and finally raise the remote memory
 * limit and wait for the local in-flight count to return to zero.
 */
public class NetworkBridgeRemoveInflightTest extends
        JmsMultipleBrokersTestSupport {

    // Protect against hanging test. Only referenced by the disabled calls in setUp().
    private static final long MAX_TEST_TIME = 120000;

    private static final Log LOG = LogFactory
            .getLog(NetworkBridgeRemoveInflightTest.class);

    /** Number of messages sent to each queue on the local broker. */
    private static final int NUM_MESSAGES = 100;

    /** Unit used to size the remote queue memory limits. */
    private static final long TEST_MESSAGE_SIZE = 1024;

    /** Prefetch configured on the network connector. */
    private static final int BRIDGE_PREFETCH = 15;

    /** Memory limit applied to the remote queue(s) before the brokers start. */
    private static final long INITIAL_REMOTE_MEMORY_LIMIT = 5 * TEST_MESSAGE_SIZE;

    /** Memory limit applied to the remote queue(s) to let the backlog drain. */
    private static final long RAISED_REMOTE_MEMORY_LIMIT = 50 * TEST_MESSAGE_SIZE;

    /** Name of the JMX attribute read from the queue MBeans. */
    private static final String IN_FLIGHT_ATTRIBUTE = "InFlightCount";

    /**
     * When {@code false}, the two "prefetch in flight on every queue" waits in
     * {@link #testFastAndSlowRemoteConsumersOnXDests()} report success without
     * inspecting any queue. This was previously an inline {@code if (false)};
     * it stays disabled here so the test behaves as before.
     */
    private static final boolean VERIFY_PREFETCH_ON_ALL_QUEUES = false;

    // Combo flag set to true/false by the test framework.
    public boolean persistentTestMessages = true;
    public boolean networkIsAlwaysSendSync = true;

    // NOTE(review): nothing in this class ever adds to this collection, so the
    // closing "no exceptions" assertions currently always pass.
    private Vector<Throwable> exceptions = new Vector<Throwable>();

    public static Test suite() {
        return suite(NetworkBridgeRemoveInflightTest.class);
    }

    @Override
    protected void setUp() throws Exception {
        //setAutoFail(true);
        //setMaxTestTime(MAX_TEST_TIME);
        super.setUp();
    }

    /**
     * Single-queue scenario: waits for the local queue to report
     * {@link #BRIDGE_PREFETCH} messages in flight, both before and after the
     * remote consumer is closed, then raises the remote memory limit and waits
     * for the in-flight count to reach zero.
     *
     * @throws Exception if broker setup, messaging or a JMX lookup fails
     */
    public void testFastAndSlowRemoteConsumers() throws Exception {
        final ActiveMQQueue fastSharedQueue = new ActiveMQQueue(
            NetworkBridgeRemoveInflightTest.class.getSimpleName()
                    + ".fast.shared?consumer.prefetchSize=1");

        final BrokerService localBroker = createJmxBroker("broker0");
        BrokerService remoteBroker = createJmxBroker("broker1");

        // Limit the memory the remote broker grants to the fast shared queue.
        applyInitialMemoryLimit(remoteBroker, fastSharedQueue);

        bridgeLocalToRemote();

        startAllBrokers();
        waitForBridgeFormation();

        persistentDelivery = persistentTestMessages;
        sendMessages("broker0", fastSharedQueue, NUM_MESSAGES);

        BrokerItem brokerItem = brokers.get("broker1");
        Connection conn = brokerItem.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer messageConsumer = session.createConsumer(fastSharedQueue);
        // consume nothing so broker0 blocks up

        // Per the assertion message, five messages are expected to leave broker0.
        final long expectedRemaining = NUM_MESSAGES - 5;
        assertTrue("5 messages get moved", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Read once so the logged value is the one being compared.
                long total = localBroker.getAdminView().getTotalMessageCount();
                LOG.info("MessageCont: " + total);
                return expectedRemaining == total;
            }
        }));

        final ObjectName fastQ = localBroker.getAdminView().getQueues()[0];
        assertTrue("15 inflight", awaitInFlight(localBroker, fastQ, BRIDGE_PREFETCH));

        messageConsumer.close();

        // Closing the remote consumer must not change the local in-flight count.
        assertTrue("15 inflight", awaitInFlight(localBroker, fastQ, BRIDGE_PREFETCH));

        // allow remote to accept by giving it more memory usage
        raiseMemoryLimit(remoteBroker, remoteBroker.getAdminView().getQueues()[0]);

        assertTrue("0 inflight", awaitInFlight(localBroker, fastQ, 0L));

        assertTrue("no exceptions on the wait threads:" + exceptions,
                exceptions.isEmpty());
    }

    /**
     * Many-queue scenario: same outline as
     * {@link #testFastAndSlowRemoteConsumers()} but over 500 {@code FA.<n>}
     * queues. The two intermediate per-queue in-flight checks are gated by
     * {@link #VERIFY_PREFETCH_ON_ALL_QUEUES}; only the final "0 inflight" wait
     * actually inspects the queues.
     *
     * @throws Exception if broker setup, messaging or a JMX lookup fails
     */
    public void testFastAndSlowRemoteConsumersOnXDests() throws Exception {
        final int numDestinations = 500;

        final BrokerService localBroker = createJmxBroker("broker0");
        BrokerService remoteBroker = createJmxBroker("broker1");

        // Limit the memory the remote broker grants to every queue under "FA.".
        applyInitialMemoryLimit(remoteBroker, new ActiveMQQueue("FA.>"));

        bridgeLocalToRemote();

        startAllBrokers();
        waitForBridgeFormation();

        for (int i = 0; i < numDestinations; i++) {
            persistentDelivery = persistentTestMessages;
            sendMessages("broker0", fanOutQueue(i), NUM_MESSAGES);
        }

        BrokerItem brokerItem = brokers.get("broker1");
        Connection conn = brokerItem.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Attach one remote consumer per queue; none of them ever receives.
        for (int i = 0; i < numDestinations; i++) {
            session.createConsumer(fanOutQueue(i));
        }

        assertTrue(BRIDGE_PREFETCH + " inflight",
                awaitPrefetchInFlightOnAllQueues(localBroker));

        for (Connection connection : brokerItem.connections) {
            connection.close();
        }

        assertTrue("15 inflight", awaitPrefetchInFlightOnAllQueues(localBroker));

        // allow remote to accept by giving it more memory usage
        for (ObjectName remoteQueue : remoteBroker.getAdminView().getQueues()) {
            raiseMemoryLimit(remoteBroker, remoteQueue);
        }

        assertTrue("0 inflight", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                for (ObjectName queue : localBroker.getAdminView().getQueues()) {
                    Object inflight = readInFlight(localBroker, queue);
                    LOG.info(queue + ", 0? Inflight: " + inflight);
                    if (!matchesCount(inflight, 0L)) {
                        // Log the first active bridge to help diagnose a stuck count.
                        LOG.info("QS:" + localBroker.getNetworkConnectors().get(0).activeBridges().toArray()[0]);
                        return false;
                    }
                }
                return true;
            }
        }, TimeUnit.MINUTES.toMillis(5)));

        assertTrue("no exceptions on the wait threads:" + exceptions,
                exceptions.isEmpty());
    }

    /** Creates a non-persistent, JMX-enabled broker listening on an ephemeral TCP port. */
    private BrokerService createJmxBroker(String brokerName) throws Exception {
        return createBroker(new URI("broker:(tcp://localhost:0)?brokerName="
                + brokerName + "&persistent=false&useJmx=true"));
    }

    /** Installs a destination policy on {@code broker} capping {@code destination} at the initial memory limit. */
    private void applyInitialMemoryLimit(BrokerService broker, ActiveMQDestination destination) {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        policyEntry.setMemoryLimit(INITIAL_REMOTE_MEMORY_LIMIT);
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(destination, policyEntry);
        broker.setDestinationPolicy(policyMap);
    }

    /** Creates the outbound bridge from broker0 to broker1 with the test's sync-send flag and prefetch. */
    private void bridgeLocalToRemote() throws Exception {
        NetworkConnector nc = bridgeBrokers("broker0", "broker1");
        nc.setAlwaysSyncSend(networkIsAlwaysSendSync);
        nc.setPrefetchSize(BRIDGE_PREFETCH);
    }

    /** Raises the memory limit of one remote queue through its JMX view. */
    private void raiseMemoryLimit(BrokerService broker, ObjectName queue) throws Exception {
        QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext()
                .newProxyInstance(queue, QueueViewMBean.class, false);
        queueView.setMemoryLimit(RAISED_REMOTE_MEMORY_LIMIT);
    }

    /** Builds the n-th queue used by the many-queue test. */
    private static ActiveMQQueue fanOutQueue(int index) {
        return new ActiveMQQueue("FA." + index + "?consumer.prefetchSize=1");
    }

    /** Reads the raw in-flight attribute of {@code queue} from the broker's management context. */
    private static Object readInFlight(BrokerService broker, ObjectName queue) throws Exception {
        return broker.getManagementContext().getAttribute(queue, IN_FLIGHT_ATTRIBUTE);
    }

    /**
     * Compares a JMX attribute value with an expected count. The comparison is
     * made against a {@code Long}: an {@code Integer} with the same numeric
     * value would never be equal to the {@code Long} the attribute holds.
     */
    private static boolean matchesCount(Object actual, long expected) {
        return Long.valueOf(expected).equals(actual);
    }

    /** Waits (default {@link Wait} timeout) until {@code queue} on {@code broker} reports {@code expected} in flight. */
    private static boolean awaitInFlight(final BrokerService broker, final ObjectName queue,
            final long expected) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                Object inflight = readInFlight(broker, queue);
                LOG.info(queue + ", Inflight: " + inflight);
                return matchesCount(inflight, expected);
            }
        });
    }

    /**
     * Waits until every queue on {@code broker} reports {@link #BRIDGE_PREFETCH}
     * in flight, or succeeds on the first poll without looking at any queue
     * when {@link #VERIFY_PREFETCH_ON_ALL_QUEUES} is {@code false}.
     */
    private static boolean awaitPrefetchInFlightOnAllQueues(final BrokerService broker)
            throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                if (!VERIFY_PREFETCH_ON_ALL_QUEUES) {
                    return true;
                }
                for (ObjectName queue : broker.getAdminView().getQueues()) {
                    Object inflight = readInFlight(broker, queue);
                    LOG.info(queue + ", Inflight: " + inflight);
                    if (!matchesCount(inflight, BRIDGE_PREFETCH)) {
                        return false;
                    }
                }
                return true;
            }
        });
    }
}