package com.netease.arctic.flink.read;

import com.netease.arctic.flink.read.hybrid.assigner.ShuffleSplitAssigner;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumState;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumStateSerializer;
import com.netease.arctic.flink.read.hybrid.enumerator.ArcticSourceEnumerator;
import com.netease.arctic.flink.read.hybrid.enumerator.StaticArcticSourceEnumerator;
import com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader;
import com.netease.arctic.flink.read.hybrid.reader.ReaderFunction;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplitSerializer;
import com.netease.arctic.flink.read.source.ArcticScanContext;
import com.netease.arctic.flink.table.ArcticTableLoader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/ArcticSource.class */
/**
 * Flink {@link Source} implementation that reads an Arctic table.
 *
 * <p>The source is unbounded when {@link ArcticScanContext#isStreaming()} returns {@code true}
 * and bounded otherwise. Records are read by an {@link ArcticSourceReader}; splits are handed
 * out by an {@link ArcticSourceEnumerator} (streaming) or a
 * {@link StaticArcticSourceEnumerator} (non-streaming), both backed by a
 * {@link ShuffleSplitAssigner}.
 *
 * @param <T> type of the records produced by this source
 */
public class ArcticSource<T> implements Source<T, ArcticSplit, ArcticSourceEnumState>, ResultTypeQueryable<T> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ArcticSource.class);

    /** Scan settings; {@code isStreaming()} selects boundedness and the enumerator type. */
    private final ArcticScanContext scanContext;
    /** Function handed to every {@link ArcticSourceReader} created by this source. */
    private final ReaderFunction<T> readerFunction;
    /** Type information returned from {@link #getProducedType()}. */
    private final TypeInformation<T> typeInformation;
    /** Table loader forwarded to the split enumerators. */
    private final ArcticTableLoader loader;
    /** Table name; within this class it is only used in log output. */
    private final String tableName;
    /** Flag forwarded unchanged to the reader and to the streaming enumerator. */
    private final boolean dimTable;

    /**
     * Creates a source for the given table.
     *
     * @param loader          table loader forwarded to the split enumerators
     * @param scanContext     scan settings that decide between streaming and bounded reading
     * @param readerFunction  function passed to every created reader
     * @param typeInformation type information of the produced records
     * @param tableName       table name used in log messages
     * @param dimTable        flag forwarded to the reader and the streaming enumerator
     */
    public ArcticSource(
            ArcticTableLoader loader,
            ArcticScanContext scanContext,
            ReaderFunction<T> readerFunction,
            TypeInformation<T> typeInformation,
            String tableName,
            boolean dimTable) {
        this.loader = loader;
        this.scanContext = scanContext;
        this.readerFunction = readerFunction;
        this.typeInformation = typeInformation;
        this.tableName = tableName;
        this.dimTable = dimTable;
    }

    /**
     * Returns {@link Boundedness#CONTINUOUS_UNBOUNDED} when the scan context is streaming,
     * {@link Boundedness#BOUNDED} otherwise.
     */
    @Override
    public Boundedness getBoundedness() {
        return this.scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    /**
     * Creates a new {@link ArcticSourceReader} using this source's reader function, the
     * configuration exposed by {@code sourceReaderContext}, and the {@code dimTable} flag.
     */
    @Override
    public SourceReader<T, ArcticSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        // Diamond instead of a raw type so the assignment to SourceReader<T, ArcticSplit> is checked.
        return new ArcticSourceReader<>(
                this.readerFunction,
                sourceReaderContext.getConfiguration(),
                sourceReaderContext,
                this.dimTable);
    }

    /** Creates a fresh enumerator with no restored state. */
    @Override
    public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
            SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext) throws Exception {
        return createEnumerator(splitEnumeratorContext, null);
    }

    /**
     * Builds the split assigner and the enumerator matching the scan mode.
     *
     * @param splitEnumeratorContext enumerator context supplied by Flink
     * @param enumState              previously checkpointed state, or {@code null} for a fresh start
     * @return a streaming or static enumerator, depending on {@code scanContext.isStreaming()}
     */
    private SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
            SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext, ArcticSourceEnumState enumState) {
        final ShuffleSplitAssigner splitAssigner;
        if (enumState == null) {
            splitAssigner = new ShuffleSplitAssigner(splitEnumeratorContext);
        } else {
            LOG.info("Arctic source restored {} splits from state for table {}",
                    enumState.pendingSplits().size(), this.tableName);
            // Seed the assigner with the pending splits and shuffle relation from the checkpoint.
            splitAssigner = new ShuffleSplitAssigner(
                    splitEnumeratorContext, enumState.pendingSplits(), enumState.shuffleSplitRelation());
        }

        if (this.scanContext.isStreaming()) {
            return new ArcticSourceEnumerator(
                    splitEnumeratorContext, splitAssigner, this.loader, this.scanContext, enumState, this.dimTable);
        }
        // NOTE(review): the last argument is always null here, also when restoring from state;
        // only the assigner receives the restored splits. Confirm this is intended.
        return new StaticArcticSourceEnumerator(
                splitEnumeratorContext, splitAssigner, this.loader, this.scanContext, null);
    }

    /**
     * Re-creates the enumerator from checkpointed state.
     *
     * <p>The compiler generates the erased bridge for this override, so no hand-written
     * {@code (SplitEnumeratorContext, Object)} overload is declared.
     */
    @Override
    public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> restoreEnumerator(
            SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext,
            ArcticSourceEnumState arcticSourceEnumState) throws Exception {
        return createEnumerator(splitEnumeratorContext, arcticSourceEnumState);
    }

    /** Returns a new {@link ArcticSplitSerializer}. */
    @Override
    public SimpleVersionedSerializer<ArcticSplit> getSplitSerializer() {
        return new ArcticSplitSerializer();
    }

    /** Returns a new {@link ArcticSourceEnumStateSerializer}. */
    @Override
    public SimpleVersionedSerializer<ArcticSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new ArcticSourceEnumStateSerializer();
    }

    /** Returns the type information supplied at construction time. */
    @Override
    public TypeInformation<T> getProducedType() {
        return this.typeInformation;
    }
}
