package com.netease.arctic.flink.write;

import com.netease.arctic.flink.metric.MetricsGenerator;
import java.util.Objects;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:com/netease/arctic/flink/write/ArcticWriter.class */
public class ArcticWriter<OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<RowData, OUT>, BoundedOneInput {
    private transient Meter meterFlowRate;
    private transient Meter meterSpeed;
    private final AbstractStreamOperator fileWriter;
    private final ArcticLogWriter logWriter;
    private final MetricsGenerator metricsGenerator;
    private static final String INFLUXDB_TAG_NAME = "arctic_task_id";
    private static final Output<StreamRecord<RowData>> EMPTY_OUTPUT = new Output<StreamRecord<RowData>>() { // from class: com.netease.arctic.flink.write.ArcticWriter.1
        public void emitWatermark(Watermark watermark) {
        }

        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
        }

        public void collect(StreamRecord<RowData> streamRecord) {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }

        public void close() {
        }
    };

    public ArcticWriter(ArcticLogWriter arcticLogWriter, AbstractStreamOperator abstractStreamOperator, MetricsGenerator metricsGenerator) {
        this.logWriter = arcticLogWriter;
        this.fileWriter = abstractStreamOperator;
        this.metricsGenerator = metricsGenerator;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        if (this.logWriter != null) {
            this.logWriter.setup(streamTask, streamConfig, EMPTY_OUTPUT);
        }
        if (this.fileWriter != null) {
            this.fileWriter.setup(streamTask, streamConfig, output);
        }
    }

    public void open() throws Exception {
        ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String str = Objects.nonNull(globalJobParameters.toMap().get(INFLUXDB_TAG_NAME)) ? (String) globalJobParameters.toMap().get(INFLUXDB_TAG_NAME) : "null";
        if (this.metricsGenerator.enable()) {
            MetricGroup addGroup = getRuntimeContext().getMetricGroup().addGroup(INFLUXDB_TAG_NAME, str);
            MetricsGenerator metricsGenerator = this.metricsGenerator;
            metricsGenerator.getClass();
            addGroup.gauge("record-latency", metricsGenerator::getCurrentLatency);
            LOG.info("add metrics record-latency");
        }
        if (this.metricsGenerator.isMetricEnable()) {
            this.meterFlowRate = getRuntimeContext().getMetricGroup().addGroup(INFLUXDB_TAG_NAME, str).meter("record-meter", new MeterView(60));
            LOG.info("add metrics record-meter");
            this.meterSpeed = getRuntimeContext().getMetricGroup().addGroup(INFLUXDB_TAG_NAME, str).meter("record-count", new MeterView(60));
            LOG.info("add metrics record-count");
        }
        if (this.logWriter != null) {
            this.logWriter.open();
        }
        if (this.fileWriter != null) {
            this.fileWriter.open();
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (this.logWriter != null) {
            this.logWriter.initializeState(stateInitializationContext);
        }
        if (this.fileWriter != null) {
            this.fileWriter.initializeState(stateInitializationContext);
        }
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.logWriter != null) {
            this.logWriter.prepareSnapshotPreBarrier(j);
        }
        if (this.fileWriter != null) {
            this.fileWriter.prepareSnapshotPreBarrier(j);
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (this.logWriter != null) {
            this.logWriter.snapshotState(stateSnapshotContext);
        }
        if (this.fileWriter != null) {
            this.fileWriter.snapshotState(stateSnapshotContext);
        }
    }

    public void endInput() throws Exception {
        if (this.logWriter != null) {
            this.logWriter.endInput();
        }
        if (this.fileWriter instanceof BoundedOneInput) {
            this.fileWriter.endInput();
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        if (this.metricsGenerator.isMetricEnable()) {
            this.meterSpeed.markEvent();
        }
        if (this.logWriter != null) {
            this.logWriter.processElement(streamRecord);
        }
        if (this.fileWriter instanceof Input) {
            this.fileWriter.processElement(streamRecord);
        }
        this.metricsGenerator.recordLatency(streamRecord);
    }

    public void processWatermark(Watermark watermark) throws Exception {
        if (this.logWriter != null) {
            this.logWriter.processWatermark(watermark);
        }
        if (this.fileWriter instanceof Input) {
            this.fileWriter.processWatermark(watermark);
        }
        super.processWatermark(watermark);
    }

    public void dispose() throws Exception {
        super.dispose();
        if (this.logWriter != null) {
            this.logWriter.dispose();
        }
        if (this.fileWriter != null) {
            this.fileWriter.dispose();
        }
    }

    public void close() throws Exception {
        if (this.logWriter != null) {
            this.logWriter.close();
        }
        if (this.fileWriter != null) {
            this.fileWriter.close();
        }
    }
}
