package com.netease.arctic.flink.read.hybrid.assigner;

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.data.PrimaryKeyedFile;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplit;
import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/hybrid/assigner/ShuffleSplitAssigner.class */
public class ShuffleSplitAssigner implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleSplitAssigner.class);
    private static final long POLL_TIMEOUT = 200;
    private final SplitEnumeratorContext<ArcticSplit> enumeratorContext;
    private int totalParallelism;
    private int totalSplitNum;
    private Long currentMaskOfTreeNode;
    private final Object lock = new Object();
    private final Map<Long, Integer> partitionIndexSubtaskMap = new ConcurrentHashMap();
    private final Map<Integer, PriorityBlockingQueue<ArcticSplit>> subtaskSplitMap = new ConcurrentHashMap();

    public ShuffleSplitAssigner(SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext) {
        this.enumeratorContext = splitEnumeratorContext;
        this.totalParallelism = splitEnumeratorContext.currentParallelism();
    }

    public ShuffleSplitAssigner(SplitEnumeratorContext<ArcticSplit> splitEnumeratorContext, Collection<ArcticSplitState> collection, long[] jArr) {
        this.enumeratorContext = splitEnumeratorContext;
        deserializePartitionIndex(jArr);
        collection.forEach(arcticSplitState -> {
            onDiscoveredSplits(Collections.singleton(arcticSplitState.toSourceSplit()));
        });
    }

    @Override // com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner
    public Optional<ArcticSplit> getNext() {
        throw new UnsupportedOperationException("ShuffleSplitAssigner couldn't support this operation.");
    }

    @Override // com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner
    public Optional<ArcticSplit> getNext(int i) {
        int currentParallelism = this.enumeratorContext.currentParallelism();
        if (this.totalParallelism != currentParallelism) {
            throw new FlinkRuntimeException(String.format("Source parallelism has been changed, before parallelism is %s, now is %s", Integer.valueOf(this.totalParallelism), Integer.valueOf(currentParallelism)));
        }
        if (!this.subtaskSplitMap.containsKey(Integer.valueOf(i))) {
            LOG.info("Subtask {}, it's an idle subtask due to the empty queue with this subtask.", Integer.valueOf(i));
            return Optional.empty();
        }
        ArcticSplit arcticSplit = null;
        try {
            arcticSplit = this.subtaskSplitMap.get(Integer.valueOf(i)).poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("interruptedException", e);
        }
        if (arcticSplit == null) {
            LOG.debug("Subtask {}, couldn't retrieve arctic source split in the queue.", Integer.valueOf(i));
            return Optional.empty();
        }
        LOG.info("get next arctic split taskIndex {}, totalSplitNum {}, arcticSplit {}.", new Object[]{arcticSplit.taskIndex(), Integer.valueOf(this.totalSplitNum), arcticSplit});
        return Optional.of(arcticSplit);
    }

    @Override // com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner
    public void onDiscoveredSplits(Collection<ArcticSplit> collection) {
        collection.forEach(this::putArcticIntoQueue);
        this.totalSplitNum += collection.size();
    }

    @Override // com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner
    public void onUnassignedSplits(Collection<ArcticSplit> collection) {
        onDiscoveredSplits(collection);
    }

    void putArcticIntoQueue(ArcticSplit arcticSplit) {
        int subtaskIdByArcticSplit = getSubtaskIdByArcticSplit(arcticSplit);
        PriorityBlockingQueue<ArcticSplit> orDefault = this.subtaskSplitMap.getOrDefault(Integer.valueOf(subtaskIdByArcticSplit), new PriorityBlockingQueue<>());
        LOG.info("put split into queue: {}", arcticSplit);
        orDefault.add(arcticSplit);
        this.subtaskSplitMap.put(Integer.valueOf(subtaskIdByArcticSplit), orDefault);
    }

    private int getSubtaskIdByArcticSplit(ArcticSplit arcticSplit) {
        PrimaryKeyedFile findAnyFileInArcticSplit = findAnyFileInArcticSplit(arcticSplit);
        int intValue = this.partitionIndexSubtaskMap.computeIfAbsent(Long.valueOf(partitionAndIndexHashCode(findAnyFileInArcticSplit.partition().toString(), arcticSplit)), l -> {
            return Integer.valueOf((this.partitionIndexSubtaskMap.size() + 1) % this.totalParallelism);
        }).intValue();
        LOG.info("partition = {}, index = {}, subtaskId = {}", new Object[]{findAnyFileInArcticSplit.partition().toString(), Long.valueOf(findAnyFileInArcticSplit.node().index()), Integer.valueOf(intValue)});
        return intValue;
    }

    @Override // com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner
    public Collection<ArcticSplitState> state() {
        ArrayList arrayList = new ArrayList();
        this.subtaskSplitMap.forEach((num, priorityBlockingQueue) -> {
            arrayList.addAll((Collection) priorityBlockingQueue.stream().map(ArcticSplitState::new).collect(Collectors.toList()));
        });
        return arrayList;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.subtaskSplitMap.clear();
        this.partitionIndexSubtaskMap.clear();
    }

    public long[] serializePartitionIndex() {
        long[] jArr = new long[(this.partitionIndexSubtaskMap.size() * 2) + 3];
        jArr[0] = this.totalParallelism;
        jArr[1] = this.totalSplitNum;
        jArr[2] = this.currentMaskOfTreeNode == null ? -1L : this.currentMaskOfTreeNode.longValue();
        int i = 3;
        Iterator<Map.Entry<Long, Integer>> it = this.partitionIndexSubtaskMap.entrySet().iterator();
        while (it.hasNext()) {
            int i2 = i;
            int i3 = i + 1;
            jArr[i2] = it.next().getKey().longValue();
            i = i3 + 1;
            jArr[i3] = r0.getValue().intValue();
        }
        return jArr;
    }

    void deserializePartitionIndex(long[] jArr) {
        this.totalParallelism = (int) jArr[0];
        this.totalSplitNum = (int) jArr[1];
        this.currentMaskOfTreeNode = jArr[2] == -1 ? null : Long.valueOf(jArr[2]);
        int i = 3;
        while (i < jArr.length) {
            Map<Long, Integer> map = this.partitionIndexSubtaskMap;
            Long valueOf = Long.valueOf(jArr[i]);
            int i2 = i + 1;
            map.put(valueOf, Integer.valueOf((int) jArr[i2]));
            i = i2 + 1;
        }
    }

    private long partitionAndIndexHashCode(String str, ArcticSplit arcticSplit) {
        return Math.abs(str.hashCode() + getExactlyIndexOfTreeNode(arcticSplit));
    }

    @VisibleForTesting
    public long getExactlyIndexOfTreeNode(ArcticSplit arcticSplit) {
        DataTreeNode dataTreeNode = arcticSplit.dataTreeNode();
        long index = dataTreeNode.index();
        long mask = dataTreeNode.mask();
        synchronized (this.lock) {
            if (this.currentMaskOfTreeNode == null) {
                this.currentMaskOfTreeNode = Long.valueOf(mask);
            }
        }
        boolean z = mask != this.currentMaskOfTreeNode.longValue();
        boolean z2 = mask > this.currentMaskOfTreeNode.longValue();
        long j = mask + 1;
        while (j != this.currentMaskOfTreeNode.longValue() + 1) {
            if (z2) {
                j >>= 1;
                index >>= 1;
            } else {
                j <<= 1;
                index <<= 1;
            }
        }
        if (z) {
            DataTreeNode of = DataTreeNode.of(this.currentMaskOfTreeNode.longValue(), index);
            LOG.info("original dataTreeNode is {}, new dataTreeNode is {}.", dataTreeNode, of);
            arcticSplit.modifyTreeNode(of);
        }
        return index;
    }

    private PrimaryKeyedFile findAnyFileInArcticSplit(ArcticSplit arcticSplit) {
        AtomicReference atomicReference = new AtomicReference();
        if (arcticSplit.isChangelogSplit()) {
            ArrayList arrayList = new ArrayList(arcticSplit.asChangelogSplit().insertTasks());
            arrayList.addAll(arcticSplit.asChangelogSplit().deleteTasks());
            arrayList.stream().findFirst().ifPresent(arcticFileScanTask -> {
                atomicReference.set(arcticFileScanTask.file());
            });
            if (atomicReference.get() != null) {
                return (PrimaryKeyedFile) atomicReference.get();
            }
        }
        new ArrayList(arcticSplit.asSnapshotSplit().insertTasks()).stream().findFirst().ifPresent(arcticFileScanTask2 -> {
            atomicReference.set(arcticFileScanTask2.file());
        });
        if (atomicReference.get() != null) {
            return (PrimaryKeyedFile) atomicReference.get();
        }
        throw new FlinkRuntimeException("Couldn't find a primaryKeyedFile.");
    }
}
