package com.netease.arctic.flink.write.hidden;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.flink.shuffle.LogRecordV1;
import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.write.ArcticLogWriter;
import com.netease.arctic.flink.write.hidden.GlobalFlipCommitter;
import com.netease.arctic.flink.write.hidden.LogMsgFactory;
import com.netease.arctic.log.FormatVersion;
import com.netease.arctic.log.LogData;
import com.netease.arctic.log.LogDataJsonSerialization;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/hidden/AbstractHiddenLogWriter.class */
/**
 * Base operator for writers that publish {@link RowData} records to a hidden log topic.
 *
 * <p>What this class does, as far as the code in this file shows:
 * <ul>
 *   <li>creates and owns a {@link LogMsgFactory.Producer} built from the supplied factory,
 *       producer configuration and topic;</li>
 *   <li>keeps three operator list states: the id of the last snapshotted checkpoint, the job
 *       identifier (stored as a UTF-8 string) and the operator parallelism;</li>
 *   <li>when the job is restored with an unchanged parallelism, builds a {@link LogRecordV1}
 *       "flip" record for the restored checkpoint id and hands it to a
 *       {@link GlobalFlipCommitter}; {@link #processElement(StreamRecord)} then blocks until the
 *       committer reports that this flip has been committed.</li>
 * </ul>
 *
 * <p>This base class never writes elements to the producer itself; presumably subclasses call
 * {@code super.processElement(...)} first and then send the record (confirm against subclasses).
 */
public abstract class AbstractHiddenLogWriter extends ArcticLogWriter {
    public static final Logger LOG = LoggerFactory.getLogger(AbstractHiddenLogWriter.class);
    private static final long serialVersionUID = 1L;

    /** Polling interval, in milliseconds, while waiting for the flip to be committed. */
    private static final long FLIP_WAIT_INTERVAL_MS = 100L;
    /** A "still waiting" message is logged once every this many polling rounds. */
    private static final int FLIP_WAIT_LOG_EVERY = 100;

    /** Index of this subtask, resolved in {@link #initializeState(StateInitializationContext)}. */
    private int subtaskId;
    /** Holds the id of the most recently snapshotted checkpoint (a single element). */
    private transient ListState<Long> checkpointedState;
    /** Holds the job identifier as a UTF-8 string (a single element). */
    private transient ListState<String> hiddenLogJobIdentifyState;
    /** Holds the parallelism the operator was running with (a single element). */
    private transient ListState<Integer> parallelismState;
    /** Checkpoint id read back from {@link #checkpointedState} on restore; {@code null} otherwise. */
    private transient Long ckpComplete;
    private final Schema schema;
    private final Properties producerConfig;
    private final String topic;
    private final ShuffleHelper helper;
    protected final LogMsgFactory<RowData> factory;
    protected LogMsgFactory.Producer<RowData> producer;
    private GlobalFlipCommitter flipCommitter;
    private final LogData.FieldGetterFactory<RowData> fieldGetterFactory;
    protected transient LogDataJsonSerialization<RowData> logDataJsonSerialization;
    protected byte[] jobIdentify;
    /** Flip record built on restore; {@code null} when no flip has to be sent. */
    protected transient LogData<RowData> logFlip;
    /** {@code true} only after a restore with unchanged parallelism. */
    private transient boolean shouldCheckFlipSent = false;
    /** Cached result of the flip commit / commit check. */
    private transient boolean flipSentSucceed = false;
    protected FormatVersion logVersion = FormatVersion.FORMAT_VERSION_V1;
    /**
     * Counter that starts at 1, is reset to the restored checkpoint id on restore, and is
     * incremented after the restore flip and after every snapshot.
     */
    protected long epicNo = 1;

    /**
     * Creates the writer. Only stores its arguments; all runtime resources are created in
     * {@link #initializeState(StateInitializationContext)}.
     *
     * @param schema             schema handed to the JSON serialization and the flip committer
     * @param properties         producer configuration, must not be {@code null}
     * @param str                topic name, must not be {@code null}
     * @param logMsgFactory      factory used to create the log producer
     * @param fieldGetterFactory field getter factory handed to the JSON serialization
     * @param bArr               job identifier bytes; replaced by the stored value on restore
     * @param shuffleHelper      helper passed through to the producer and the flip committer
     */
    public AbstractHiddenLogWriter(Schema schema, Properties properties, String str, LogMsgFactory<RowData> logMsgFactory, LogData.FieldGetterFactory<RowData> fieldGetterFactory, byte[] bArr, ShuffleHelper shuffleHelper) {
        this.schema = schema;
        this.producerConfig = Preconditions.checkNotNull(properties);
        this.topic = Preconditions.checkNotNull(str);
        this.factory = logMsgFactory;
        this.fieldGetterFactory = fieldGetterFactory;
        this.jobIdentify = bArr;
        this.helper = shuffleHelper;
    }

    /**
     * Registers the operator states, creates the flip committer and the producer, and, when the
     * job is restored with the same parallelism, commits a flip record for the restored
     * checkpoint id.
     *
     * @param stateInitializationContext Flink state initialization context
     * @throws Exception if state access, flip commit or producer creation fails
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.subtaskId = getRuntimeContext().getIndexOfThisSubtask();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final boolean restored = stateInitializationContext.isRestored();

        this.checkpointedState = stateInitializationContext.getOperatorStateStore().getListState(
            new ListStateDescriptor<>(this.subtaskId + "-task-writer-state", LongSerializer.INSTANCE));
        this.hiddenLogJobIdentifyState = stateInitializationContext.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("hidden-wal-writer-job-identify", StringSerializer.INSTANCE));
        this.parallelismState = stateInitializationContext.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("job-" + Arrays.toString(this.jobIdentify) + "-parallelism", IntSerializer.INSTANCE));

        this.flipCommitter = new GlobalFlipCommitter(
            getRuntimeContext().getGlobalAggregateManager(),
            new GlobalFlipCommitter.FlipCommitFunction(
                parallelism, this.schema, this.fieldGetterFactory, this.factory, this.producerConfig, this.topic, this.helper));

        if (restored && parallelismSame(parallelism)) {
            // Continue from the stored checkpoint id and job identifier, and announce a flip for it.
            this.ckpComplete = this.checkpointedState.get().iterator().next();
            this.jobIdentify = this.hiddenLogJobIdentifyState.get().iterator().next().getBytes(StandardCharsets.UTF_8);
            this.epicNo = this.ckpComplete;
            this.logFlip = new LogRecordV1(this.logVersion, this.jobIdentify, this.epicNo, true, ChangeAction.INSERT, new GenericRowData(0));
            this.shouldCheckFlipSent = true;
            this.flipSentSucceed = this.flipCommitter.commit(this.subtaskId, this.logFlip);
            this.epicNo++;
        } else {
            // Fresh start, or parallelism changed: store the current job identifier instead.
            this.hiddenLogJobIdentifyState.clear();
            this.hiddenLogJobIdentifyState.add(new String(this.jobIdentify, StandardCharsets.UTF_8));
        }

        this.logDataJsonSerialization = new LogDataJsonSerialization<>(
            Preconditions.checkNotNull(this.schema), Preconditions.checkNotNull(this.fieldGetterFactory));
        this.producer = this.factory.createProducer(this.producerConfig, this.topic, this.logDataJsonSerialization, this.helper);

        // Always record the current parallelism so the next restore can compare against it.
        this.parallelismState.clear();
        this.parallelismState.add(parallelism);
        LOG.info("initializeState subtaskId={}, restore={}, lastCkpComplete={}.", this.subtaskId, restored, this.ckpComplete);
    }

    /**
     * Checks whether the parallelism stored in state equals the current one.
     *
     * @param currentParallelism the parallelism the operator is running with now
     * @return {@code true} only if a stored parallelism exists and equals {@code currentParallelism}
     * @throws Exception if reading the state fails
     */
    private boolean parallelismSame(int currentParallelism) throws Exception {
        Iterable<Integer> stored = this.parallelismState == null ? null : this.parallelismState.get();
        if (stored == null || !stored.iterator().hasNext()) {
            LOG.info("Can't find out parallelism state, ignore sending flips.");
            return false;
        }
        int previousParallelism = stored.iterator().next();
        if (previousParallelism == currentParallelism) {
            return true;
        }
        LOG.warn("This job restored from state, but has changed parallelism, before:{}, now:{}, So ignore sending flips now.", previousParallelism, currentParallelism);
        return false;
    }

    /**
     * Runs the parent operator's open hook and then opens the producer created in
     * {@link #initializeState(StateInitializationContext)}.
     *
     * @throws Exception if the parent hook or the producer fails to open
     */
    public void open() throws Exception {
        // Keep the lifecycle chain intact, like the other overridden hooks in this class do.
        super.open();
        this.producer.open();
    }

    /**
     * Blocks until the restore flip (if any) is reported as committed. The element itself is not
     * touched here.
     *
     * @param streamRecord the incoming record (unused by this base implementation)
     * @throws Exception if the commit check fails or the waiting thread is interrupted
     */
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        int waitRounds = 0;
        while (shouldCheckFlip() && !alreadySentFlip()) {
            Thread.sleep(FLIP_WAIT_INTERVAL_MS);
            // Logs on the 1st, 101st, 201st ... round to avoid flooding the log.
            if (waitRounds++ % FLIP_WAIT_LOG_EVERY == 0) {
                LOG.info("Still waiting for sending flip, while the other subtasks have committed to Global State. this subtask is {}.", this.subtaskId);
            }
        }
    }

    /**
     * Returns whether the flip has been committed, asking the committer again only while the
     * cached answer is still {@code false}.
     */
    private boolean alreadySentFlip() throws IOException {
        if (!this.flipSentSucceed) {
            this.flipSentSucceed = this.flipCommitter.hasCommittedFlip(this.logFlip);
        }
        return this.flipSentSucceed;
    }

    /** Returns {@code true} if a flip was issued on restore and must be confirmed before processing. */
    private boolean shouldCheckFlip() {
        return this.shouldCheckFlipSent;
    }

    /**
     * Delegates to the parent operator and logs the checkpoint id.
     *
     * @param j id of the checkpoint that is about to be taken
     * @throws Exception if the parent implementation fails
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        LOG.info("prepareSnapshotPreBarrier subtaskId={}, checkpointId={}.", this.subtaskId, j);
    }

    /**
     * Flushes the producer, stores the checkpoint id as the only element of the checkpoint state
     * and advances {@link #epicNo}.
     *
     * @param stateSnapshotContext Flink snapshot context providing the checkpoint id
     * @throws Exception if the parent snapshot, the flush or the state update fails
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.producer.flush();
        final long checkpointId = stateSnapshotContext.getCheckpointId();
        LOG.info("snapshotState subtaskId={}, checkpointId={}.", this.subtaskId, checkpointId);
        this.checkpointedState.clear();
        this.checkpointedState.add(checkpointId);
        this.epicNo++;
    }

    /**
     * Closes the producer (if it was created) and then runs the parent operator's close hook.
     *
     * @throws Exception if closing the producer or the parent operator fails
     */
    public void close() throws Exception {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            // Run the parent's cleanup even when the producer fails to close.
            super.close();
        }
    }
}
