package com.netease.arctic.flink.table;

import com.netease.arctic.flink.shuffle.ReadShuffleRulePolicy;
import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.DistributionHashMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link ScanTableSource} that wraps another scan source (file or log) of an Arctic table and
 * re-distributes the records that source produces according to the configured read distribution.
 *
 * <p>Filter, projection, limit and watermark push-downs are forwarded to the wrapped source when it
 * implements the corresponding ability interface; otherwise they are ignored (filters are reported
 * back to the planner as "remaining").
 */
public class ArcticDynamicSource implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, SupportsWatermarkPushDown {
    public static final Logger LOG = LoggerFactory.getLogger(ArcticDynamicSource.class);

    /** Property selecting the coarse read distribution mode; parsed with {@link DistributionMode#fromName}. */
    private static final String READ_DISTRIBUTION_MODE = "read.distribution-mode";
    /** Value used when {@link #READ_DISTRIBUTION_MODE} is not configured. */
    private static final String READ_DISTRIBUTION_MODE_DEFAULT = "hash";
    /** Property selecting which keys are hashed when the distribution mode is hash. */
    private static final String READ_DISTRIBUTION_HASH_MODE = "read.distribution.hash-mode";

    protected final String tableName;
    /** The wrapped source that actually produces the data stream. */
    private final ScanTableSource arcticDynamicSource;
    private final ArcticTable arcticTable;
    private final Map<String, String> properties;
    /** Flink row type matching {@link #readSchema}; replaced when a projection is applied. */
    private RowType flinkSchemaRowType;
    /** Iceberg schema of the columns being read; replaced when a projection is applied. */
    private Schema readSchema;

    /** Never assigned by this class; carried over by {@link #copy()} in case a subclass sets it. */
    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy;

    /**
     * Creates a source with an explicit read schema and Flink row type.
     *
     * @param tableName name of the table
     * @param scanTableSource wrapped source that produces the records
     * @param arcticTable the Arctic table being read
     * @param readSchema Iceberg schema of the columns to read
     * @param flinkSchemaRowType Flink row type corresponding to {@code readSchema}
     * @param properties table/connector properties, consulted for the read distribution settings
     */
    public ArcticDynamicSource(String tableName, ScanTableSource scanTableSource, ArcticTable arcticTable, Schema readSchema, RowType flinkSchemaRowType, Map<String, String> properties) {
        this.tableName = tableName;
        this.arcticDynamicSource = scanTableSource;
        this.arcticTable = arcticTable;
        this.properties = properties;
        this.readSchema = readSchema;
        this.flinkSchemaRowType = flinkSchemaRowType;
    }

    /**
     * Creates a source whose read schema is derived from a Flink {@link TableSchema}.
     *
     * @param tableName name of the table
     * @param scanTableSource wrapped source that produces the records
     * @param arcticTable the Arctic table being read
     * @param tableSchema Flink schema to read; when {@code null} the full table schema is used
     * @param properties table/connector properties, consulted for the read distribution settings
     */
    public ArcticDynamicSource(String tableName, ScanTableSource scanTableSource, ArcticTable arcticTable, TableSchema tableSchema, Map<String, String> properties) {
        this.tableName = tableName;
        this.arcticDynamicSource = scanTableSource;
        this.arcticTable = arcticTable;
        this.properties = properties;
        if (tableSchema == null) {
            this.readSchema = arcticTable.schema();
            this.flinkSchemaRowType = FlinkSchemaUtil.convert(this.readSchema);
        } else {
            // The Iceberg schema is built from the Flink schema passed through filterWatermark, and its
            // field ids are re-assigned from the table schema.
            this.readSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark(tableSchema)), arcticTable.schema());
            // The Flink row type is taken from the unfiltered tableSchema.
            this.flinkSchemaRowType = (RowType) tableSchema.toRowDataType().getLogicalType();
        }
    }

    /** Returns the changelog mode of the wrapped source. */
    @Override
    public ChangelogMode getChangelogMode() {
        return this.arcticDynamicSource.getChangelogMode();
    }

    /**
     * Wraps the provider of the underlying source so that the produced stream is re-distributed
     * according to the configured distribution hash mode.
     *
     * @throws IllegalArgumentException if the wrapped source does not return a
     *     {@link DataStreamScanProvider}
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        ScanTableSource.ScanRuntimeProvider origin = this.arcticDynamicSource.getScanRuntimeProvider(scanContext);
        // Guard the getClass() call so a null provider is reported by the precondition, not as an NPE.
        Preconditions.checkArgument(
            origin instanceof DataStreamScanProvider,
            "file or log ScanRuntimeProvider should be DataStreamScanProvider, but provided is %s",
            origin == null ? null : origin.getClass());
        final DataStreamScanProvider delegate = (DataStreamScanProvider) origin;
        final DistributionHashMode hashMode = getDistributionHashMode();
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                DataStream<RowData> distributed = distribute(delegate.produceDataStream(execEnv), hashMode);
                // NOTE(review): the Hadoop UGI is reset after the stream is built; the reason is not
                // visible in this file - confirm before changing.
                UserGroupInformation.reset();
                LOG.info("ugi reset");
                return distributed;
            }

            @Override
            public boolean isBounded() {
                // Always reported as unbounded, regardless of what the wrapped provider says.
                return false;
            }
        };
    }

    /**
     * Resolves the distribution hash mode from the source properties.
     *
     * <p>{@code none} maps to {@link DistributionHashMode#NONE}. {@code hash} uses the explicitly
     * configured hash mode, or, when none is configured, {@code NONE} for unkeyed tables and the
     * automatic mode for keyed tables. Any other distribution mode falls back to
     * {@link DistributionHashMode#AUTO}.
     */
    private DistributionHashMode getDistributionHashMode() {
        String modeName = PropertyUtil.propertyAsString(this.properties, READ_DISTRIBUTION_MODE, READ_DISTRIBUTION_MODE_DEFAULT);
        switch (DistributionMode.fromName(modeName)) {
            case NONE:
                return DistributionHashMode.NONE;
            case HASH: {
                String hashModeDesc = PropertyUtil.propertyAsString(this.properties, READ_DISTRIBUTION_HASH_MODE, null);
                if (hashModeDesc == null) {
                    if (this.arcticTable.isUnkeyedTable()) {
                        return DistributionHashMode.NONE;
                    }
                    // NOTE(review): presumably this constant equals the "auto" hash-mode description.
                    hashModeDesc = ArcticValidator.ARCTIC_EMIT_AUTO;
                }
                return DistributionHashMode.valueOfDesc(hashModeDesc);
            }
            default:
                return DistributionHashMode.AUTO;
        }
    }

    /**
     * Applies the given distribution hash mode to the stream.
     *
     * @param dataStream stream produced by the wrapped source
     * @param distributionHashMode requested mode; {@code AUTO} is resolved from the table layout
     * @return the original stream for {@code NONE}, otherwise a custom-partitioned stream
     * @throws IllegalArgumentException if the mode requires a primary key or partition key the
     *     table does not have
     * @throws RuntimeException if the resolved mode is not one of the supported modes
     */
    public DataStream<RowData> distribute(DataStream<RowData> dataStream, DistributionHashMode distributionHashMode) {
        ShuffleHelper helper = ShuffleHelper.build(this.arcticTable, this.readSchema, this.flinkSchemaRowType);
        DistributionHashMode effectiveMode = distributionHashMode;
        if (effectiveMode == DistributionHashMode.AUTO) {
            effectiveMode = DistributionHashMode.autoSelect(this.arcticTable.isKeyedTable(), helper.isPartitionKeyExist());
        }
        LOG.info("source distribute mode in effect. {}", effectiveMode);
        switch (effectiveMode) {
            case NONE:
                return dataStream;
            case PRIMARY_KEY:
                Preconditions.checkArgument(
                    this.arcticTable.isKeyedTable(),
                    "illegal shuffle policy %s for table without primary key",
                    effectiveMode.getDesc());
                return hash(dataStream, helper, DistributionHashMode.PRIMARY_KEY);
            case PARTITION_KEY:
                Preconditions.checkArgument(
                    !this.arcticTable.spec().isUnpartitioned(),
                    "illegal shuffle policy %s for table without partition key",
                    effectiveMode.getDesc());
                return hash(dataStream, helper, DistributionHashMode.PARTITION_KEY);
            case PRIMARY_PARTITION_KEY:
                Preconditions.checkArgument(
                    this.arcticTable.isKeyedTable() && !this.arcticTable.spec().isUnpartitioned(),
                    "illegal shuffle policy %s for table without primary key or partition key",
                    effectiveMode.getDesc());
                return hash(dataStream, helper, DistributionHashMode.PRIMARY_PARTITION_KEY);
            default:
                throw new RuntimeException("Unrecognized read.distribution.hash-mode: " + effectiveMode);
        }
    }

    /**
     * Custom-partitions the stream with the partitioner and key selector generated by a
     * {@link ReadShuffleRulePolicy} for the given mode.
     */
    public DataStream<RowData> hash(DataStream<RowData> dataStream, ShuffleHelper shuffleHelper, DistributionHashMode distributionHashMode) {
        ReadShuffleRulePolicy policy = new ReadShuffleRulePolicy(shuffleHelper, distributionHashMode);
        return dataStream.partitionCustom(policy.generatePartitioner(), policy.generateKeySelector());
    }

    /**
     * Returns a copy of this source.
     *
     * <p>The wrapped source is copied as well: it receives the push-downs applied to this source, so
     * sharing it would let changes made through the copy leak back into the original.
     */
    @Override
    public DynamicTableSource copy() {
        ScanTableSource delegateCopy = (ScanTableSource) this.arcticDynamicSource.copy();
        ArcticDynamicSource copied = new ArcticDynamicSource(this.tableName, delegateCopy, this.arcticTable, this.readSchema, this.flinkSchemaRowType, this.properties);
        copied.watermarkStrategy = this.watermarkStrategy;
        return copied;
    }

    @Override
    public String asSummaryString() {
        return "Arctic Dynamic Source";
    }

    /**
     * Forwards the filters to the wrapped source if it supports filter push-down; otherwise accepts
     * none of them and returns all as remaining.
     */
    @Override
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        if (this.arcticDynamicSource instanceof SupportsFilterPushDown) {
            return ((SupportsFilterPushDown) this.arcticDynamicSource).applyFilters(filters);
        }
        return SupportsFilterPushDown.Result.of(Collections.emptyList(), filters);
    }

    /** Returns the wrapped source's answer, or {@code false} if it has no projection push-down. */
    @Override
    public boolean supportsNestedProjection() {
        if (this.arcticDynamicSource instanceof SupportsProjectionPushDown) {
            return ((SupportsProjectionPushDown) this.arcticDynamicSource).supportsNestedProjection();
        }
        return false;
    }

    /**
     * Narrows the read schema to the projected top-level columns (passed through
     * {@code addPrimaryKey} together with the table), then forwards the projection to the wrapped
     * source if it supports projection push-down.
     *
     * @param projectedFields one single-element index path per projected column
     * @throws IllegalArgumentException if any path addresses a nested field
     */
    @Override
    public void applyProjection(int[][] projectedFields) {
        int[] topLevelIndexes = new int[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            Preconditions.checkArgument(projectedFields[i].length == 1, "Don't support nested projection now.");
            topLevelIndexes[i] = projectedFields[i][0];
        }
        this.readSchema = new Schema(com.netease.arctic.flink.FlinkSchemaUtil.addPrimaryKey(
            Arrays.stream(topLevelIndexes).mapToObj(this.readSchema.columns()::get).collect(Collectors.toList()),
            this.arcticTable));
        this.flinkSchemaRowType = FlinkSchemaUtil.convert(this.readSchema);
        if (this.arcticDynamicSource instanceof SupportsProjectionPushDown) {
            ((SupportsProjectionPushDown) this.arcticDynamicSource).applyProjection(projectedFields);
        }
    }

    /** Forwards the limit to the wrapped source if it supports limit push-down. */
    @Override
    public void applyLimit(long limit) {
        if (this.arcticDynamicSource instanceof SupportsLimitPushDown) {
            ((SupportsLimitPushDown) this.arcticDynamicSource).applyLimit(limit);
        }
    }

    /** Forwards the watermark strategy to the wrapped source if it supports watermark push-down. */
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        if (this.arcticDynamicSource instanceof SupportsWatermarkPushDown) {
            ((SupportsWatermarkPushDown) this.arcticDynamicSource).applyWatermark(watermarkStrategy);
        }
    }
}
