package com.ververica.cdc.connectors.mongodb.table;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.connectors.utils.AssertUtils;
import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.class */
public class MongoDBTableFactoryTest {
    private static final String MY_HOSTS = "localhost:27017,localhost:27018";
    private static final String USER = "flinkuser";
    private static final String PASSWORD = "flinkpw";
    private static final String MY_DATABASE = "myDB";
    private static final String MY_TABLE = "myTable";
    private static final String ERROR_TOLERANCE = "none";
    private static final ResolvedSchema SCHEMA = new ResolvedSchema(Arrays.asList(Column.physical("_id", DataTypes.STRING().notNull()), Column.physical("bbb", DataTypes.STRING().notNull()), Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3))), Collections.emptyList(), UniqueConstraint.primaryKey("pk", Arrays.asList("_id")));
    private static final ResolvedSchema SCHEMA_WITH_METADATA = new ResolvedSchema(Arrays.asList(Column.physical("_id", DataTypes.STRING().notNull()), Column.physical("bbb", DataTypes.STRING().notNull()), Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3)), Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), Column.metadata("_database_name", DataTypes.STRING(), "database_name", true)), Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
    private static final Boolean ERROR_LOGS_ENABLE = true;
    private static final Boolean COPY_EXISTING = true;
    private static final ZoneId LOCAL_TIME_ZONE = ZoneId.systemDefault();

    @Test
    public void testCommonProperties() {
        Assert.assertEquals(new MongoDBTableSource(SCHEMA, MY_HOSTS, USER, PASSWORD, MY_DATABASE, MY_TABLE, (String) null, ERROR_TOLERANCE, ERROR_LOGS_ENABLE, COPY_EXISTING, (String) null, (Integer) null, (Integer) null, 1000, 1500, (Integer) null, LOCAL_TIME_ZONE), createTableSource(SCHEMA, getAllOptions()));
    }

    @Test
    public void testOptionalProperties() {
        Map<String, String> allOptions = getAllOptions();
        allOptions.put("connection.options", "replicaSet=test&connectTimeoutMS=300000");
        allOptions.put("errors.tolerance", "all");
        allOptions.put("errors.log.enable", "false");
        allOptions.put("copy.existing", "false");
        allOptions.put("copy.existing.pipeline", "[ { \"$match\": { \"closed\": \"false\" } } ]");
        allOptions.put("copy.existing.max.threads", "1");
        allOptions.put("copy.existing.queue.size", "101");
        allOptions.put("poll.max.batch.size", "102");
        allOptions.put("poll.await.time.ms", "103");
        allOptions.put("heartbeat.interval.ms", "104");
        Assert.assertEquals(new MongoDBTableSource(SCHEMA, MY_HOSTS, USER, PASSWORD, MY_DATABASE, MY_TABLE, "replicaSet=test&connectTimeoutMS=300000", MongoDBSource.ERROR_TOLERANCE_ALL, false, false, "[ { \"$match\": { \"closed\": \"false\" } } ]", 1, 101, 102, 103, 104, LOCAL_TIME_ZONE), createTableSource(SCHEMA, allOptions));
    }

    @Test
    public void testMetadataColumns() {
        MongoDBTableSource createTableSource = createTableSource(SCHEMA_WITH_METADATA, getAllOptions());
        createTableSource.applyReadableMetadata(Arrays.asList("op_ts", "database_name"), SCHEMA_WITH_METADATA.toSourceRowDataType());
        DynamicTableSource copy = createTableSource.copy();
        MongoDBTableSource mongoDBTableSource = new MongoDBTableSource(ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), MY_HOSTS, USER, PASSWORD, MY_DATABASE, MY_TABLE, (String) null, ERROR_TOLERANCE, ERROR_LOGS_ENABLE, COPY_EXISTING, (String) null, (Integer) null, (Integer) null, 1000, 1500, (Integer) null, LOCAL_TIME_ZONE);
        mongoDBTableSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
        mongoDBTableSource.metadataKeys = Arrays.asList("op_ts", "database_name");
        Assert.assertEquals(mongoDBTableSource, copy);
        AssertUtils.assertProducedTypeOfSourceFunction(createTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).createSourceFunction(), mongoDBTableSource.producedDataType);
    }

    @Test
    public void testValidation() {
        try {
            Map<String, String> allOptions = getAllOptions();
            allOptions.put("unknown", "abc");
            createTableSource(SCHEMA, allOptions);
            Assert.fail("exception expected");
        } catch (Throwable th) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th, "Unsupported options:\n\nunknown").isPresent());
        }
    }

    private Map<String, String> getAllOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", "mongodb-cdc");
        hashMap.put("hosts", MY_HOSTS);
        hashMap.put("username", USER);
        hashMap.put("password", PASSWORD);
        hashMap.put("database", MY_DATABASE);
        hashMap.put("collection", MY_TABLE);
        return hashMap;
    }

    private static DynamicTableSource createTableSource(ResolvedSchema resolvedSchema, Map<String, String> map) {
        return FactoryUtil.createTableSource((Catalog) null, ObjectIdentifier.of("default", "default", "t1"), new ResolvedCatalogTable(CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), "mock source", new ArrayList(), map), resolvedSchema), new Configuration(), MongoDBTableFactoryTest.class.getClassLoader(), false);
    }
}
