package com.ververica.cdc.connectors.mysql;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/LegacyMySqlSourceITCase.class */
public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
    private final UniqueDatabase fullTypesDatabase = new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw");

    @Test
    public void testConsumingAllEventsWithJsonFormatIncludeSchema() throws Exception {
        testConsumingAllEventsWithJsonFormat(true);
    }

    @Test
    public void testConsumingAllEventsWithJsonFormatExcludeSchema() throws Exception {
        testConsumingAllEventsWithJsonFormat(false);
    }

    @Test
    public void testConsumingAllEventsWithJsonFormatWithNumericDecimal() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("decimal.format", "numeric");
        testConsumingAllEventsWithJsonFormat(false, hashMap, "file/debezium-data-schema-exclude-with-numeric-decimal.json");
    }

    private void testConsumingAllEventsWithJsonFormat(Boolean bool, Map<String, Object> map, String str) throws Exception {
        this.fullTypesDatabase.createAndInitialize();
        DebeziumSourceFunction build = MySqlSource.builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(new String[]{this.fullTypesDatabase.getDatabaseName()}).username(this.fullTypesDatabase.getUsername()).password(this.fullTypesDatabase.getPassword()).deserializer(map == null ? new JsonDebeziumDeserializationSchema(bool) : new JsonDebeziumDeserializationSchema(bool, map)).build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(1000L);
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        JSONObject jSONObject = (JSONObject) JSONObject.parseObject(readLines(str), JSONObject.class, new Feature[0]);
        JSONObject jSONObject2 = jSONObject.getJSONObject("expected_snapshot");
        create.createTemporaryView("full_types", executionEnvironment.addSource(build));
        TableResult executeSql = create.executeSql("SELECT * FROM full_types");
        CloseableIterator collect = executeSql.collect();
        waitForSnapshotStarted(collect);
        Assert.assertTrue(dataInJsonIsEquals(fetchRows(collect, 1).get(0).toString(), jSONObject2.toString()));
        Connection jdbcConnection = this.fullTypesDatabase.getJdbcConnection();
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    Assert.assertTrue(dataInJsonIsEquals(fetchRows(executeSql.collect(), 1).get(0).toString(), jSONObject.getJSONObject("expected_binlog").toString()));
                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    private void testConsumingAllEventsWithJsonFormat(Boolean bool) throws Exception {
        testConsumingAllEventsWithJsonFormat(bool, null, bool.booleanValue() ? "file/debezium-data-schema-include.json" : "file/debezium-data-schema-exclude.json");
    }

    private static List<Object> fetchRows(Iterator<Row> it, int i) {
        ArrayList arrayList = new ArrayList(i);
        while (i > 0 && it.hasNext()) {
            arrayList.add(it.next().getField(0));
            i--;
        }
        return arrayList;
    }

    private static void waitForSnapshotStarted(CloseableIterator<Row> closeableIterator) throws Exception {
        while (!closeableIterator.hasNext()) {
            Thread.sleep(100L);
        }
    }

    private static byte[] readLines(String str) throws IOException, URISyntaxException {
        return Files.readAllBytes(Paths.get(((URL) Objects.requireNonNull(LegacyMySqlSourceITCase.class.getClassLoader().getResource(str))).toURI()));
    }

    private static boolean dataInJsonIsEquals(String str, String str2) {
        JSONObject parseObject = JSONObject.parseObject(str);
        JSONObject parseObject2 = JSONObject.parseObject(str2);
        if (parseObject2.getJSONObject("payload") != null && parseObject.getJSONObject("payload") != null) {
            parseObject2 = parseObject2.getJSONObject("payload");
            parseObject = parseObject.getJSONObject("payload");
        }
        return jsonObjectEquals(parseObject2.getJSONObject("after"), parseObject.getJSONObject("after")) && jsonObjectEquals(parseObject2.getJSONObject("before"), parseObject.getJSONObject("before")) && Objects.equals(parseObject2.get("op"), parseObject.get("op"));
    }

    private static boolean jsonObjectEquals(JSONObject jSONObject, JSONObject jSONObject2) {
        return jSONObject == jSONObject2 || (jSONObject != null && jSONObject.toString().equals(jSONObject2.toString()));
    }
}
