package com.ververica.cdc.connectors.oracle.table;

import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/**
 * Integration tests for the {@code oracle-cdc} SQL connector, run against an Oracle database
 * started through Testcontainers.
 */
public class OracleConnectorITCase extends AbstractTestBase {
    /** Logger of this test class; the Oracle container's output is forwarded to it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(OracleConnectorITCase.class);

    /**
     * Container instance provided by {@link OracleTestUtils}, configured to forward its log output
     * to {@link #LOG}. Never reassigned, hence {@code final}.
     */
    private final OracleContainer oracleContainer =
            OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));

    /** Streaming environment the table environment below is built on. */
    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    /** Table environment (Blink planner, streaming mode) used to run the SQL in each test. */
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    this.env,
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    /**
     * Starts the Oracle container (blocking until startup completes), clears any data left in the
     * {@code values} test table factory, and sets the job parallelism to 1.
     *
     * @throws Exception declared for the JUnit lifecycle method
     */
    @Before
    public void before() throws Exception {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(oracleContainer)).join();
        LOG.info("Containers are started.");

        TestValuesTableFactory.clearAllData();
        env.setParallelism(1);
    }

    /** Stops the Oracle container after each test. */
    @After
    public void teardown() {
        oracleContainer.stop();
    }

    /**
     * Reads {@code products} through the {@code oracle-cdc} connector, aggregates {@code WEIGHT}
     * per {@code NAME} into a {@code values} sink, applies a series of updates, inserts and a
     * delete over JDBC, and verifies the materialized sink content once 20 raw messages arrived.
     *
     * @throws SQLException if the JDBC connection or one of the DML statements fails
     * @throws ExecutionException if cancelling the Flink job fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test
    public void testConsumingAllEvents() throws SQLException, ExecutionException, InterruptedException {
        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + " 'connector' = 'oracle-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = 'XE',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        oracleContainer.getHost(),
                        oracleContainer.getOraclePort(),
                        OracleTestUtils.CONNECTOR_USER,
                        "dbz",
                        OracleTestUtils.SCHEMA_USER,
                        "products");
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " name STRING,"
                        + " weightSum DECIMAL(10,3),"
                        + " PRIMARY KEY (name) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Submits the streaming job asynchronously; it is cancelled at the end of the test.
        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

        // Only start changing data once the sink has received at least one message.
        waitForSnapshotStarted("sink");

        // try-with-resources closes the statement and then the connection, also on failure.
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
            statement.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
            statement.execute("DELETE FROM debezium.products WHERE ID=112");
        }

        waitForSinkSize("sink", 20);

        String[] expected =
                new String[] {
                    "+I[scooter, 3.140]",
                    "+I[car battery, 8.100]",
                    "+I[12-pack drill bits, 0.800]",
                    "+I[hammer, 2.625]",
                    "+I[rocks, 5.100]",
                    "+I[jacket, 0.600]",
                    "+I[spare tire, 22.200]"
                };
        List<String> actual = TestValuesTableFactory.getResults("sink");
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code products} together with the {@code database_name}, {@code schema_name} and
     * {@code table_name} metadata columns, applies a series of changes over JDBC, and compares the
     * sorted raw changelog of the sink (16 messages) against the expected rows.
     *
     * @throws Throwable if any JDBC, Flink or waiting step fails
     */
    @Test
    public void testMetadataColumns() throws Throwable {
        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " DB_NAME STRING METADATA FROM 'database_name' VIRTUAL,"
                                + " SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL,"
                                + " TABLE_NAME STRING METADATA  FROM 'table_name' VIRTUAL,"
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + " 'connector' = 'oracle-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = 'XE',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        oracleContainer.getHost(),
                        oracleContainer.getOraclePort(),
                        OracleTestUtils.CONNECTOR_USER,
                        "dbz",
                        OracleTestUtils.SCHEMA_USER,
                        "products");
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " database_name STRING,"
                        + " schema_name STRING,"
                        + " table_name STRING,"
                        + " id INT,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(10,3),"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Submits the streaming job asynchronously; it is cancelled at the end of the test.
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Only start changing data once the sink has received at least one message.
        waitForSnapshotStarted("sink");

        // try-with-resources closes the statement and then the connection, also on failure.
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
            statement.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
            statement.execute("DELETE FROM debezium.products WHERE ID=112");
        }

        waitForSinkSize("sink", 16);

        // Listed in the order produced by sorting the raw results lexicographically.
        List<String> expected =
                Arrays.asList(
                        "+I[XE, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]",
                        "+I[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]",
                        "+U[XE, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]",
                        "+U[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]",
                        "+U[XE, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]",
                        "+U[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
                        "-D[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code products} with {@code 'scan.startup.mode' = 'latest-offset'}, applies inserts,
     * updates and a delete over JDBC, waits for 7 raw sink messages, and verifies that the
     * materialized sink contains only the row with id 110.
     *
     * @throws Exception if any JDBC, Flink or waiting step fails
     */
    @Test
    public void testStartupFromLatestOffset() throws Exception {
        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + " 'connector' = 'oracle-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = 'XE',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s' ,"
                                + " 'scan.startup.mode' = 'latest-offset'"
                                + ")",
                        oracleContainer.getHost(),
                        oracleContainer.getOraclePort(),
                        OracleTestUtils.CONNECTOR_USER,
                        "dbz",
                        OracleTestUtils.SCHEMA_USER,
                        "products");
        String sinkDDL =
                "CREATE TABLE sink "
                        + " WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ") LIKE debezium_source (EXCLUDING OPTIONS)";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Submits the streaming job asynchronously; it is cancelled at the end of the test.
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Fixed pause before issuing changes.
        // NOTE(review): presumably gives the source time to start, since this test does not poll
        // the sink for snapshot rows first - confirm.
        Thread.sleep(5000L);

        // try-with-resources closes the statement and then the connection, also on failure.
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO debezium.products VALUES (110,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (111,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET description='new water resistent white wind breaker', weight=0.5 WHERE id=110");
            statement.execute("UPDATE debezium.products SET weight=5.17 WHERE id=111");
            statement.execute("DELETE FROM debezium.products WHERE id=111");
        }

        waitForSinkSize("sink", 7);

        String[] expected =
                new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
        List<String> actual = TestValuesTableFactory.getResults("sink");
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Creates {@code debezium.test_numeric_table} with several {@code NUMBER}/{@code FLOAT}
     * columns, inserts two rows, reads the table through the {@code oracle-cdc} connector into a
     * {@code values} sink, and compares the sorted raw sink rows against the expected values.
     *
     * @throws Exception if any JDBC, Flink or waiting step fails
     */
    @Test
    public void testConsumingNumericColumns() throws Exception {
        // Prepare the Oracle table first; try-with-resources closes the statement and then the
        // connection, also on failure.
        try (Connection connection = OracleTestUtils.testConnection(oracleContainer);
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "CREATE TABLE debezium.test_numeric_table ("
                            + " ID NUMBER(18,0),"
                            + " TEST_BOOLEAN NUMBER(1,0),"
                            + " TEST_TINYINT NUMBER(2,0),"
                            + " TEST_SMALLINT NUMBER(4,0),"
                            + " TEST_INT NUMBER(9,0),"
                            + " TEST_BIG_NUMERIC NUMBER(32,0),"
                            + " TEST_DECIMAL NUMBER(20,8),"
                            + " TEST_NUMBER NUMBER,"
                            + " TEST_NUMERIC NUMBER,"
                            + " TEST_FLOAT FLOAT(63),"
                            + " PRIMARY KEY (ID)"
                            + ")");
            statement.execute("INSERT INTO debezium.test_numeric_table VALUES (11000000000, 0, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955)");
            statement.execute("INSERT INTO debezium.test_numeric_table VALUES (11000000001, 1, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965)");
        }

        String sourceDDL =
                String.format(
                        "CREATE TABLE test_numeric_table ("
                                + " ID BIGINT,"
                                + " TEST_BOOLEAN BOOLEAN,"
                                + " TEST_TINYINT TINYINT,"
                                + " TEST_SMALLINT SMALLINT,"
                                + " TEST_INT INT,"
                                + " TEST_BIG_NUMERIC DECIMAL(32, 0),"
                                + " TEST_DECIMAL DECIMAL(20, 8),"
                                + " TEST_NUMBER BIGINT,"
                                + " TEST_NUMERIC DECIMAL(10, 3),"
                                + " TEST_FLOAT FLOAT,"
                                + " PRIMARY KEY (ID) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'oracle-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = 'XE',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        oracleContainer.getHost(),
                        oracleContainer.getOraclePort(),
                        OracleTestUtils.CONNECTOR_USER,
                        "dbz",
                        OracleTestUtils.SCHEMA_USER,
                        "test_numeric_table");
        String sinkDDL =
                "CREATE TABLE test_numeric_sink ("
                        + " id BIGINT,"
                        + " test_boolean BOOLEAN,"
                        + " test_tinyint TINYINT,"
                        + " test_smallint SMALLINT,"
                        + " test_int INT,"
                        + " test_big_numeric DECIMAL(32, 0),"
                        + " test_decimal DECIMAL(20, 8),"
                        + " test_number BIGINT,"
                        + " test_numeric DECIMAL(10, 3),"
                        + " test_float FLOAT,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Submits the streaming job asynchronously; it is cancelled at the end of the test.
        TableResult result =
                tEnv.executeSql("INSERT INTO test_numeric_sink SELECT * FROM test_numeric_table");

        waitForSnapshotStarted("test_numeric_sink");
        waitForSinkSize("test_numeric_sink", 2);

        List<String> expected =
                Arrays.asList(
                        "+I[11000000000, false, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]",
                        "+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]");
        List<String> actual = TestValuesTableFactory.getRawResults("test_numeric_sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);

        result.getJobClient().get().cancel().get();
    }

    /**
     * Blocks until the named sink has received at least one raw result, polling every 100 ms.
     * There is no timeout: the call does not return while the sink stays empty.
     *
     * @param str name of the sink table to poll
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSnapshotStarted(String str) throws InterruptedException {
        while (true) {
            if (sinkSize(str) != 0) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Blocks until the named sink has received at least {@code i} raw results, polling every
     * 100 ms. There is no timeout: the call does not return while fewer results are present.
     *
     * @param str name of the sink table to poll
     * @param i minimum number of raw results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        while (true) {
            if (sinkSize(str) >= i) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the named sink, reading them while
     * holding the {@code TestValuesTableFactory} class monitor.
     *
     * @param str name of the sink table
     * @return the raw result count, or 0 if the lookup throws {@link IllegalArgumentException}
     */
    private static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException sinkNotAvailable) {
                // NOTE(review): presumably thrown while the sink is not registered yet (job not
                // started) - confirm. Treated as "no results so far".
                return 0;
            }
        }
    }

    /**
     * Opens a new JDBC connection to the Oracle container as the connector user.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    public Connection getJdbcConnection() throws SQLException {
        final String jdbcUrl = oracleContainer.getJdbcUrl();
        return DriverManager.getConnection(jdbcUrl, OracleTestUtils.CONNECTOR_USER, "dbz");
    }
}
