package com.netease.arctic.flink.write;

import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.table.ArcticTable;
import java.io.Serializable;
import java.time.Duration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.iceberg.UpdateProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/AutomaticDoubleWriteStatus.class */
/**
 * Holds the "should double write" flag for one subtask and keeps it in step with the
 * {@code ArcticValidator.LOG_STORE_CATCH_UP} property of an Arctic table.
 *
 * <p>The flag is read from the table's properties by {@link #sync()} and is switched on by
 * {@link #processWatermark(Watermark)} once the {@link AutomaticWriteSpecification} accepts a
 * watermark timestamp; in that case the catch-up properties are also committed to the table.
 *
 * <p>{@link #open()} must be called before {@link #sync()} or
 * {@link #processWatermark(Watermark)}, because both use the table that {@code open()} loads.
 */
public class AutomaticDoubleWriteStatus implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AutomaticDoubleWriteStatus.class);

    private final ArcticTableLoader tableLoader;
    private final AutomaticWriteSpecification specification;
    /** Loaded in {@link #open()}; {@code null} until then. */
    private ArcticTable table;
    /** Transient, so it is {@code false} after deserialization until {@link #sync()} runs. */
    private transient boolean shouldDoubleWrite;
    private int subtaskId;

    /**
     * Creates the status holder.
     *
     * @param arcticTableLoader loader later passed to {@code ArcticUtils.loadArcticTable}
     * @param duration value handed to the {@link AutomaticWriteSpecification} constructor
     */
    public AutomaticDoubleWriteStatus(ArcticTableLoader arcticTableLoader, Duration duration) {
        this.specification = new AutomaticWriteSpecification(duration);
        this.tableLoader = arcticTableLoader;
    }

    /**
     * Records the subtask index; it is only used in log messages.
     *
     * @param i index of the subtask owning this instance
     */
    public void setup(int i) {
        this.subtaskId = i;
    }

    /** Loads the Arctic table through the table loader and then performs an initial {@link #sync()}. */
    public void open() {
        this.table = ArcticUtils.loadArcticTable(this.tableLoader);
        sync();
    }

    /**
     * Returns the current value of the double-write flag.
     *
     * @return {@code true} once double write has been enabled locally or read from the table
     */
    public boolean isDoubleWrite() {
        return this.shouldDoubleWrite;
    }

    /**
     * Evaluates a watermark and, if the specification accepts its timestamp, turns double write on
     * and commits the catch-up properties to the table. Does nothing when double write is already on.
     *
     * @param watermark the watermark whose timestamp is checked against the specification
     */
    public void processWatermark(Watermark watermark) {
        if (isDoubleWrite()) {
            // Already enabled: the specification is not consulted again.
            return;
        }
        if (!this.specification.shouldDoubleWrite(watermark.getTimestamp())) {
            return;
        }

        // The local flag is raised before the table properties are committed.
        this.shouldDoubleWrite = true;
        LOG.info("processWatermark {}, subTaskId is {}, should double write is true.", watermark, this.subtaskId);
        commitCatchUpProperties();
    }

    /** Refreshes the table and re-reads the double-write flag from its properties. */
    public void sync() {
        this.table.refresh();
        this.shouldDoubleWrite = readCatchUpProperty();
        LOG.info("AutomaticDoubleWriteStatus sync, subTaskId: {}, should double write: {}", this.subtaskId, this.shouldDoubleWrite);
    }

    /** Writes the catch-up flag and the current wall-clock timestamp into the table's properties. */
    private void commitCatchUpProperties() {
        String catchUpKey = ArcticValidator.LOG_STORE_CATCH_UP.key();
        LOG.info("begin update arctic table, set {} to true", catchUpKey);

        UpdateProperties pendingUpdate = this.table.updateProperties();
        pendingUpdate.set(catchUpKey, Boolean.toString(true));
        pendingUpdate.set(ArcticValidator.LOG_STORE_CATCH_UP_TIMESTAMP.key(), Long.toString(System.currentTimeMillis()));
        pendingUpdate.commit();

        LOG.info("end update arctic table.");
    }

    /** Parses the catch-up property, falling back to the option's default value when it is not set. */
    private boolean readCatchUpProperty() {
        String rawValue = this.table.properties().getOrDefault(
                ArcticValidator.LOG_STORE_CATCH_UP.key(),
                String.valueOf(ArcticValidator.LOG_STORE_CATCH_UP.defaultValue()));
        return Boolean.parseBoolean(rawValue);
    }
}
