package com.netease.arctic.flink.write;

import com.netease.arctic.flink.metric.MetricsGenerator;
import com.netease.arctic.flink.shuffle.RoundRobinShuffleRulePolicy;
import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.shuffle.ShuffleKey;
import com.netease.arctic.flink.shuffle.ShuffleRulePolicy;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import com.netease.arctic.flink.util.IcebergClassUtil;
import com.netease.arctic.flink.util.ProxyUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.DistributionHashMode;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/FlinkSink.class */
public class FlinkSink {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
    public static final String FILES_COMMITTER_NAME = "FilesCommitter";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.netease.arctic.flink.write.FlinkSink$1, reason: invalid class name */
    /* loaded from: input_file:com/netease/arctic/flink/write/FlinkSink$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iceberg$DistributionMode = new int[DistributionMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$iceberg$DistributionMode[DistributionMode.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iceberg$DistributionMode[DistributionMode.HASH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iceberg$DistributionMode[DistributionMode.RANGE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:com/netease/arctic/flink/write/FlinkSink$Builder.class */
    public static class Builder {
        private DataStream<RowData> rowDataInput;
        private ArcticTable table;
        private ArcticTableLoader tableLoader;
        private TableSchema flinkSchema;
        private Properties producerConfig;
        private String topic;
        private boolean overwrite;
        private DistributionHashMode distributionMode;

        private Builder() {
            this.rowDataInput = null;
            this.overwrite = false;
            this.distributionMode = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Builder forRowData(DataStream<RowData> dataStream) {
            this.rowDataInput = dataStream;
            return this;
        }

        public Builder table(ArcticTable arcticTable) {
            this.table = arcticTable;
            return this;
        }

        public Builder flinkSchema(TableSchema tableSchema) {
            this.flinkSchema = tableSchema;
            return this;
        }

        public Builder producerConfig(Properties properties) {
            this.producerConfig = properties;
            return this;
        }

        public Builder topic(String str) {
            this.topic = str;
            return this;
        }

        public Builder tableLoader(ArcticTableLoader arcticTableLoader) {
            this.tableLoader = arcticTableLoader;
            return this;
        }

        public Builder overwrite(boolean z) {
            this.overwrite = z;
            return this;
        }

        public Builder distribute(DistributionHashMode distributionHashMode) {
            this.distributionMode = distributionHashMode;
            return this;
        }

        DataStreamSink<?> withEmit(DataStream<RowData> dataStream, ArcticLogWriter arcticLogWriter, ArcticFileWriter arcticFileWriter, OneInputStreamOperator<WriteResult, Void> oneInputStreamOperator, int i, MetricsGenerator metricsGenerator, String str) {
            SingleOutputStreamOperator parallelism = dataStream.transform(ArcticWriter.class.getName(), TypeExtractor.createTypeInfo(WriteResult.class), new ArcticWriter(arcticLogWriter, arcticFileWriter, metricsGenerator)).name(String.format("ArcticWriter %s(%s)", this.table.name(), str)).setParallelism(i);
            if (oneInputStreamOperator != null) {
                parallelism = parallelism.transform(FlinkSink.FILES_COMMITTER_NAME, Types.VOID, oneInputStreamOperator).setParallelism(1).setMaxParallelism(1);
            }
            return parallelism.addSink(new DiscardingSink()).name(String.format("ArcticSink %s", this.table.name())).setParallelism(1);
        }

        public DataStreamSink<?> build() {
            Preconditions.checkNotNull(this.tableLoader, "table loader can not be null");
            initTableIfNeeded();
            Configuration configuration = new Configuration();
            Map properties = this.table.properties();
            configuration.getClass();
            properties.forEach(configuration::setString);
            RowType logicalType = this.flinkSchema.toRowDataType().getLogicalType();
            Schema reassignIds = TypeUtil.reassignIds(FlinkSchemaUtil.convert(this.flinkSchema), this.table.schema());
            int propertyAsInt = PropertyUtil.propertyAsInt(this.table.properties(), FactoryUtil.SINK_PARALLELISM.key(), this.rowDataInput.getParallelism());
            DistributionHashMode distributionHashMode = getDistributionHashMode();
            FlinkSink.LOG.info("take effect distribute mode: {}", distributionHashMode);
            ShuffleHelper build = ShuffleHelper.build(this.table, reassignIds, logicalType);
            ShuffleRulePolicy<RowData, ShuffleKey> buildShuffleRulePolicy = buildShuffleRulePolicy(build, propertyAsInt, distributionHashMode, this.overwrite, this.table);
            FlinkSink.LOG.info("shuffle policy config={}, actual={}", distributionHashMode, buildShuffleRulePolicy == null ? DistributionMode.NONE : distributionHashMode.getDesc());
            String str = (String) this.table.properties().getOrDefault(ArcticValidator.ARCTIC_EMIT_MODE.key(), ArcticValidator.ARCTIC_EMIT_MODE.defaultValue());
            boolean propertyAsBoolean = CompatibleFlinkPropertyUtil.propertyAsBoolean(this.table.properties(), ArcticValidator.ARCTIC_LATENCY_METRIC_ENABLE, false);
            boolean propertyAsBoolean2 = CompatibleFlinkPropertyUtil.propertyAsBoolean(this.table.properties(), ArcticValidator.ARCTIC_THROUGHPUT_METRIC_ENABLE, false);
            Duration duration = (Duration) configuration.get(ArcticValidator.AUTO_EMIT_LOGSTORE_WATERMARK_GAP);
            ArcticFileWriter createFileWriter = FlinkSink.createFileWriter(this.table, buildShuffleRulePolicy, this.overwrite, logicalType, str, this.tableLoader);
            ArcticLogWriter buildArcticLogWriter = ArcticUtils.buildArcticLogWriter(this.table.properties(), this.producerConfig, this.topic, this.flinkSchema, str, build, this.tableLoader, duration);
            MetricsGenerator metricsGenerator = ArcticUtils.getMetricsGenerator(propertyAsBoolean, propertyAsBoolean2, this.table, logicalType, reassignIds);
            if (buildShuffleRulePolicy != null) {
                this.rowDataInput = this.rowDataInput.partitionCustom(buildShuffleRulePolicy.generatePartitioner(), buildShuffleRulePolicy.generateKeySelector());
            }
            return withEmit(this.rowDataInput, buildArcticLogWriter, createFileWriter, FlinkSink.createFileCommitter(this.table, this.tableLoader, this.overwrite, str), propertyAsInt, metricsGenerator, str);
        }

        private void initTableIfNeeded() {
            if (this.table == null) {
                this.table = ArcticUtils.loadArcticTable(this.tableLoader);
            }
        }

        private DistributionHashMode getDistributionHashMode() {
            if (this.distributionMode != null) {
                return this.distributionMode;
            }
            switch (AnonymousClass1.$SwitchMap$org$apache$iceberg$DistributionMode[DistributionMode.fromName(PropertyUtil.propertyAsString(this.table.properties(), "write.distribution-mode", "hash")).ordinal()]) {
                case 1:
                    return DistributionHashMode.NONE;
                case 2:
                    return DistributionHashMode.valueOfDesc(PropertyUtil.propertyAsString(this.table.properties(), "write.distribution.hash-mode", ArcticValidator.ARCTIC_EMIT_AUTO));
                case 3:
                    FlinkSink.LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now", "write.distribution-mode", DistributionMode.RANGE.modeName());
                    return DistributionHashMode.NONE;
                default:
                    return DistributionHashMode.AUTO;
            }
        }

        @Nullable
        public static ShuffleRulePolicy<RowData, ShuffleKey> buildShuffleRulePolicy(ShuffleHelper shuffleHelper, int i, DistributionHashMode distributionHashMode, boolean z, ArcticTable arcticTable) {
            if (distributionHashMode == DistributionHashMode.AUTO) {
                distributionHashMode = DistributionHashMode.autoSelect(shuffleHelper.isPrimaryKeyExist(), shuffleHelper.isPartitionKeyExist());
            }
            if (distributionHashMode == DistributionHashMode.NONE) {
                return null;
            }
            if (distributionHashMode.mustByPrimaryKey() && !shuffleHelper.isPrimaryKeyExist()) {
                throw new IllegalArgumentException("illegal shuffle policy " + distributionHashMode.getDesc() + " for table without primary key");
            }
            if (!distributionHashMode.mustByPartition() || shuffleHelper.isPartitionKeyExist()) {
                return new RoundRobinShuffleRulePolicy(shuffleHelper, i, ArcticUtils.isToBase(z) ? PropertyUtil.propertyAsInt(arcticTable.properties(), "base.file-index.hash-bucket", 4) : PropertyUtil.propertyAsInt(arcticTable.properties(), "change.file-index.hash-bucket", 4), distributionHashMode);
            }
            throw new IllegalArgumentException("illegal shuffle policy " + distributionHashMode.getDesc() + " for table without partition");
        }

        /* synthetic */ Builder(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    public static Builder forRowData(DataStream<RowData> dataStream) {
        return new Builder(null).forRowData(dataStream);
    }

    public static ArcticFileWriter createFileWriter(ArcticTable arcticTable, ShuffleRulePolicy shuffleRulePolicy, boolean z, RowType rowType, ArcticTableLoader arcticTableLoader) {
        return createFileWriter(arcticTable, shuffleRulePolicy, z, rowType, "file", arcticTableLoader);
    }

    public static ArcticFileWriter createFileWriter(ArcticTable arcticTable, ShuffleRulePolicy shuffleRulePolicy, boolean z, RowType rowType, String str, ArcticTableLoader arcticTableLoader) {
        if (!ArcticUtils.arcticFileWriterEnable(str)) {
            return null;
        }
        LOG.info("with maxOpenFilesSizeBytes = {}MB, close biggest/earliest file to avoid OOM", Long.valueOf(PropertyUtil.propertyAsLong(arcticTable.properties(), ArcticValidator.ARCTIC_WRITE_MAX_OPEN_FILE_SIZE, ArcticValidator.ARCTIC_WRITE_MAX_OPEN_FILE_SIZE_DEFAULT) >> 20));
        return new ArcticFileWriter(shuffleRulePolicy, createTaskWriterFactory(arcticTable, z, rowType), PropertyUtil.propertyAsInt(arcticTable.properties(), "change.file-index.hash-bucket", 4), arcticTableLoader, arcticTable.isKeyedTable() && PropertyUtil.propertyAsBoolean(arcticTable.properties(), "write.upsert.enabled", false), PropertyUtil.propertyAsBoolean(arcticTable.properties(), ArcticValidator.SUBMIT_EMPTY_SNAPSHOTS.key(), ((Boolean) ArcticValidator.SUBMIT_EMPTY_SNAPSHOTS.defaultValue()).booleanValue()));
    }

    private static TaskWriterFactory<RowData> createTaskWriterFactory(ArcticTable arcticTable, boolean z, RowType rowType) {
        return new ArcticRowDataTaskWriterFactory(arcticTable, rowType, z);
    }

    public static OneInputStreamOperator<WriteResult, Void> createFileCommitter(ArcticTable arcticTable, ArcticTableLoader arcticTableLoader, boolean z) {
        return createFileCommitter(arcticTable, arcticTableLoader, z, "file");
    }

    public static OneInputStreamOperator<WriteResult, Void> createFileCommitter(ArcticTable arcticTable, ArcticTableLoader arcticTableLoader, boolean z, String str) {
        if (!ArcticUtils.arcticFileWriterEnable(str)) {
            return null;
        }
        arcticTableLoader.switchLoadInternalTableForKeyedTable(ArcticUtils.isToBase(z));
        return (OneInputStreamOperator) ProxyUtil.getProxy(IcebergClassUtil.newIcebergFilesCommitter(arcticTableLoader, z, arcticTable.io()), arcticTable.io());
    }
}
