package com.netease.arctic.flink.table;

import com.netease.arctic.flink.interceptor.ProxyFactory;
import com.netease.arctic.flink.read.ArcticSource;
import com.netease.arctic.flink.read.hybrid.reader.RowDataReaderFunction;
import com.netease.arctic.flink.read.source.ArcticScanContext;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import com.netease.arctic.flink.util.IcebergClassUtil;
import com.netease.arctic.flink.util.ProxyUtil;
import com.netease.arctic.table.ArcticTable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/* loaded from: input_file:com/netease/arctic/flink/table/FlinkSource.class */
/**
 * Entry point for creating a Flink {@link DataStream} of {@link RowData} that reads an
 * {@link ArcticTable}.
 *
 * <p>Instances are never created; use {@link #forRowData()} to obtain a {@link Builder}.
 */
public class FlinkSource {

    /**
     * Fluent builder that collects the execution environment, the table (or a loader for it),
     * projection, filters, limit, properties and watermark strategy, and then creates the
     * source stream in {@link #build()}.
     */
    public static final class Builder {
        private StreamExecutionEnvironment env;
        private ArcticTable arcticTable;
        private ArcticTableLoader tableLoader;
        private TableSchema projectedSchema;
        private List<Expression> filters;
        private ReadableConfig flinkConf;
        private final Map<String, String> properties;
        private long limit;
        private WatermarkStrategy<RowData> watermarkStrategy;
        private final ArcticScanContext.Builder contextBuilder;

        private Builder() {
            this.flinkConf = new Configuration();
            this.properties = new HashMap<>();
            // -1 is the initial value; it is forwarded unchanged to the unkeyed source builder.
            this.limit = -1L;
            this.watermarkStrategy = WatermarkStrategy.noWatermarks();
            this.contextBuilder = ArcticScanContext.arcticBuilder();
        }

        /**
         * Sets the execution environment the source is registered with. Required.
         *
         * @param streamExecutionEnvironment the Flink execution environment
         * @return this builder
         */
        public Builder env(StreamExecutionEnvironment streamExecutionEnvironment) {
            this.env = streamExecutionEnvironment;
            return this;
        }

        /**
         * Sets the table to read and copies its properties into this builder's property map.
         * Entries already present under the same key are overwritten by the table's values.
         *
         * @param arcticTable the table to read; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code arcticTable} is {@code null}
         */
        public Builder arcticTable(ArcticTable arcticTable) {
            Preconditions.checkNotNull(arcticTable, "ArcticTable should not be null");
            this.arcticTable = arcticTable;
            this.properties.putAll(arcticTable.properties());
            return this;
        }

        /**
         * Sets the loader used to obtain the table when no table was set explicitly; the loader
         * is also handed to the created source.
         *
         * @param arcticTableLoader the table loader
         * @return this builder
         */
        public Builder tableLoader(ArcticTableLoader arcticTableLoader) {
            this.tableLoader = arcticTableLoader;
            return this;
        }

        /**
         * Sets the projected schema. When unset, the full table schema is read.
         *
         * @param tableSchema the projected Flink schema, may be {@code null}
         * @return this builder
         */
        public Builder project(TableSchema tableSchema) {
            this.projectedSchema = tableSchema;
            return this;
        }

        /**
         * Sets the row limit, both for the unkeyed source builder and for the scan context.
         *
         * @param j the limit value
         * @return this builder
         */
        public Builder limit(long j) {
            this.limit = j;
            this.contextBuilder.limit(j);
            return this;
        }

        /**
         * Sets the filter expressions, both for the unkeyed source builder and for the scan context.
         *
         * @param list the filter expressions
         * @return this builder
         */
        public Builder filters(List<Expression> list) {
            this.filters = list;
            this.contextBuilder.filters(list);
            return this;
        }

        /**
         * Replaces the Flink configuration handed to the readers.
         *
         * @param readableConfig the Flink configuration
         * @return this builder
         */
        public Builder flinkConf(ReadableConfig readableConfig) {
            this.flinkConf = readableConfig;
            return this;
        }

        /**
         * Adds all entries of {@code map} to this builder's properties, overwriting existing keys.
         * A {@code null} map is ignored, mirroring {@link #watermarkStrategy(WatermarkStrategy)}.
         *
         * @param map additional properties, may be {@code null}
         * @return this builder
         */
        public Builder properties(Map<String, String> map) {
            if (map != null) {
                this.properties.putAll(map);
            }
            return this;
        }

        /**
         * Sets the watermark strategy. A {@code null} argument is ignored, so the previously
         * configured strategy (initially {@link WatermarkStrategy#noWatermarks()}) is kept.
         *
         * @param watermarkStrategy the strategy to use, may be {@code null}
         * @return this builder
         */
        public Builder watermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
            if (watermarkStrategy != null) {
                this.watermarkStrategy = watermarkStrategy;
            }
            return this;
        }

        /**
         * Creates the source stream. Unkeyed tables are delegated to
         * {@link #buildUnkeyedTableSource()}; all other tables are read through an
         * {@link ArcticSource}.
         *
         * @return the stream of rows read from the table
         * @throws NullPointerException if the environment is missing, or if neither a table nor
         *     a table loader was provided
         */
        public DataStream<RowData> build() {
            prepare();
            if (this.arcticTable.isUnkeyedTable()) {
                return buildUnkeyedTableSource();
            }
            return buildKeyedTableSource();
        }

        /**
         * Validates the mandatory settings and loads the table when only a loader was given.
         * Safe to call more than once: the table is loaded only while it is still unset.
         */
        private void prepare() {
            Preconditions.checkNotNull(this.env, "StreamExecutionEnvironment should not be null");
            loadTableIfNeeded();
            Preconditions.checkNotNull(
                    this.arcticTable, "Either an ArcticTable or an ArcticTableLoader should be provided");
        }

        /** Builds the {@link ArcticSource}-based stream used for tables that are not unkeyed. */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private DataStream<RowData> buildKeyedTableSource() {
            if (this.projectedSchema == null) {
                this.contextBuilder.project(this.arcticTable.schema());
            } else {
                // The watermark column is filtered out before converting to the scan projection.
                this.contextBuilder.project(
                        FlinkSchemaUtil.convert(
                                this.arcticTable.schema(),
                                com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark(this.projectedSchema)));
            }
            this.contextBuilder.fromProperties(this.properties);
            ArcticScanContext scanContext = this.contextBuilder.build();

            RowDataReaderFunction readerFunction =
                    new RowDataReaderFunction(
                            this.flinkConf,
                            this.arcticTable.schema(),
                            scanContext.project(),
                            this.arcticTable.asKeyedTable().primaryKeySpec(),
                            scanContext.nameMapping(),
                            scanContext.caseSensitive(),
                            this.arcticTable.io());

            boolean dimTable =
                    CompatibleFlinkPropertyUtil.propertyAsBoolean(
                            this.properties,
                            ArcticValidator.DIM_TABLE_ENABLE.key(),
                            ((Boolean) ArcticValidator.DIM_TABLE_ENABLE.defaultValue()).booleanValue());

            // Produced row type: with a projection, the watermark column is kept only when the
            // dim-table property is enabled; without one, the scan projection is converted.
            ArcticSource source =
                    new ArcticSource(
                            this.tableLoader,
                            scanContext,
                            readerFunction,
                            InternalTypeInfo.of(
                                    this.projectedSchema != null
                                            ? dimTable
                                                    ? com.netease.arctic.flink.FlinkSchemaUtil.toRowType(this.projectedSchema)
                                                    : com.netease.arctic.flink.FlinkSchemaUtil.toRowType(
                                                            com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark(
                                                                    this.projectedSchema))
                                            : FlinkSchemaUtil.convert(scanContext.project())),
                            this.arcticTable.name(),
                            dimTable);
            return this.env.fromSource(source, this.watermarkStrategy, ArcticSource.class.getName());
        }

        /**
         * Loads the table through the loader when a loader is set and no table has been set yet,
         * then copies the table properties into this builder's property map.
         *
         * <p>NOTE(review): {@code putAll} lets table properties overwrite values previously
         * supplied through {@link #properties(Map)} under the same key; confirm this precedence
         * is intended.
         */
        private void loadTableIfNeeded() {
            if (this.tableLoader == null || this.arcticTable != null) {
                return;
            }
            this.arcticTable = ArcticUtils.loadArcticTable(this.tableLoader);
            this.properties.putAll(this.arcticTable.properties());
        }

        /**
         * Builds the stream through Iceberg's {@code FlinkSource} builder and re-wraps it with
         * {@link #wrapKrb(DataStream)}.
         *
         * <p>Runs the same validation and lazy table loading as {@link #build()}, so it can be
         * called directly.
         *
         * @return the stream of rows read from the table
         * @throws NullPointerException if the environment is missing, or if neither a table nor
         *     a table loader was provided
         */
        public DataStream<RowData> buildUnkeyedTableSource() {
            prepare();
            DataStream<RowData> icebergStream =
                    org.apache.iceberg.flink.source.FlinkSource.forRowData()
                            .env(this.env)
                            .project(this.projectedSchema)
                            .tableLoader(this.tableLoader)
                            .filters(this.filters)
                            .properties(this.properties)
                            .flinkConf(this.flinkConf)
                            .limit(this.limit)
                            .build();
            return wrapKrb(icebergStream);
        }

        /**
         * Re-creates the given Iceberg source stream so that its input format / source function
         * is replaced by a proxy obtained with the table's {@code io()}.
         *
         * <p>Presumably "Krb" refers to Kerberos-authenticated access; the proxy semantics live
         * in {@link ProxyUtil} and {@link IcebergClassUtil} and are not visible here.
         *
         * @param dataStream the stream produced by the Iceberg source builder
         * @return an equivalent stream whose source uses the proxied objects
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private DataStream<RowData> wrapKrb(DataStream<RowData> dataStream) {
            IcebergClassUtil.clean(this.env);

            // Test the transformation itself and cast per branch: LegacySourceTransformation and
            // OneInputTransformation are unrelated classes, so a variable typed as one can never
            // be checked against the other.
            if (!(dataStream.getTransformation() instanceof OneInputTransformation)) {
                LegacySourceTransformation legacySource =
                        (LegacySourceTransformation) dataStream.getTransformation();
                // NOTE(review): getFormat() is invoked on whatever getSourceFunction returns;
                // confirm its declared return type exposes getFormat().
                return this.env.createInput(
                                (InputFormat) ProxyUtil.getProxy(
                                        IcebergClassUtil.getSourceFunction(legacySource.getOperator()).getFormat(),
                                        this.arcticTable.io()),
                                legacySource.getOutputType())
                        .setParallelism(legacySource.getParallelism());
            }

            OneInputTransformation oneInput = (OneInputTransformation) dataStream.getTransformation();
            ProxyFactory<FlinkInputFormat> inputFormatProxyFactory =
                    IcebergClassUtil.getInputFormatProxyFactory(
                            oneInput.getOperatorFactory(), this.arcticTable.io(), this.arcticTable.schema());

            if (oneInput.getInputs().isEmpty()) {
                // No upstream source: read directly through the proxied input format.
                return this.env
                        .addSource(new UnkeyedInputFormatSourceFunction(inputFormatProxyFactory, oneInput.getOutputType()))
                        .setParallelism(oneInput.getParallelism());
            }

            // Upstream source followed by a reader operator: proxy both stages.
            LegacySourceTransformation upstreamSource = (LegacySourceTransformation) oneInput.getInputs().get(0);
            return this.env
                    .addSource(
                            (SourceFunction) ProxyUtil.getProxy(
                                    IcebergClassUtil.getSourceFunction(upstreamSource.getOperator()),
                                    this.arcticTable.io()),
                            upstreamSource.getName(),
                            upstreamSource.getOutputType())
                    .transform(
                            oneInput.getName(),
                            oneInput.getOutputType(),
                            new UnkeyedInputFormatOperatorFactory(inputFormatProxyFactory));
        }
    }

    private FlinkSource() {
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return a builder for a {@link RowData} source stream
     */
    public static Builder forRowData() {
        return new Builder();
    }
}
