package com.netease.arctic.flink.read.hybrid.enumerator;

import com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner;
import com.netease.arctic.flink.read.hybrid.reader.ReaderStartedEvent;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.SplitRequestEvent;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/enumerator/AbstractArcticEnumerator.class */
/**
 * Base {@link SplitEnumerator} for the Arctic hybrid source.
 *
 * <p>Readers ask for work with a custom {@link SplitRequestEvent}; the default Flink split request
 * path ({@link #handleSplitRequest(int, String)}) is rejected. Each request is remembered in
 * {@code readersAwaitingSplit} until {@link #assignSplits()} can either hand the reader one split
 * obtained from the {@link SplitAssigner} or signal that no more splits will arrive.
 *
 * <p>Subclasses decide, through {@link #shouldWaitForMoreSplits()}, whether a reader that found no
 * split should keep waiting or be told that the source is exhausted.
 */
public abstract class AbstractArcticEnumerator implements SplitEnumerator<ArcticSplit, ArcticSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractArcticEnumerator.class);

    private final SplitEnumeratorContext<ArcticSplit> enumeratorContext;
    private final SplitAssigner assigner;

    /** Pending split requests: subtask id mapped to the hostname string recorded with the request. */
    private final Map<Integer, String> readersAwaitingSplit = new ConcurrentHashMap<>();

    /** Not referenced by any method of this class. */
    private final AtomicReference<CompletableFuture<Void>> availableFuture = new AtomicReference<>();

    /**
     * Creates the enumerator.
     *
     * @param splitEnumeratorContext context used to look up registered readers and to assign splits
     * @param splitAssigner source of the splits handed to readers; closed by {@link #close()}
     */
    public AbstractArcticEnumerator(SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext, SplitAssigner splitAssigner) {
        this.enumeratorContext = splitEnumeratorContext;
        this.assigner = splitAssigner;
    }

    /** Does nothing in this base class. */
    public void start() {
    }

    /**
     * Closes the underlying split assigner.
     *
     * @throws IOException if closing the assigner fails
     */
    public void close() throws IOException {
        assigner.close();
    }

    /**
     * Always rejects the default split request, because readers are expected to send a
     * {@link SplitRequestEvent} instead.
     *
     * @param subtaskId id of the requesting subtask, used only in the error message
     * @param requesterHostname ignored
     * @throws UnsupportedOperationException always
     */
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        throw new UnsupportedOperationException(
                String.format(
                        "Received invalid default split request event from subtask %d as Arctic source uses custom split request event",
                        subtaskId));
    }

    /**
     * Dispatches a custom event sent by a reader.
     *
     * <ul>
     *   <li>{@link SplitRequestEvent}: reports the finished split ids to the assigner, records the
     *       reader as awaiting a split and runs {@link #assignSplits()}.
     *   <li>{@link ReaderStartedEvent}: only logged.
     * </ul>
     *
     * @param subtaskId id of the subtask that sent the event
     * @param sourceEvent the received event
     * @throws IllegalArgumentException if the event is {@code null} or of any other type
     */
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof SplitRequestEvent) {
            SplitRequestEvent request = (SplitRequestEvent) sourceEvent;
            LOG.info("Received request split event from subtask {}", subtaskId);
            assigner.onCompletedSplits(request.finishedSplitIds());
            // String.valueOf keeps the stored value non-null (ConcurrentHashMap rejects null values);
            // presumably the hostname may be absent -- confirm against SplitRequestEvent.
            readersAwaitingSplit.put(subtaskId, String.valueOf(request.requesterHostname()));
            assignSplits();
            return;
        }
        if (sourceEvent instanceof ReaderStartedEvent) {
            LOG.info("Received ReaderStartEvent from subtask {}", subtaskId);
            return;
        }
        // A null event is reported like any other unknown event instead of failing with a bare NPE.
        String eventType = sourceEvent == null ? null : sourceEvent.getClass().getCanonicalName();
        throw new IllegalArgumentException(
                String.format("Received unknown event from subtask %d: %s", subtaskId, eventType));
    }

    /**
     * Logs the registration of a reader; no split is assigned here.
     *
     * @param subtaskId id of the added reader
     */
    public void addReader(int subtaskId) {
        LOG.info("Added reader: {}", subtaskId);
    }

    /**
     * Returns splits to the assigner so that they can be handed out again.
     *
     * @param splits the splits being given back
     * @param subtaskId id of the subtask the splits come from, used only for logging
     */
    public void addSplitsBack(List<ArcticSplit> splits, int subtaskId) {
        LOG.info("addSplitsBack from subtaskId {}, splits {}.", subtaskId, splits);
        assigner.onUnassignedSplits(splits);
    }

    /**
     * Tells {@link #assignSplits()} what to do with a reader for which no split is available.
     *
     * @return {@code true} to leave the reader waiting, {@code false} to signal it that no more
     *     splits will come
     */
    protected abstract boolean shouldWaitForMoreSplits();

    /**
     * Walks over all readers awaiting a split and serves each of them at most one split.
     *
     * <p>For every pending reader:
     *
     * <ul>
     *   <li>if it is not among the context's registered readers, the request is dropped;
     *   <li>if the assigner returns a split, it is assigned and the request is removed;
     *   <li>otherwise, if {@link #shouldWaitForMoreSplits()} is {@code false}, the reader is
     *       signalled that there are no more splits and the request is removed;
     *   <li>otherwise the request stays pending.
     * </ul>
     *
     * @return always {@code null}
     */
    public Void assignSplits() {
        Iterator<Map.Entry<Integer, String>> pending = readersAwaitingSplit.entrySet().iterator();
        while (pending.hasNext()) {
            Integer readerKey = pending.next().getKey();

            // The reader is not registered with the context, so its request cannot be served.
            if (!enumeratorContext.registeredReaders().containsKey(readerKey)) {
                pending.remove();
                continue;
            }

            int subtaskId = readerKey.intValue();
            Optional<ArcticSplit> candidate = assigner.getNext(subtaskId);
            if (candidate.isPresent()) {
                ArcticSplit split = candidate.get();
                LOG.info(
                        "assign a arctic split to subtaskId {}, taskIndex {}, arcticSplit {}.",
                        subtaskId,
                        split.taskIndex(),
                        split);
                enumeratorContext.assignSplit(split, subtaskId);
                pending.remove();
            } else if (!shouldWaitForMoreSplits()) {
                LOG.info("No more splits available for subtask {}", subtaskId);
                enumeratorContext.signalNoMoreSplits(subtaskId);
                pending.remove();
            }
            // Remaining case: no split yet but more may come, so the request stays pending.
        }
        return null;
    }
}
