package com.netease.arctic.flink.write;

import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.write.hidden.HiddenLogWriter;
import com.netease.arctic.flink.write.hidden.LogMsgFactory;
import com.netease.arctic.log.LogData;
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;

/* loaded from: input_file:com/netease/arctic/flink/write/AutomaticLogWriter.class */
public class AutomaticLogWriter extends ArcticLogWriter {
    private final AutomaticDoubleWriteStatus status;
    private final ArcticLogWriter arcticLogWriter;

    public AutomaticLogWriter(Schema schema, Properties properties, String str, LogMsgFactory<RowData> logMsgFactory, LogData.FieldGetterFactory<RowData> fieldGetterFactory, byte[] bArr, ShuffleHelper shuffleHelper, ArcticTableLoader arcticTableLoader, Duration duration) {
        this.arcticLogWriter = new HiddenLogWriter(schema, properties, str, logMsgFactory, fieldGetterFactory, bArr, shuffleHelper);
        this.status = new AutomaticDoubleWriteStatus(arcticTableLoader, duration);
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<RowData>> output) {
        super.setup(streamTask, streamConfig, output);
        this.arcticLogWriter.setup(streamTask, streamConfig, output);
        this.status.setup(getRuntimeContext().getIndexOfThisSubtask());
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.arcticLogWriter.initializeState(stateInitializationContext);
    }

    public void open() throws Exception {
        super.open();
        this.arcticLogWriter.open();
        this.status.open();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.processElement(streamRecord);
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        this.status.processWatermark(watermark);
        super.processWatermark(watermark);
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.prepareSnapshotPreBarrier(j);
        } else {
            this.status.sync();
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.snapshotState(stateSnapshotContext);
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.notifyCheckpointComplete(j);
        }
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.notifyCheckpointAborted(j);
        }
    }

    public void close() throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.close();
        }
    }

    public void endInput() throws Exception {
        if (this.status.isDoubleWrite()) {
            this.arcticLogWriter.endInput();
        }
    }
}
