/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.db2;

import java.sql.SQLException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.db2.util.TestHelper;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.util.Testing;

/**
 * Integration test that runs the Db2 connector against a CDC-enabled {@code CUSTOMERS} table,
 * stops it, changes the table while it is stopped, and then starts it again with the same
 * configuration.
 *
 * @author Chris Cranford
 */
public class DbTestSystemIT extends AbstractAsyncEngineConnectorTest {

    /** Format template shared by every customer insert issued from this test. */
    private static final String INSERT_CUSTOMER_SQL = "INSERT INTO customers values ('%s', '%s', '%s')";

    /** Rows (first name, last name, email) written to the table before the connector starts. */
    private static final String[][] SEED_CUSTOMERS = {
            { "Sally", "Thomas", "sally.thomas@acme.com" },
            { "George", "Bailey", "gbailey@foobar.com" },
            { "Edward", "Walker", "ed@walker.com" },
            { "Anne", "Kretchmar", "annek@noanswer.org" },
    };

    private Db2Connection connection;

    /**
     * Recreates the {@code customers} table, seeds it, enables table-level CDC for it and
     * resets the connector test framework.
     *
     * @throws SQLException if any of the setup statements fails
     */
    @Before
    public void before() throws SQLException {
        TestHelper.dropAllTables();

        connection = TestHelper.testConnection();
        connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
        connection.execute("CREATE TABLE customers (first_name varchar(30), last_name varchar(30), email varchar(255))");

        // One execute call per seed row, in declaration order.
        for (String[] seedRow : SEED_CUSTOMERS) {
            connection.execute(insertCustomerSql(seedRow[0], seedRow[1], seedRow[2]));
        }

        TestHelper.enableTableCdc(connection, "CUSTOMERS");

        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
        Testing.Print.enable();
    }

    /**
     * Closes the test connection, if one was opened.
     *
     * @throws Exception if closing the connection fails
     */
    @After
    public void after() throws Exception {
        if (connection == null) {
            return;
        }
        // NOTE(review): CDC teardown (TestHelper.disableDbCdc(connection) and
        // TestHelper.disableTableCdc(connection, "CUSTOMERS")) is deliberately not executed
        // here; it was already disabled in this test - confirm whether that is intentional.
        connection.close();
    }

    /**
     * Drives the connector through a start / stop / restart sequence, changing the
     * {@code customers} table between the individual steps.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    public void runSequencedTest() throws Exception {
        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "DB2INST1.CUSTOMERS")
                .build();

        start(Db2Connector.class, connectorConfig);
        assertConnectorIsRunning();

        // Presumably the four rows seeded in before(), delivered by the initial snapshot.
        consumeRecordsByTopic(4);

        // Turn on database-level CDC and set the registrations owned by DB2INST1 to state 'A'.
        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        insertCustomer("Tom", "Tester", "tom@test.com");
        consumeRecordsByTopic(1);

        updateCustomer("Tom", "Thomas");
        consumeRecordsByTopic(1);

        stopConnector();

        // This row is written while the connector is stopped.
        insertCustomer("Jerry", "Tester", "jerry@test.com");

        System.out.println("**START CONNECTOR**");

        // NOTE(review): the original author marked this restart with "bombs" - presumably
        // the failure being investigated happens here; confirm.
        start(Db2Connector.class, connectorConfig);

        // A fixed Thread.sleep(10000) was previously tried at this point and is left disabled.
        TestHelper.refreshAndWait(connection);

        insertCustomer("Nibbles", "Tester", "nibbles@test.com");
        consumeRecordsByTopic(1);
    }

    /**
     * Inserts a single row into the {@code customers} table.
     *
     * @param firstName value for the {@code first_name} column
     * @param lastName value for the {@code last_name} column
     * @param email value for the {@code email} column
     * @throws Exception if the statement fails
     */
    private void insertCustomer(String firstName, String lastName, String email) throws Exception {
        connection.execute(insertCustomerSql(firstName, lastName, email));
    }

    /**
     * Changes the {@code first_name} of every row whose current first name matches.
     *
     * @param currentFirstName the first name to look for
     * @param replacementFirstName the first name to store instead
     * @throws Exception if the statement fails
     */
    private void updateCustomer(String currentFirstName, String replacementFirstName) throws Exception {
        final String sql = String.format("UPDATE customers SET first_name = '%s' WHERE first_name = '%s'",
                replacementFirstName, currentFirstName);
        connection.execute(sql);
    }

    /**
     * Builds the INSERT statement for one customer row. Values are embedded verbatim (no
     * escaping), so callers must only pass trusted test literals.
     */
    private static String insertCustomerSql(String firstName, String lastName, String email) {
        return String.format(INSERT_CUSTOMER_SQL, firstName, lastName, email);
    }
}
