package com.netease.arctic.flink.table;

import com.netease.arctic.flink.FlinkSchemaUtil;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import com.netease.arctic.table.ArcticTable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkFilters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/table/ArcticFileSource.class */
public class ArcticFileSource implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, SupportsWatermarkPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(ArcticFileSource.class);
    private int[] projectedFields;
    private long limit;
    private List<Expression> filters;
    private ArcticTable table;

    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy;
    private final ArcticTableLoader loader;
    private final TableSchema tableSchema;
    private final ReadableConfig readableConfig;

    private ArcticFileSource(ArcticFileSource arcticFileSource) {
        this.loader = arcticFileSource.loader;
        this.tableSchema = arcticFileSource.tableSchema;
        this.projectedFields = arcticFileSource.projectedFields;
        this.limit = arcticFileSource.limit;
        this.filters = arcticFileSource.filters;
        this.readableConfig = arcticFileSource.readableConfig;
        this.table = arcticFileSource.table;
    }

    public ArcticFileSource(ArcticTableLoader arcticTableLoader, TableSchema tableSchema, int[] iArr, ArcticTable arcticTable, long j, List<Expression> list, ReadableConfig readableConfig) {
        this.loader = arcticTableLoader;
        this.tableSchema = tableSchema;
        this.projectedFields = iArr;
        this.limit = j;
        this.table = arcticTable;
        this.filters = list;
        this.readableConfig = readableConfig;
    }

    public ArcticFileSource(ArcticTableLoader arcticTableLoader, TableSchema tableSchema, ArcticTable arcticTable, ReadableConfig readableConfig) {
        this(arcticTableLoader, tableSchema, null, arcticTable, -1L, ImmutableList.of(), readableConfig);
    }

    public void applyProjection(int[][] iArr) {
        this.projectedFields = new int[iArr.length];
        for (int i = 0; i < iArr.length; i++) {
            Preconditions.checkArgument(iArr[i].length == 1, "Don't support nested projection now.");
            this.projectedFields[i] = iArr[i][0];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataStream<RowData> createDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        return FlinkSource.forRowData().env(streamExecutionEnvironment).tableLoader(this.loader).arcticTable(this.table).project(getProjectedSchema()).limit(this.limit).filters(this.filters).flinkConf(this.readableConfig).watermarkStrategy(this.watermarkStrategy).build();
    }

    private TableSchema getProjectedSchema() {
        if (this.projectedFields == null) {
            return this.tableSchema;
        }
        String[] fieldNames = this.tableSchema.getFieldNames();
        DataType[] fieldDataTypes = this.tableSchema.getFieldDataTypes();
        String[] strArr = (String[]) Arrays.stream(this.projectedFields).mapToObj(i -> {
            return fieldNames[i];
        }).toArray(i2 -> {
            return new String[i2];
        });
        TableSchema.Builder fields = TableSchema.builder().fields(strArr, (DataType[]) Arrays.stream(this.projectedFields).mapToObj(i3 -> {
            return fieldDataTypes[i3];
        }).toArray(i4 -> {
            return new DataType[i4];
        }));
        FlinkSchemaUtil.addPrimaryKey(fields, this.table, this.tableSchema, strArr);
        TableSchema build = fields.build();
        LOG.info("TableSchema builder after addPrimaryKey, schema:{}", build);
        return build;
    }

    public void applyLimit(long j) {
        this.limit = j;
    }

    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> list) {
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (ResolvedExpression resolvedExpression : list) {
            Optional convert = FlinkFilters.convert(resolvedExpression);
            if (convert.isPresent()) {
                newArrayList2.add(convert.get());
                newArrayList.add(resolvedExpression);
            }
        }
        this.filters = newArrayList2;
        return SupportsFilterPushDown.Result.of(newArrayList, list);
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public ChangelogMode getChangelogMode() {
        return this.table.isUnkeyedTable() ? ChangelogMode.insertOnly() : ChangelogMode.newBuilder().addContainedKind(RowKind.DELETE).addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.UPDATE_BEFORE).build();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProvider() { // from class: com.netease.arctic.flink.table.ArcticFileSource.1
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
                return ArcticFileSource.this.createDataStream(streamExecutionEnvironment);
            }

            public boolean isBounded() {
                return org.apache.iceberg.flink.source.FlinkSource.isBounded(ArcticFileSource.this.table.properties());
            }
        };
    }

    public DynamicTableSource copy() {
        return new ArcticFileSource(this);
    }

    public String asSummaryString() {
        return "Arctic File Source";
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        if (CompatibleFlinkPropertyUtil.propertyAsBoolean(Configuration.fromMap(this.table.properties()), ArcticValidator.DIM_TABLE_ENABLE)) {
            return;
        }
        this.watermarkStrategy = watermarkStrategy;
    }
}
