package com.netease.arctic.flink.read.hybrid.enumerator;

import com.netease.arctic.flink.read.FlinkSplitPlanner;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.table.ArcticTableLoader;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.table.KeyedTable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ContinuousSplitPlanner} implementation that plans splits for a {@link KeyedTable}.
 *
 * <p>The table is loaded through the supplied {@link ArcticTableLoader} on the first call to
 * {@link #planSplits(ArcticEnumeratorOffset)} and refreshed on every call. When no previous offset
 * is given the full table is planned; otherwise only the change table is scanned.
 */
@Internal
public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);

    /** Counter handed to {@link FlinkSplitPlanner} on each planning call; static, so shared by all instances. */
    private static final AtomicInteger splitCount = new AtomicInteger();

    private final ArcticTableLoader loader;

    /** Keyed table handle; stays {@code null} until the first {@link #planSplits} call loads it. */
    private transient KeyedTable table;

    /**
     * Creates a planner that will load its table through the given loader.
     *
     * @param arcticTableLoader loader used to obtain the table on first use
     */
    public ContinuousSplitPlannerImpl(ArcticTableLoader arcticTableLoader) {
        loader = arcticTableLoader;
    }

    /** Does nothing; this planner performs no cleanup on close. */
    @Override
    public void close() throws IOException {
        // Intentionally empty.
    }

    /**
     * Refreshes the table and plans the next batch of splits.
     *
     * @param arcticEnumeratorOffset offset produced by the previous planning round, or {@code null}
     *     if nothing has been planned yet
     * @return the initial plan when the offset is {@code null}, otherwise the incremental plan
     */
    @Override
    public ContinuousEnumerationResult planSplits(ArcticEnumeratorOffset arcticEnumeratorOffset) {
        if (table == null) {
            table = ArcticUtils.loadArcticTable(loader).asKeyedTable();
        }
        table.refresh();

        if (arcticEnumeratorOffset == null) {
            return discoverInitialSplits();
        }
        return discoverIncrementalSplits(arcticEnumeratorOffset);
    }

    /**
     * Plans change-table splits relative to the previously reported offset.
     *
     * <p>Returns {@link ContinuousEnumerationResult#EMPTY} when the change table has no current
     * snapshot or its current snapshot id equals the one stored in {@code lastOffset}.
     *
     * @param lastOffset offset returned by the previous planning round
     * @return the planned change splits together with the incoming offset and a new offset built
     *     from the change table's current snapshot id
     */
    private ContinuousEnumerationResult discoverIncrementalSplits(ArcticEnumeratorOffset lastOffset) {
        long fromSnapshotId = lastOffset.changeSnapshotId().longValue();

        Snapshot latestChange = table.changeTable().currentSnapshot();
        if (latestChange == null) {
            return ContinuousEnumerationResult.EMPTY;
        }
        long toSnapshotId = latestChange.snapshotId();
        if (toSnapshotId == fromSnapshotId) {
            return ContinuousEnumerationResult.EMPTY;
        }

        TableScan changeScan = table.changeTable().newScan();
        // Long.MIN_VALUE is the id discoverInitialSplits records when the change table had no
        // snapshot; in that case the scan is left unrestricted.
        if (fromSnapshotId != Long.MIN_VALUE) {
            changeScan = changeScan.appendsBetween(fromSnapshotId, toSnapshotId);
        }

        return new ContinuousEnumerationResult(
                FlinkSplitPlanner.planChangeTable(changeScan, splitCount),
                lastOffset,
                ArcticEnumeratorOffset.of(toSnapshotId, null));
    }

    /**
     * Plans the full table for the first planning round.
     *
     * <p>The change table's current snapshot is read before the full-table plan is computed. If
     * there is no change snapshot and the plan is empty, {@link ContinuousEnumerationResult#EMPTY}
     * is returned; otherwise the result carries a new offset holding the change snapshot id, or
     * {@link Long#MIN_VALUE} when the change table has no snapshot.
     *
     * @return the initial enumeration result
     */
    private ContinuousEnumerationResult discoverInitialSplits() {
        Snapshot changeSnapshot = table.changeTable().currentSnapshot();
        List<ArcticSplit> fullSplits = FlinkSplitPlanner.planFullTable(table, splitCount);

        if (changeSnapshot == null && CollectionUtils.isEmpty(fullSplits)) {
            LOG.info("There have no change snapshot, and no base splits in table: {}.", table);
            return ContinuousEnumerationResult.EMPTY;
        }

        long changeSnapshotId;
        if (changeSnapshot == null) {
            changeSnapshotId = Long.MIN_VALUE;
        } else {
            changeSnapshotId = changeSnapshot.snapshotId();
        }
        return new ContinuousEnumerationResult(
                fullSplits, null, ArcticEnumeratorOffset.of(changeSnapshotId, null));
    }
}
