package com.netease.arctic.flink.read.hybrid.enumerator;

import com.netease.arctic.flink.read.hybrid.assigner.ShuffleSplitAssigner;
import com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner;
import com.netease.arctic.flink.read.hybrid.reader.ReaderStartedEvent;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.SplitRequestEvent;
import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits;
import com.netease.arctic.flink.read.source.ArcticScanContext;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.table.KeyedTable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.iceberg.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumerator.class */
public class ArcticSourceEnumerator extends AbstractArcticEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(ArcticSourceEnumerator.class);
    private transient KeyedTable keyedTable;
    private volatile transient TemporalJoinSplits temporalJoinSplits;
    private final ArcticTableLoader loader;
    private final SplitEnumeratorContext<ArcticSplit> context;
    private final ContinuousSplitPlanner continuousSplitPlanner;
    private final SplitAssigner splitAssigner;
    private final ArcticScanContext scanContext;
    private final long snapshotDiscoveryIntervalMs;
    private final boolean dimTable;
    private volatile boolean sourceEventBeforeFirstPlan;
    private final AtomicReference<ArcticEnumeratorOffset> enumeratorPosition;
    private final AtomicBoolean lock;

    public ArcticSourceEnumerator(SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext, SplitAssigner splitAssigner, ArcticTableLoader arcticTableLoader, ArcticScanContext arcticScanContext, @Nullable ArcticSourceEnumState arcticSourceEnumState, boolean z) {
        super(splitEnumeratorContext, splitAssigner);
        this.temporalJoinSplits = null;
        this.sourceEventBeforeFirstPlan = false;
        this.lock = new AtomicBoolean(false);
        this.loader = arcticTableLoader;
        this.context = splitEnumeratorContext;
        this.splitAssigner = splitAssigner;
        this.scanContext = arcticScanContext;
        this.continuousSplitPlanner = new ContinuousSplitPlannerImpl(arcticTableLoader);
        this.snapshotDiscoveryIntervalMs = arcticScanContext.monitorInterval().toMillis();
        this.enumeratorPosition = new AtomicReference<>();
        if (arcticSourceEnumState != null) {
            this.enumeratorPosition.set(arcticSourceEnumState.lastEnumeratedOffset());
            this.temporalJoinSplits = arcticSourceEnumState.temporalJoinSplits();
        }
        this.dimTable = z;
        LOG.info("dimTable: {}", Boolean.valueOf(z));
    }

    @Override // com.netease.arctic.flink.read.hybrid.enumerator.AbstractArcticEnumerator
    public void start() {
        if (this.keyedTable == null) {
            this.keyedTable = ArcticUtils.loadArcticTable(this.loader).asKeyedTable();
        }
        if (this.enumeratorPosition.get() == null && ArcticValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(this.scanContext.scanStartupMode())) {
            this.keyedTable.refresh();
            Snapshot currentSnapshot = this.keyedTable.changeTable().currentSnapshot();
            long snapshotId = currentSnapshot == null ? Long.MIN_VALUE : currentSnapshot.snapshotId();
            this.enumeratorPosition.set(ArcticEnumeratorOffset.of(snapshotId, null));
            LOG.info("{} is {}, the current snapshot id of the change table {}  is {}.", new Object[]{ArcticValidator.SCAN_STARTUP_MODE.key(), ArcticValidator.SCAN_STARTUP_MODE_LATEST, this.keyedTable.id(), Long.valueOf(snapshotId)});
        }
        if (this.snapshotDiscoveryIntervalMs > 0) {
            LOG.info("Starting the ArcticSourceEnumerator with arctic table {} snapshot discovery interval of {} ms.", this.keyedTable, Long.valueOf(this.snapshotDiscoveryIntervalMs));
            this.context.callAsync(this::planSplits, this::handleResultOfSplits, 0L, this.snapshotDiscoveryIntervalMs);
            this.context.callAsync(this::assignSplits, (r5, th) -> {
                if (th != null) {
                    throw new FlinkRuntimeException("Failed to assign arctic split due to ", th);
                }
            }, 1000L, 500L);
        }
    }

    private ContinuousEnumerationResult planSplits() {
        ContinuousEnumerationResult doPlanSplits = doPlanSplits();
        if (this.dimTable && this.temporalJoinSplits == null) {
            this.temporalJoinSplits = new TemporalJoinSplits(doPlanSplits.splits(), this.context.metricGroup());
            if (doPlanSplits.isEmpty() && this.sourceEventBeforeFirstPlan) {
                notifyReaders();
            }
        }
        return doPlanSplits;
    }

    private ContinuousEnumerationResult doPlanSplits() {
        if (this.lock.get()) {
            LOG.info("prefix plan splits thread haven't finished.");
            return ContinuousEnumerationResult.EMPTY;
        }
        this.lock.set(true);
        LOG.info("begin to plan splits current offset {}.", this.enumeratorPosition.get());
        return this.continuousSplitPlanner.planSplits(this.enumeratorPosition.get());
    }

    private void handleResultOfSplits(ContinuousEnumerationResult continuousEnumerationResult, Throwable th) {
        if (th != null) {
            this.lock.set(false);
            throw new FlinkRuntimeException("Failed to scan arctic table due to ", th);
        }
        if (!continuousEnumerationResult.isEmpty()) {
            this.splitAssigner.onDiscoveredSplits(continuousEnumerationResult.splits());
            this.enumeratorPosition.set(continuousEnumerationResult.toOffset());
        }
        LOG.info("handled result of splits, discover splits size {}, latest offset {}.", Integer.valueOf(continuousEnumerationResult.splits().size()), this.enumeratorPosition.get());
        this.lock.set(false);
    }

    @Override // com.netease.arctic.flink.read.hybrid.enumerator.AbstractArcticEnumerator
    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        super.handleSourceEvent(i, sourceEvent);
        if (sourceEvent instanceof SplitRequestEvent) {
            Collection<String> finishedSplitIds = ((SplitRequestEvent) sourceEvent).finishedSplitIds();
            if (this.dimTable) {
                checkAndNotifyReader(finishedSplitIds);
                return;
            }
            return;
        }
        if (!(sourceEvent instanceof ReaderStartedEvent)) {
            throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s", Integer.valueOf(i), sourceEvent.getClass().getCanonicalName()));
        }
        if (this.dimTable && this.temporalJoinSplits != null && this.temporalJoinSplits.hasNotifiedReader()) {
            this.sourceEventBeforeFirstPlan = true;
            LOG.info("send InitializationFinishedEvent to reader again.");
            this.context.sendEventToSourceReader(i, InitializationFinishedEvent.INSTANCE);
        }
    }

    public void checkAndNotifyReader(Collection<String> collection) {
        if (this.temporalJoinSplits == null) {
            this.sourceEventBeforeFirstPlan = true;
        } else {
            if (this.temporalJoinSplits.hasNotifiedReader() || !this.temporalJoinSplits.removeAndReturnIfAllFinished(collection)) {
                return;
            }
            notifyReaders();
        }
    }

    private void notifyReaders() {
        LOG.info("all splits finished, send events to readers");
        IntStream.range(0, this.context.currentParallelism()).forEach(i -> {
            this.context.sendEventToSourceReader(i, InitializationFinishedEvent.INSTANCE);
        });
        this.temporalJoinSplits.clear();
        this.temporalJoinSplits.notifyReader();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public ArcticSourceEnumState m26snapshotState() throws Exception {
        long[] jArr = null;
        if (this.splitAssigner instanceof ShuffleSplitAssigner) {
            jArr = ((ShuffleSplitAssigner) this.splitAssigner).serializePartitionIndex();
        }
        return new ArcticSourceEnumState(this.splitAssigner.state(), this.enumeratorPosition.get(), jArr, this.temporalJoinSplits);
    }

    @Override // com.netease.arctic.flink.read.hybrid.enumerator.AbstractArcticEnumerator
    public void close() throws IOException {
        this.continuousSplitPlanner.close();
        this.splitAssigner.close();
        super.close();
    }

    @Override // com.netease.arctic.flink.read.hybrid.enumerator.AbstractArcticEnumerator
    protected boolean shouldWaitForMoreSplits() {
        return true;
    }
}
