package com.netease.arctic.flink.table;

import com.netease.arctic.flink.FlinkSchemaUtil;
import com.netease.arctic.flink.InternalCatalogBuilder;
import com.netease.arctic.flink.catalog.ArcticCatalog;
import com.netease.arctic.flink.catalog.descriptors.ArcticCatalogValidator;
import com.netease.arctic.flink.read.FlinkKafkaConsumerBase;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/table/DynamicTableFactory.class */
/**
 * Flink table factory registered under the {@value #IDENTIFIER} identifier. It builds
 * {@link ArcticDynamicSource} instances for reading and {@link ArcticDynamicSink} instances
 * for writing.
 *
 * <p>The factory can be constructed with an {@link InternalCatalogBuilder} and catalog name, or
 * with the no-arg constructor, in which case the source path resolves both from the table
 * options.
 */
public class DynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicTableFactory.class);
    public static final String IDENTIFIER = "arctic";

    /** Read-mode value that selects the file source; every other value selects the log source. */
    private static final String READ_MODE_FILE = "file";
    /** Table property that must be true before the log source may be used. */
    private static final String LOG_STORE_ENABLED_KEY = "log-store.enabled";
    /** Table option holding the log-store topic handed to the sink. */
    private static final String LOG_STORE_TOPIC_KEY = "log-store.topic";

    // Stored by the three-argument constructor; not read anywhere in this class.
    private final ArcticCatalog arcticCatalog;
    private final InternalCatalogBuilder internalCatalogBuilder;
    private final String internalCatalogName;

    /**
     * Creates a factory bound to an existing catalog.
     *
     * @param arcticCatalog the owning catalog
     * @param internalCatalogBuilder builder used when creating table loaders
     * @param internalCatalogName catalog name used in the table identifier
     */
    public DynamicTableFactory(ArcticCatalog arcticCatalog, InternalCatalogBuilder internalCatalogBuilder, String internalCatalogName) {
        this.arcticCatalog = arcticCatalog;
        this.internalCatalogBuilder = internalCatalogBuilder;
        this.internalCatalogName = internalCatalogName;
    }

    /** Creates an unbound factory; catalog builder and catalog name are left {@code null}. */
    public DynamicTableFactory() {
        this(null, null, null);
    }

    /**
     * Builds the table source for the table described by {@code context}.
     *
     * <p>When this factory has no catalog builder or catalog name, both are taken from the table
     * options (metastore URL and catalog name), and each must be present. The table's read-mode
     * property (default {@code "file"}) then selects the file source; any other value selects the
     * log source, which additionally requires {@code log-store.enabled} to be true on the table.
     *
     * @param context factory context supplying the catalog table and its identifier
     * @return the dynamic source wrapping either the file or the log source
     * @throws NullPointerException if a required option is missing
     * @throws IllegalArgumentException if the log source is selected but the log store is disabled
     */
    @Override
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        CatalogTable catalogTable = context.getCatalogTable();
        ObjectIdentifier identifier = context.getObjectIdentifier();
        // The helper exposes its options as a read-only config; the calls below need Configuration.
        Configuration options = (Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions();

        InternalCatalogBuilder catalogBuilder = this.internalCatalogBuilder;
        String catalogName = this.internalCatalogName;
        if (catalogBuilder == null || catalogName == null) {
            String metastoreUrl = (String) options.get(ArcticCatalogValidator.METASTORE_URL_OPTION);
            Preconditions.checkNotNull(metastoreUrl, String.format("%s should be set", ArcticCatalogValidator.METASTORE_URL));
            catalogBuilder = InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl);
            catalogName = (String) options.get(ArcticValidator.ARCTIC_CATALOG);
            Preconditions.checkNotNull(catalogName, String.format("%s should be set", ArcticValidator.ARCTIC_CATALOG.key()));
        }

        ArcticTableLoader tableLoader = createTableLoader(resolveTablePath(options, identifier), catalogName, catalogBuilder, options.toMap());
        ArcticTable arcticTable = ArcticUtils.loadArcticTable(tableLoader);
        String readMode = PropertyUtil.propertyAsString(arcticTable.properties(), ArcticValidator.ARCTIC_READ_MODE, READ_MODE_FILE);
        boolean dimTableEnabled = CompatibleFlinkPropertyUtil.propertyAsBoolean(
                arcticTable.properties(),
                ArcticValidator.DIM_TABLE_ENABLE.key(),
                ((Boolean) ArcticValidator.DIM_TABLE_ENABLE.defaultValue()).booleanValue());
        TableSchema physicalSchema = FlinkSchemaUtil.getPhysicalSchema(catalogTable.getSchema(), dimTableEnabled);

        if (READ_MODE_FILE.equals(readMode)) {
            LOG.info("build file reader");
            return new ArcticDynamicSource(
                    identifier.getObjectName(),
                    new ArcticFileSource(tableLoader, physicalSchema, arcticTable, options),
                    arcticTable,
                    physicalSchema,
                    arcticTable.properties());
        }

        // "log" and every unrecognised read mode take the log path.
        Preconditions.checkArgument(
                CompatiblePropertyUtil.propertyAsBoolean(arcticTable.properties(), LOG_STORE_ENABLED_KEY, false),
                String.format("Read log should enable %s at first", LOG_STORE_ENABLED_KEY));
        return new ArcticDynamicSource(
                identifier.getObjectName(),
                createLogSource(arcticTable, context, options),
                arcticTable,
                physicalSchema,
                arcticTable.properties());
    }

    /**
     * Builds the table sink for the table described by {@code context}.
     *
     * <p>NOTE(review): unlike {@link #createDynamicTableSource}, this path passes the factory's
     * catalog name and builder through as-is and does not fall back to the table options when
     * they are {@code null} - confirm that is intended.
     *
     * @param context factory context supplying the catalog table and its identifier
     * @return the sink for the resolved table
     */
    @Override
    public ArcticDynamicSink createDynamicTableSink(DynamicTableFactory.Context context) {
        CatalogTable catalogTable = context.getCatalogTable();
        ObjectIdentifier identifier = context.getObjectIdentifier();
        Map<String, String> tableOptions = catalogTable.getOptions();
        String logTopic = tableOptions.get(LOG_STORE_TOPIC_KEY);
        ObjectPath tablePath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
        ArcticTableLoader tableLoader = createTableLoader(tablePath, this.internalCatalogName, this.internalCatalogBuilder, tableOptions);
        boolean keyedTable = ArcticUtils.loadArcticTable(tableLoader).isKeyedTable();
        return new ArcticDynamicSink(catalogTable, tableLoader, logTopic, keyedTable);
    }

    /**
     * Alias kept for callers that still use the decompiler-generated method name.
     *
     * @param context factory context supplying the catalog table and its identifier
     * @return the same sink as {@link #createDynamicTableSink}
     */
    public ArcticDynamicSink m56createDynamicTableSink(DynamicTableFactory.Context context) {
        return createDynamicTableSink(context);
    }

    /**
     * Picks the database/table to load: the explicit database and table options when both are
     * set, otherwise the names from the Flink object identifier.
     */
    private static ObjectPath resolveTablePath(Configuration options, ObjectIdentifier identifier) {
        boolean explicitPath = options.containsKey(ArcticValidator.ARCTIC_DATABASE.key())
                && options.containsKey(ArcticValidator.ARCTIC_TABLE.key());
        if (explicitPath) {
            return new ObjectPath((String) options.get(ArcticValidator.ARCTIC_DATABASE), (String) options.get(ArcticValidator.ARCTIC_TABLE));
        }
        return new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
    }

    /** Creates a table loader for {@code catalogName.database.table} with the given properties. */
    private static ArcticTableLoader createTableLoader(ObjectPath objectPath, String catalogName, InternalCatalogBuilder internalCatalogBuilder, Map<String, String> properties) {
        TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, objectPath.getDatabaseName(), objectPath.getObjectName());
        return ArcticTableLoader.of(tableIdentifier, internalCatalogBuilder, properties);
    }

    /** Returns the connector identifier, {@value #IDENTIFIER}. */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** Returns an empty, mutable set: this factory declares no required options. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    /** Returns the options this factory accepts. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> supported = new HashSet<>();
        supported.add(KafkaOptions.TOPIC);
        supported.add(KafkaOptions.PROPS_BOOTSTRAP_SERVERS);
        supported.add(KafkaOptions.PROPS_GROUP_ID);
        supported.add(ArcticValidator.SCAN_STARTUP_MODE);
        supported.add(ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS);
        supported.add(KafkaOptions.SINK_PARTITIONER);
        supported.add(ArcticValidator.ARCTIC_CATALOG);
        supported.add(ArcticValidator.ARCTIC_TABLE);
        supported.add(ArcticValidator.ARCTIC_DATABASE);
        supported.add(ArcticValidator.DIM_TABLE_ENABLE);
        supported.add(ArcticCatalogValidator.METASTORE_URL_OPTION);
        return supported;
    }

    /**
     * Discovers the optional key decoding format and rejects any format whose changelog mode is
     * not INSERT-only.
     *
     * @throws ValidationException if the discovered key format is not INSERT-only
     */
    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(FactoryUtil.TableFactoryHelper tableFactoryHelper) {
        Optional<DecodingFormat<DeserializationSchema<RowData>>> keyFormat =
                tableFactoryHelper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KafkaOptions.KEY_FORMAT);
        keyFormat.ifPresent(format -> {
            if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                throw new ValidationException(String.format("A key format should only deal with INSERT-only records. But %s has a changelog mode of %s.", tableFactoryHelper.getOptions().get(KafkaOptions.KEY_FORMAT), format.getChangelogMode()));
            }
        });
        return keyFormat;
    }

    /**
     * Discovers the value decoding format: the generic format option when present, otherwise the
     * Kafka value-format option.
     */
    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(FactoryUtil.TableFactoryHelper tableFactoryHelper) {
        return tableFactoryHelper
                .discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(() -> tableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, KafkaOptions.VALUE_FORMAT));
    }

    /**
     * Builds the log source for {@code arcticTable} from the table options.
     *
     * @param arcticTable the loaded table; its name and primary-key state are passed to the source
     * @param context factory context supplying the catalog table
     * @param readableConfig the table options
     * @return the configured log source
     * @throws NullPointerException if the startup mode is the timestamp mode but no startup
     *     timestamp is configured
     */
    private LogDynamicSource createLogSource(ArcticTable arcticTable, DynamicTableFactory.Context context, ReadableConfig readableConfig) {
        CatalogTable catalogTable = context.getCatalogTable();
        Schema icebergSchema = org.apache.iceberg.flink.FlinkSchemaUtil.convert(TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()));
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = getKeyDecodingFormat(helper);
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper);
        KafkaOptions.validateSourceTopic(readableConfig);

        Properties kafkaProperties = KafkaOptions.getKafkaProperties(catalogTable.getOptions());
        // Long.MIN_VALUE is written when no partition-discovery interval is configured.
        Long discoveryIntervalMillis = readableConfig.getOptional(KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY)
                .map(interval -> interval.toMillis())
                .orElse(Long.MIN_VALUE);
        kafkaProperties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, String.valueOf(discoveryIntervalMillis));

        DataType physicalRowDataType = catalogTable.getSchema().toPhysicalRowDataType();
        int[] keyProjection = KafkaOptions.createKeyFormatProjection(readableConfig, physicalRowDataType);
        int[] valueProjection = KafkaOptions.createValueFormatProjection(readableConfig, physicalRowDataType);
        String keyPrefix = readableConfig.getOptional(KafkaOptions.KEY_FIELDS_PREFIX).orElse(null);

        String startupMode = (String) readableConfig.get(ArcticValidator.SCAN_STARTUP_MODE);
        long startupTimestampMillis = 0;
        // Lower-case with Locale.ROOT so the comparison does not depend on the JVM default locale.
        if (Objects.equals(startupMode.toLowerCase(java.util.Locale.ROOT), ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP)) {
            startupTimestampMillis = ((Long) Preconditions.checkNotNull(
                    readableConfig.get(ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS),
                    String.format("'%s' should be set in '%s' mode", ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS.key(), ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP))).longValue();
        }

        boolean hasPrimaryKey = arcticTable.isKeyedTable() && arcticTable.asKeyedTable().primaryKeySpec().primaryKeyExisted();
        LOG.info("build log source");
        return new LogDynamicSource(
                physicalRowDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(readableConfig),
                KafkaOptions.getSourceTopicPattern(readableConfig),
                kafkaProperties,
                startupMode,
                startupTimestampMillis,
                false,
                hasPrimaryKey,
                icebergSchema,
                readableConfig,
                arcticTable.name());
    }
}
