package com.netease.arctic.flink.read.hybrid.reader;

import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ChangelogSplit;
import com.netease.arctic.flink.read.hybrid.split.SnapshotSplit;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.io.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/reader/HybridSplitReader.class */
public class HybridSplitReader<T> implements SplitReader<ArcticRecordWithOffset<T>, ArcticSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSplitReader.class);
    private final ReaderFunction<T> openSplitFunction;
    private final int indexOfSubtask;
    private final Queue<ArcticSplit> splits = new ArrayDeque();
    private CloseableIterator<RecordsWithSplitIds<ArcticRecordWithOffset<T>>> currentReader;
    private String currentSplitId;

    public HybridSplitReader(ReaderFunction<T> readerFunction, SourceReaderContext sourceReaderContext) {
        this.openSplitFunction = readerFunction;
        this.indexOfSubtask = sourceReaderContext.getIndexOfSubtask();
    }

    public RecordsWithSplitIds<ArcticRecordWithOffset<T>> fetch() throws IOException {
        if (this.currentReader == null) {
            if (this.splits.isEmpty()) {
                return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
            }
            ArcticSplit poll = this.splits.poll();
            this.currentReader = this.openSplitFunction.apply(poll);
            this.currentSplitId = poll.splitId();
        }
        if (!this.currentReader.hasNext()) {
            return finishSplit();
        }
        try {
            return (RecordsWithSplitIds) this.currentReader.next();
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }

    public void handleSplitsChanges(SplitsChange<ArcticSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.info("Handling a split change {}.", splitsChange);
        splitsChange.splits().forEach(arcticSplit -> {
            if (!(arcticSplit instanceof SnapshotSplit) && !(arcticSplit instanceof ChangelogSplit)) {
                throw new IllegalArgumentException(String.format("As of now, The %s of SourceSplit type is unsupported, available source splits are %s, %s.", arcticSplit.getClass().getSimpleName(), SnapshotSplit.class.getSimpleName(), ChangelogSplit.class.getSimpleName()));
            }
            this.splits.add(arcticSplit);
        });
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.currentSplitId = null;
        if (this.currentReader != null) {
            this.currentReader.close();
        }
    }

    private RecordsWithSplitIds<ArcticRecordWithOffset<T>> finishSplit() throws IOException {
        if (this.currentReader != null) {
            this.currentReader.close();
            this.currentReader = null;
        }
        ArrayBatchRecords finishedSplit = ArrayBatchRecords.finishedSplit(this.currentSplitId);
        LOG.info("Split reader {} finished split: {}", Integer.valueOf(this.indexOfSubtask), this.currentSplitId);
        this.currentSplitId = null;
        return finishedSplit;
    }
}
