package com.netease.arctic.flink.table;

import com.netease.arctic.flink.read.FlinkKafkaConsumer;
import com.netease.arctic.flink.read.LogKafkaConsumer;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.Schema;

/* loaded from: input_file:com/netease/arctic/flink/table/LogDynamicSource.class */
public class LogDynamicSource extends KafkaDynamicSource {
    private boolean tablePrimaryKeyExisted;
    private final Schema schema;
    private final ReadableConfig tableOptions;
    private final String consumerChangelogMode;
    private final boolean logRetractionEnable;
    private static final ChangelogMode ALL_KINDS = ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();

    /* renamed from: com.netease.arctic.flink.table.LogDynamicSource$1, reason: invalid class name */
    /* loaded from: input_file:com/netease/arctic/flink/table/LogDynamicSource$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode = new int[StartupMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.GROUP_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.SPECIFIC_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.TIMESTAMP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogDynamicSource(DataType dataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DecodingFormat<DeserializationSchema<RowData>> decodingFormat2, int[] iArr, int[] iArr2, @Nullable String str, @Nullable List<String> list, @Nullable Pattern pattern, Properties properties, String str2, long j, boolean z, boolean z2, Schema schema, ReadableConfig readableConfig, String str3) {
        this(dataType, decodingFormat, decodingFormat2, iArr, iArr2, str, list, pattern, properties, toInternal(str2), new HashMap(), j, z, z2, schema, readableConfig, str3);
    }

    public static StartupMode toInternal(String str) {
        String lowerCase = str.toLowerCase();
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case -1109880953:
                if (lowerCase.equals(ArcticValidator.SCAN_STARTUP_MODE_LATEST)) {
                    z = false;
                    break;
                }
                break;
            case -809579181:
                if (lowerCase.equals(ArcticValidator.SCAN_STARTUP_MODE_EARLIEST)) {
                    z = true;
                    break;
                }
                break;
            case 55126294:
                if (lowerCase.equals(ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return StartupMode.LATEST;
            case true:
                return StartupMode.EARLIEST;
            case true:
                return StartupMode.TIMESTAMP;
            default:
                throw new ValidationException(String.format("%s only support '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE, ArcticValidator.SCAN_STARTUP_MODE_LATEST, ArcticValidator.SCAN_STARTUP_MODE_EARLIEST, lowerCase));
        }
    }

    LogDynamicSource(DataType dataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DecodingFormat<DeserializationSchema<RowData>> decodingFormat2, int[] iArr, int[] iArr2, @Nullable String str, @Nullable List<String> list, @Nullable Pattern pattern, Properties properties, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, long j, boolean z, boolean z2, Schema schema, ReadableConfig readableConfig, String str2) {
        super(dataType, decodingFormat, decodingFormat2, iArr, iArr2, str, list, pattern, properties, startupMode, map, j, z, str2);
        this.tablePrimaryKeyExisted = false;
        this.tablePrimaryKeyExisted = z2;
        this.schema = schema;
        this.tableOptions = readableConfig;
        this.consumerChangelogMode = (String) readableConfig.get(ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE);
        this.logRetractionEnable = CompatibleFlinkPropertyUtil.propertyAsBoolean(readableConfig, ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE);
    }

    @Override // com.netease.arctic.flink.table.KafkaDynamicSource
    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(DeserializationSchema<RowData> deserializationSchema, DeserializationSchema<RowData> deserializationSchema2, TypeInformation<RowData> typeInformation) {
        KafkaDeserializationSchemaWrapper kafkaDeserializationSchemaWrapper = new KafkaDeserializationSchemaWrapper(deserializationSchema2);
        Schema schema = this.schema;
        if (this.projectedFields != null) {
            List columns = this.schema.columns();
            IntStream stream = Arrays.stream(this.projectedFields);
            columns.getClass();
            schema = new Schema((List) stream.mapToObj(columns::get).collect(Collectors.toList()));
        }
        LogKafkaConsumer logKafkaConsumer = this.topics != null ? new LogKafkaConsumer(this.topics, (KafkaDeserializationSchemaWrapper<RowData>) kafkaDeserializationSchemaWrapper, this.properties, schema, this.tableOptions) : new LogKafkaConsumer(this.topicPattern, (KafkaDeserializationSchemaWrapper<RowData>) kafkaDeserializationSchemaWrapper, this.properties, schema, this.tableOptions);
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[this.startupMode.ordinal()]) {
            case 1:
                logKafkaConsumer.setStartFromEarliest();
                break;
            case 2:
                logKafkaConsumer.setStartFromLatest();
                break;
            case 3:
                logKafkaConsumer.setStartFromGroupOffsets();
                break;
            case 4:
                logKafkaConsumer.setStartFromSpecificOffsets(this.specificStartupOffsets);
                break;
            case 5:
                logKafkaConsumer.setStartFromTimestamp(this.startupTimestampMillis);
                break;
        }
        logKafkaConsumer.setCommitOffsetsOnCheckpoints(this.properties.getProperty("group.id") != null);
        if (this.watermarkStrategy != null) {
            logKafkaConsumer.assignTimestampsAndWatermarks(this.watermarkStrategy);
        }
        return logKafkaConsumer;
    }

    @Override // com.netease.arctic.flink.table.KafkaDynamicSource
    public ChangelogMode getChangelogMode() {
        String str = this.consumerChangelogMode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1932471681:
                if (str.equals(ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_APPEND_ONLY)) {
                    z = false;
                    break;
                }
                break;
            case -1527953357:
                if (str.equals(ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_ALL_KINDS)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (this.logRetractionEnable) {
                    throw new IllegalArgumentException(String.format("Only %s is false when %s is %s", ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE.key(), ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE.key(), ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_APPEND_ONLY));
                }
                return ChangelogMode.insertOnly();
            case true:
                return ALL_KINDS;
            default:
                throw new UnsupportedOperationException(String.format("As of now, %s can't support this option %s.", ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE.key(), this.consumerChangelogMode));
        }
    }

    @Override // com.netease.arctic.flink.table.KafkaDynamicSource
    public DynamicTableSource copy() {
        return new LogDynamicSource(this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topics, this.topicPattern, this.properties, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis, this.upsertMode, this.tablePrimaryKeyExisted, this.schema, this.tableOptions, this.sourceName);
    }

    @Override // com.netease.arctic.flink.table.KafkaDynamicSource
    public String asSummaryString() {
        return "arctic";
    }
}
