package org.apache.flink.streaming.connectors.kafka.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.class */
public class KafkaDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
    protected DataType producedDataType;
    protected List<String> metadataKeys = Collections.emptyList();

    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy = null;
    private static final String VALUE_METADATA_PREFIX = "value.";
    protected final DataType physicalDataType;

    @Nullable
    protected final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
    protected final int[] keyProjection;
    protected final int[] valueProjection;

    @Nullable
    protected final String keyPrefix;
    protected final List<String> topics;
    protected final Pattern topicPattern;
    protected final Properties properties;
    protected final StartupMode startupMode;
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
    protected final long startupTimestampMillis;
    protected final boolean upsertMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode = new int[StartupMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.GROUP_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.SPECIFIC_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.TIMESTAMP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource$ReadableMetadata.class */
    public enum ReadableMetadata {
        TOPIC(KafkaConsumerMetricConstants.OFFSETS_BY_TOPIC_METRICS_GROUP, DataTypes.STRING().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.1
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return StringData.fromString(consumerRecord.topic());
            }
        }),
        PARTITION("partition", DataTypes.INT().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.2
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return Integer.valueOf(consumerRecord.partition());
            }
        }),
        HEADERS("headers", DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.3
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                HashMap hashMap = new HashMap();
                for (Header header : consumerRecord.headers()) {
                    hashMap.put(StringData.fromString(header.key()), header.value());
                }
                return new GenericMapData(hashMap);
            }
        }),
        LEADER_EPOCH("leader-epoch", DataTypes.INT().nullable(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.4
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return consumerRecord.leaderEpoch().orElse(null);
            }
        }),
        OFFSET(KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET, DataTypes.BIGINT().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.5
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return Long.valueOf(consumerRecord.offset());
            }
        }),
        TIMESTAMP("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.6
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return TimestampData.fromEpochMillis(consumerRecord.timestamp());
            }
        }),
        TIMESTAMP_TYPE("timestamp-type", DataTypes.STRING().notNull(), new DynamicKafkaDeserializationSchema.MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.ReadableMetadata.7
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter
            public Object read(ConsumerRecord<?, ?> consumerRecord) {
                return StringData.fromString(consumerRecord.timestampType().toString());
            }
        });

        final String key;
        final DataType dataType;
        final DynamicKafkaDeserializationSchema.MetadataConverter converter;

        ReadableMetadata(String str, DataType dataType, DynamicKafkaDeserializationSchema.MetadataConverter metadataConverter) {
            this.key = str;
            this.dataType = dataType;
            this.converter = metadataConverter;
        }
    }

    public KafkaDynamicSource(DataType dataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DecodingFormat<DeserializationSchema<RowData>> decodingFormat2, int[] iArr, int[] iArr2, @Nullable String str, @Nullable List<String> list, @Nullable Pattern pattern, Properties properties, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, long j, boolean z) {
        this.physicalDataType = (DataType) Preconditions.checkNotNull(dataType, "Physical data type must not be null.");
        this.keyDecodingFormat = decodingFormat;
        this.valueDecodingFormat = (DecodingFormat) Preconditions.checkNotNull(decodingFormat2, "Value decoding format must not be null.");
        this.keyProjection = (int[]) Preconditions.checkNotNull(iArr, "Key projection must not be null.");
        this.valueProjection = (int[]) Preconditions.checkNotNull(iArr2, "Value projection must not be null.");
        this.keyPrefix = str;
        this.producedDataType = dataType;
        Preconditions.checkArgument((list != null && pattern == null) || (list == null && pattern != null), "Either Topic or Topic Pattern must be set for source.");
        this.topics = list;
        this.topicPattern = pattern;
        this.properties = (Properties) Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.startupMode = (StartupMode) Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets = (Map) Preconditions.checkNotNull(map, "Specific offsets must not be null.");
        this.startupTimestampMillis = j;
        this.upsertMode = z;
    }

    public ChangelogMode getChangelogMode() {
        return this.valueDecodingFormat.getChangelogMode();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return SourceFunctionProvider.of(createKafkaConsumer(createDeserialization(scanContext, this.keyDecodingFormat, this.keyProjection, this.keyPrefix), createDeserialization(scanContext, this.valueDecodingFormat, this.valueProjection, null), scanContext.createTypeInformation(this.producedDataType)), false);
    }

    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        this.valueDecodingFormat.listReadableMetadata().forEach((str, dataType) -> {
        });
        Stream.of((Object[]) ReadableMetadata.values()).forEachOrdered(readableMetadata -> {
        });
        return linkedHashMap;
    }

    public void applyReadableMetadata(List<String> list, DataType dataType) {
        List list2 = (List) list.stream().filter(str -> {
            return str.startsWith(VALUE_METADATA_PREFIX);
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList(list);
        arrayList.removeAll(list2);
        if (this.valueDecodingFormat.listReadableMetadata().size() > 0) {
            this.valueDecodingFormat.applyReadableMetadata((List) list2.stream().map(str2 -> {
                return str2.substring(VALUE_METADATA_PREFIX.length());
            }).collect(Collectors.toList()));
        }
        this.metadataKeys = arrayList;
        this.producedDataType = dataType;
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    public DynamicTableSource copy() {
        KafkaDynamicSource kafkaDynamicSource = new KafkaDynamicSource(this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topics, this.topicPattern, this.properties, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis, this.upsertMode);
        kafkaDynamicSource.producedDataType = this.producedDataType;
        kafkaDynamicSource.metadataKeys = this.metadataKeys;
        kafkaDynamicSource.watermarkStrategy = this.watermarkStrategy;
        return kafkaDynamicSource;
    }

    public String asSummaryString() {
        return "Kafka table source";
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KafkaDynamicSource kafkaDynamicSource = (KafkaDynamicSource) obj;
        return Objects.equals(this.producedDataType, kafkaDynamicSource.producedDataType) && Objects.equals(this.metadataKeys, kafkaDynamicSource.metadataKeys) && Objects.equals(this.physicalDataType, kafkaDynamicSource.physicalDataType) && Objects.equals(this.keyDecodingFormat, kafkaDynamicSource.keyDecodingFormat) && Objects.equals(this.valueDecodingFormat, kafkaDynamicSource.valueDecodingFormat) && Arrays.equals(this.keyProjection, kafkaDynamicSource.keyProjection) && Arrays.equals(this.valueProjection, kafkaDynamicSource.valueProjection) && Objects.equals(this.keyPrefix, kafkaDynamicSource.keyPrefix) && Objects.equals(this.topics, kafkaDynamicSource.topics) && Objects.equals(String.valueOf(this.topicPattern), String.valueOf(kafkaDynamicSource.topicPattern)) && Objects.equals(this.properties, kafkaDynamicSource.properties) && this.startupMode == kafkaDynamicSource.startupMode && Objects.equals(this.specificStartupOffsets, kafkaDynamicSource.specificStartupOffsets) && this.startupTimestampMillis == kafkaDynamicSource.startupTimestampMillis && Objects.equals(Boolean.valueOf(this.upsertMode), Boolean.valueOf(kafkaDynamicSource.upsertMode)) && Objects.equals(this.watermarkStrategy, kafkaDynamicSource.watermarkStrategy);
    }

    public int hashCode() {
        return Objects.hash(this.producedDataType, this.metadataKeys, this.physicalDataType, this.keyDecodingFormat, this.valueDecodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topics, this.topicPattern, this.properties, this.startupMode, this.specificStartupOffsets, Long.valueOf(this.startupTimestampMillis), Boolean.valueOf(this.upsertMode), this.watermarkStrategy);
    }

    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(DeserializationSchema<RowData> deserializationSchema, DeserializationSchema<RowData> deserializationSchema2, TypeInformation<RowData> typeInformation) {
        DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverterArr = (DynamicKafkaDeserializationSchema.MetadataConverter[]) this.metadataKeys.stream().map(str -> {
            return (ReadableMetadata) Stream.of((Object[]) ReadableMetadata.values()).filter(readableMetadata -> {
                return readableMetadata.key.equals(str);
            }).findFirst().orElseThrow(IllegalStateException::new);
        }).map(readableMetadata -> {
            return readableMetadata.converter;
        }).toArray(i -> {
            return new DynamicKafkaDeserializationSchema.MetadataConverter[i];
        });
        boolean z = this.metadataKeys.size() > 0;
        int size = this.producedDataType.getChildren().size() - this.metadataKeys.size();
        DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema = new DynamicKafkaDeserializationSchema(size, deserializationSchema, this.keyProjection, deserializationSchema2, IntStream.concat(IntStream.of(this.valueProjection), IntStream.range(this.keyProjection.length + this.valueProjection.length, size)).toArray(), z, metadataConverterArr, typeInformation, this.upsertMode);
        FlinkKafkaConsumer<RowData> flinkKafkaConsumer = this.topics != null ? new FlinkKafkaConsumer<>(this.topics, dynamicKafkaDeserializationSchema, this.properties) : new FlinkKafkaConsumer<>(this.topicPattern, dynamicKafkaDeserializationSchema, this.properties);
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[this.startupMode.ordinal()]) {
            case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP /* 1 */:
                flinkKafkaConsumer.setStartFromEarliest();
                break;
            case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK /* 2 */:
                flinkKafkaConsumer.setStartFromLatest();
                break;
            case 3:
                flinkKafkaConsumer.setStartFromGroupOffsets();
                break;
            case 4:
                flinkKafkaConsumer.setStartFromSpecificOffsets(this.specificStartupOffsets);
                break;
            case 5:
                flinkKafkaConsumer.setStartFromTimestamp(this.startupTimestampMillis);
                break;
        }
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(this.properties.getProperty("group.id") != null);
        if (this.watermarkStrategy != null) {
            flinkKafkaConsumer.assignTimestampsAndWatermarks(this.watermarkStrategy);
        }
        return flinkKafkaConsumer;
    }

    @Nullable
    private DeserializationSchema<RowData> createDeserialization(DynamicTableSource.Context context, @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat, int[] iArr, @Nullable String str) {
        if (decodingFormat == null) {
            return null;
        }
        DataType projectRow = DataTypeUtils.projectRow(this.physicalDataType, iArr);
        if (str != null) {
            projectRow = DataTypeUtils.stripRowPrefix(projectRow, str);
        }
        return (DeserializationSchema) decodingFormat.createRuntimeDecoder(context, projectRow);
    }
}
