import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.GlobalDiskFullTest;
import org.apache.activemq.transport.amqp.client.*;
import org.junit.Assert;
import org.junit.Test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Checks that AMQP producers stay blocked while the broker's {@link FileStoreMonitor}
 * reports the store as over its usage limit, and that they complete once the limit is raised.
 *
 * <p>The broker is configured with {@link AddressFullMessagePolicy#FAIL} for the
 * {@code "TEST"} address-settings match, which is also the address the named sender targets.
 */
public class AMQPBlockingTest extends GlobalDiskFullTest {

    /** Address-settings match that gets the FAIL policy; also the named sender's target. */
    private static final String TEST_ADDRESS = "TEST";

    /**
     * Builds, configures and starts the broker used by this test.
     *
     * <p>Journal, bindings and paging directories are suffixed with {@code port}, and the
     * {@code "TEST"} match is given the FAIL address-full policy before the broker starts.
     *
     * @param port port used for the acceptor configuration and as the directory suffix
     * @return the started server
     * @throws Exception if any inherited setup step or {@code server.start()} fails
     */
    @Override
    protected ActiveMQServer createServer(int port) throws Exception {
        // NOTE(review): the meaning of the two boolean flags is defined by the inherited
        // createServer(boolean, boolean) overload and is not visible from this file.
        ActiveMQServer server = this.createServer(true, true);
        server.getConfiguration().getAcceptorConfigurations().clear();
        server.getConfiguration().getAcceptorConfigurations().add(this.addAcceptorConfiguration(server, port));
        server.getConfiguration().setName("localhost");
        server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
        server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
        server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
        server.getConfiguration().setMessageExpiryScanPeriod(5000L);

        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
        server.getConfiguration().addAddressSetting(TEST_ADDRESS, addressSettings);

        this.addAdditionalAcceptors(server);
        this.configureAddressPolicy(server);
        this.configureBrokerSecurity(server);
        this.addConfiguration(server);
        server.start();
        this.createAddressAndQueues(server);
        return server;
    }

    /**
     * Drives the monitor's max usage to 0, waits for the "over" callback, then verifies that a
     * named sender and an anonymous sender both stay blocked until max usage is raised to 100.
     *
     * <p>Any error thrown by a sender thread is captured and fails the test; it is not
     * treated as a successful release.
     */
    @Test
    public void testProducerOnDiskFull() throws Exception {
        FileStoreMonitor monitor = ((ActiveMQServerImpl) server).getMonitor().setMaxUsage(0.0);
        final CountDownLatch latch = new CountDownLatch(1);
        monitor.addCallback(new FileStoreMonitor.Callback() {

            @Override
            public void tick(long usableSpace, long totalSpace) {
            }

            @Override
            public void over(long usableSpace, long totalSpace) {
                latch.countDown();
            }

            @Override
            public void under(long usableSpace, long totalSpace) {
            }
        });

        // Do not start producing until the monitor has reported the over-limit state.
        Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));

        AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
        AmqpConnection connection = addConnection(client.connect());

        try {
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(TEST_ADDRESS);
            AmqpSender anonSender = session.createSender();
            byte[] payload = new byte[1000];

            CountDownLatch sentWithName = new CountDownLatch(1);
            CountDownLatch sentAnon = new CountDownLatch(1);
            AtomicReference<Throwable> failureWithName = new AtomicReference<>();
            AtomicReference<Throwable> failureAnon = new AtomicReference<>();

            Thread threadWithName = sendAsync(sender, payload, null, sentWithName, failureWithName);
            // The anonymous sender has no target of its own, so the message carries the address.
            Thread threadWithAnon = sendAsync(anonSender, payload, getQueueName(), sentAnon, failureAnon);

            assertStillBlocked("Thread sender should be blocked", sentWithName, failureWithName);
            assertStillBlocked("Thread sender anonymous should be blocked", sentAnon, failureAnon);

            monitor.setMaxUsage(100.0);

            Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS));
            Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30, TimeUnit.SECONDS));

            threadWithName.join(TimeUnit.SECONDS.toMillis(30));
            threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
            Assert.assertFalse(threadWithName.isAlive());
            Assert.assertFalse(threadWithAnon.isAlive());

            // A latch that was released by a failed send is not a successful release.
            assertNoFailure("Named sender", failureWithName);
            assertNoFailure("Anonymous sender", failureAnon);
        } finally {
            connection.close();
        }
    }

    /**
     * Starts a thread that sends one message through {@code sender}.
     *
     * @param sender  sender to use; its send timeout is set to {@code -1} before sending
     * @param payload bytes placed in the message body
     * @param address message-level address, or {@code null} to leave it unset
     * @param done    counted down when the send attempt finishes, successfully or not
     * @param failure receives whatever the send attempt threw, if anything
     * @return the already-started thread
     */
    private static Thread sendAsync(final AmqpSender sender,
                                    final byte[] payload,
                                    final String address,
                                    final CountDownLatch done,
                                    final AtomicReference<Throwable> failure) {
        Thread thread = new Thread(() -> {
            try {
                final AmqpMessage message = new AmqpMessage();
                message.setBytes(payload);
                if (address != null) {
                    message.setAddress(address);
                }
                // NOTE(review): -1 presumably disables the client-side send timeout so the
                // call can wait for the broker indefinitely; confirm against AmqpSender.
                sender.setSendTimeout(-1);
                sender.send(message);
            } catch (Throwable t) {
                // Hand the error to the test thread instead of only printing it.
                failure.set(t);
            } finally {
                done.countDown();
            }
        });
        thread.start();
        return thread;
    }

    /**
     * Fails if {@code done} is released within 500 ms, attaching any captured send error as the cause.
     */
    private static void assertStillBlocked(String message,
                                           CountDownLatch done,
                                           AtomicReference<Throwable> failure) throws InterruptedException {
        if (done.await(500, TimeUnit.MILLISECONDS)) {
            throw new AssertionError(message, failure.get());
        }
    }

    /**
     * Fails with the captured error as cause if the sender thread recorded one.
     */
    private static void assertNoFailure(String who, AtomicReference<Throwable> failure) {
        Throwable error = failure.get();
        if (error != null) {
            throw new AssertionError(who + " failed while sending", error);
        }
    }
}
