package com.netease.arctic.flink.write;

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.flink.shuffle.ShuffleKey;
import com.netease.arctic.flink.shuffle.ShuffleRulePolicy;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.table.ArcticTable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/ArcticFileWriter.class */
public class ArcticFileWriter extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<RowData, WriteResult>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ArcticFileWriter.class);
    private final ShuffleRulePolicy<RowData, ShuffleKey> shuffleRule;
    private final TaskWriterFactory<RowData> taskWriterFactory;
    private final int minFileSplitCount;
    private final ArcticTableLoader tableLoader;
    private final boolean upsert;
    private final boolean submitEmptySnapshot;
    private transient TaskWriter<RowData> writer;
    private transient int subTaskId;
    private transient int attemptId;
    private transient String jobId;
    private transient long checkpointId = 1;
    private transient ListState<Long> checkpointState;
    private transient ArcticTable table;

    public ArcticFileWriter(ShuffleRulePolicy<RowData, ShuffleKey> shuffleRulePolicy, TaskWriterFactory<RowData> taskWriterFactory, int i, ArcticTableLoader arcticTableLoader, boolean z, boolean z2) {
        this.shuffleRule = shuffleRulePolicy;
        this.taskWriterFactory = taskWriterFactory;
        this.minFileSplitCount = i;
        this.tableLoader = arcticTableLoader;
        this.upsert = z;
        this.submitEmptySnapshot = z2;
        LOG.info("ArcticFileWriter is created with minFileSplitCount: {}, upsert: {}, submitEmptySnapshot: {}", new Object[]{Integer.valueOf(i), Boolean.valueOf(z), Boolean.valueOf(z2)});
    }

    public void open() {
        this.attemptId = getRuntimeContext().getAttemptNumber();
        this.jobId = getContainingTask().getEnvironment().getJobID().toString();
        this.table = ArcticUtils.loadArcticTable(this.tableLoader);
        initTaskWriterFactory(Long.valueOf(getMask(this.subTaskId)));
        ArcticFileIO io = this.table.io();
        TaskWriterFactory<RowData> taskWriterFactory = this.taskWriterFactory;
        taskWriterFactory.getClass();
        this.writer = (TaskWriter) io.doAs(taskWriterFactory::create);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        this.checkpointState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(this.subTaskId + "-task-file-writer-state", LongSerializer.INSTANCE));
        if (stateInitializationContext.isRestored()) {
            this.checkpointId = ((Long) ((Iterable) this.checkpointState.get()).iterator().next()).longValue();
            this.checkpointId++;
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.checkpointState.clear();
        this.checkpointState.add(Long.valueOf(stateSnapshotContext.getCheckpointId()));
    }

    private void initTaskWriterFactory(Long l) {
        if (this.taskWriterFactory instanceof ArcticRowDataTaskWriterFactory) {
            if (l != null) {
                ((ArcticRowDataTaskWriterFactory) this.taskWriterFactory).setMask(l.longValue());
            }
            ((ArcticRowDataTaskWriterFactory) this.taskWriterFactory).setTransactionId(getTransactionId());
        }
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
    }

    private Long getTransactionId() {
        Long l;
        if (this.table.isKeyedTable()) {
            String encode = BaseEncoding.base16().encode((this.jobId + this.checkpointId).getBytes());
            l = Long.valueOf(this.table.asKeyedTable().beginTransaction(encode));
            LOG.info("table:{}, signature:{}, transactionId:{}. From jobId:{}, ckpId:{}", new Object[]{this.table.name(), encode, l, this.jobId, Long.valueOf(this.checkpointId)});
        } else {
            l = null;
        }
        return l;
    }

    @VisibleForTesting
    public long getCheckpointId() {
        return this.checkpointId;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.util.Set] */
    /* JADX WARN: Type inference failed for: r0v22, types: [java.util.Set] */
    private long getMask(int i) {
        HashSet newHashSet;
        if (this.shuffleRule != null) {
            newHashSet = (Set) this.shuffleRule.getSubtaskTreeNodes().get(Integer.valueOf(i));
        } else if (this.table.isKeyedTable()) {
            newHashSet = (Set) IntStream.range(0, this.minFileSplitCount).mapToObj(i2 -> {
                return DataTreeNode.of(this.minFileSplitCount - 1, i2);
            }).collect(Collectors.toSet());
        } else {
            newHashSet = Sets.newHashSet();
            newHashSet.add(DataTreeNode.of(0L, 0L));
        }
        return ((DataTreeNode) newHashSet.iterator().next()).mask();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        this.checkpointId = j + 1;
        this.table.io().doAs(() -> {
            completeAndEmitFiles();
            this.writer = null;
            return null;
        });
    }

    public void endInput() throws Exception {
        this.table.io().doAs(() -> {
            completeAndEmitFiles();
            return null;
        });
    }

    private void completeAndEmitFiles() throws IOException {
        if (this.writer != null) {
            emit(this.writer.complete());
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        this.table.io().doAs(() -> {
            if (this.writer == null) {
                initTaskWriterFactory(null);
                this.writer = this.taskWriterFactory.create();
            }
            if (this.upsert && RowKind.INSERT.equals(rowData.getRowKind())) {
                rowData.setRowKind(RowKind.DELETE);
                this.writer.write(rowData);
                rowData.setRowKind(RowKind.INSERT);
            }
            this.writer.write(rowData);
            return null;
        });
    }

    public void dispose() throws Exception {
        super.dispose();
        if (this.writer != null) {
            this.table.io().doAs(() -> {
                this.writer.close();
                return null;
            });
            this.writer = null;
        }
    }

    private void emit(WriteResult writeResult) {
        if (shouldEmit(writeResult)) {
            this.output.collect(new StreamRecord(writeResult));
        }
    }

    private boolean shouldEmit(WriteResult writeResult) {
        return this.submitEmptySnapshot || !(writeResult == null || (ArrayUtils.isEmpty(writeResult.dataFiles()) && ArrayUtils.isEmpty(writeResult.deleteFiles()) && ArrayUtils.isEmpty(writeResult.referencedDataFiles())));
    }

    @VisibleForTesting
    public TaskWriter<RowData> getWriter() {
        return this.writer;
    }
}
