package com.netease.arctic.flink.read.hybrid.reader;

import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ChangelogSplit;
import com.netease.arctic.flink.read.source.ChangeLogDataIterator;
import com.netease.arctic.flink.read.source.DataIterator;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.iceberg.io.CloseableIterator;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/reader/DataIteratorReaderFunction.class */
public abstract class DataIteratorReaderFunction<T> implements ReaderFunction<T> {
    private final DataIteratorBatcher<T> batcher;

    public DataIteratorReaderFunction(DataIteratorBatcher<T> dataIteratorBatcher) {
        this.batcher = dataIteratorBatcher;
    }

    protected abstract DataIterator<T> createDataIterator(ArcticSplit arcticSplit);

    @Override // java.util.function.Function
    public CloseableIterator<RecordsWithSplitIds<ArcticRecordWithOffset<T>>> apply(ArcticSplit arcticSplit) {
        DataIterator<T> createDataIterator = createDataIterator(arcticSplit);
        if (createDataIterator instanceof ChangeLogDataIterator) {
            ChangeLogDataIterator changeLogDataIterator = (ChangeLogDataIterator) createDataIterator;
            ChangelogSplit asChangelogSplit = arcticSplit.asChangelogSplit();
            changeLogDataIterator.seek(asChangelogSplit.insertFileOffset(), asChangelogSplit.deleteFileOffset(), asChangelogSplit.insertRecordOffset(), asChangelogSplit.deleteRecordOffset());
        } else {
            createDataIterator.seek(arcticSplit.asSnapshotSplit().insertFileOffset(), arcticSplit.asSnapshotSplit().insertRecordOffset());
        }
        return this.batcher.batch(arcticSplit.splitId(), createDataIterator);
    }
}
