package com.netease.arctic.flink.read.hybrid.reader;

import com.netease.arctic.flink.read.hybrid.enumerator.InitializationFinishedEvent;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState;
import com.netease.arctic.flink.read.hybrid.split.SplitRequestEvent;
import com.netease.arctic.flink.util.FlinkClassReflectionUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader.class */
public class ArcticSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<ArcticRecordWithOffset<T>, T, ArcticSplit, ArcticSplitState> {
    public static final Logger LOGGER = LoggerFactory.getLogger(ArcticSourceReader.class);
    public ReaderOutput<T> output;
    private volatile boolean maxWatermarkToBeEmitted;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader$ArcticReaderOutput.class */
    public static class ArcticReaderOutput<T> implements ReaderOutput<T> {
        private final ReaderOutput<T> internal;

        public ArcticReaderOutput(ReaderOutput<T> readerOutput) {
            Preconditions.checkArgument(readerOutput instanceof SourceOutputWithWatermarks, "readerOutput should be SourceOutputWithWatermarks, but was %s", new Object[]{readerOutput.getClass()});
            this.internal = readerOutput;
        }

        public void collect(T t) {
            this.internal.collect(t);
        }

        public void collect(T t, long j) {
            this.internal.collect(t, j);
        }

        public void emitWatermark(Watermark watermark) {
            this.internal.emitWatermark(watermark);
        }

        public void markIdle() {
            this.internal.markIdle();
        }

        public SourceOutput<T> createOutputForSplit(String str) {
            return this.internal.createOutputForSplit(str);
        }

        public void releaseOutputForSplit(String str) {
            FlinkClassReflectionUtil.emitPeriodWatermark(FlinkClassReflectionUtil.getSplitLocalOutput(this.internal));
            this.internal.releaseOutputForSplit(str);
        }
    }

    public ArcticSourceReader(ReaderFunction<T> readerFunction, Configuration configuration, SourceReaderContext sourceReaderContext, boolean z) {
        super(() -> {
            return new HybridSplitReader(readerFunction, sourceReaderContext);
        }, new ArcticRecordEmitter(z), configuration, sourceReaderContext);
        this.maxWatermarkToBeEmitted = false;
    }

    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            requestSplit(Collections.emptyList());
        }
        this.context.sendSourceEventToCoordinator(ReaderStartedEvent.INSTANCE);
    }

    protected void onSplitFinished(Map<String, ArcticSplitState> map) {
        requestSplit(Lists.newArrayList(map.keySet()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ArcticSplitState initializedState(ArcticSplit arcticSplit) {
        return new ArcticSplitState(arcticSplit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ArcticSplit toSplitType(String str, ArcticSplitState arcticSplitState) {
        return arcticSplitState.toSourceSplit();
    }

    private void requestSplit(Collection<String> collection) {
        this.context.sendSourceEventToCoordinator(new SplitRequestEvent(collection));
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof InitializationFinishedEvent) {
            LOGGER.info("receive InitializationFinishedEvent");
            this.maxWatermarkToBeEmitted = true;
            emitWatermarkIfNeeded();
        }
    }

    private void emitWatermarkIfNeeded() {
        if (this.output == null || !this.maxWatermarkToBeEmitted) {
            return;
        }
        LOGGER.info("emit watermark");
        this.output.emitWatermark(new Watermark(Long.MAX_VALUE));
        this.maxWatermarkToBeEmitted = false;
    }

    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        this.output = readerOutput;
        emitWatermarkIfNeeded();
        return super.pollNext(wrapOutput(readerOutput));
    }

    public ReaderOutput<T> wrapOutput(ReaderOutput<T> readerOutput) {
        return !(readerOutput instanceof SourceOutputWithWatermarks) ? readerOutput : new ArcticReaderOutput(readerOutput);
    }
}
