package com.netease.arctic.flink.shuffle;

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.table.DistributionHashMode;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/shuffle/RoundRobinShuffleRulePolicy.class */
public class RoundRobinShuffleRulePolicy implements ShuffleRulePolicy<RowData, ShuffleKey> {
    private static final Logger LOG = LoggerFactory.getLogger(RoundRobinShuffleRulePolicy.class);
    private final ShuffleHelper helper;
    private final int downStreamOperatorParallelism;
    private final int fileSplit;
    private int factor;
    private Map<Integer, Set<DataTreeNode>> subtaskTreeNodes;
    private final DistributionHashMode distributionHashMode;

    /* loaded from: input_file:com/netease/arctic/flink/shuffle/RoundRobinShuffleRulePolicy$PrimaryKeySelector.class */
    static class PrimaryKeySelector implements KeySelector<RowData, ShuffleKey> {
        PrimaryKeySelector() {
        }

        public ShuffleKey getKey(RowData rowData) throws Exception {
            return new ShuffleKey(rowData);
        }
    }

    /* loaded from: input_file:com/netease/arctic/flink/shuffle/RoundRobinShuffleRulePolicy$RoundRobinPartitioner.class */
    static class RoundRobinPartitioner implements Partitioner<ShuffleKey> {
        private final int downStreamOperatorParallelism;
        private final int factor;
        private final ShuffleHelper helper;
        private final DistributionHashMode distributionHashMode;

        RoundRobinPartitioner(int i, int i2, DistributionHashMode distributionHashMode, ShuffleHelper shuffleHelper) {
            this.downStreamOperatorParallelism = i;
            this.factor = i2;
            this.distributionHashMode = distributionHashMode;
            this.helper = shuffleHelper;
        }

        public int partition(ShuffleKey shuffleKey, int i) {
            if (this.helper != null) {
                this.helper.open();
            }
            Preconditions.checkNotNull(shuffleKey);
            RowData rowData = (RowData) Preconditions.checkNotNull(shuffleKey.getRow());
            Preconditions.checkArgument(i == this.downStreamOperatorParallelism, String.format("shuffle arctic record numPartition:%s is diff with writer parallelism:%s.", Integer.valueOf(i), Integer.valueOf(this.downStreamOperatorParallelism)));
            Integer num = null;
            if (this.distributionHashMode.isSupportPrimaryKey()) {
                num = Integer.valueOf((int) (this.helper.hashKeyValue(rowData) % this.factor));
            }
            Integer num2 = null;
            if (this.distributionHashMode.isSupportPartition()) {
                num2 = Integer.valueOf(this.helper.hashPartitionValue(rowData));
            }
            if (num != null && num2 != null) {
                return Math.abs(Objects.hash(num, num2)) % i;
            }
            if (num != null) {
                return num.intValue() % i;
            }
            if (num2 != null) {
                return num2.intValue() % i;
            }
            return 0;
        }
    }

    public RoundRobinShuffleRulePolicy(int i, int i2) {
        this(null, i, i2);
    }

    public RoundRobinShuffleRulePolicy(ShuffleHelper shuffleHelper, int i, int i2) {
        this(shuffleHelper, i, i2, DistributionHashMode.autoSelect(shuffleHelper.isPrimaryKeyExist(), shuffleHelper.isPartitionKeyExist()));
    }

    public RoundRobinShuffleRulePolicy(ShuffleHelper shuffleHelper, int i, int i2, DistributionHashMode distributionHashMode) {
        this.factor = -1;
        this.helper = shuffleHelper;
        this.downStreamOperatorParallelism = i;
        this.fileSplit = i2;
        this.distributionHashMode = distributionHashMode;
        org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(distributionHashMode != DistributionHashMode.NONE);
        org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(distributionHashMode != DistributionHashMode.AUTO);
    }

    @Override // com.netease.arctic.flink.shuffle.ShuffleRulePolicy
    public KeySelector<RowData, ShuffleKey> generateKeySelector() {
        return new PrimaryKeySelector();
    }

    @Override // com.netease.arctic.flink.shuffle.ShuffleRulePolicy
    public Partitioner<ShuffleKey> generatePartitioner() {
        getSubtaskTreeNodes();
        return new RoundRobinPartitioner(this.downStreamOperatorParallelism, this.factor, this.distributionHashMode, this.helper);
    }

    @Override // com.netease.arctic.flink.shuffle.ShuffleRulePolicy
    public DistributionHashMode getPolicyType() {
        return this.distributionHashMode;
    }

    @Override // com.netease.arctic.flink.shuffle.ShuffleRulePolicy
    public Map<Integer, Set<DataTreeNode>> getSubtaskTreeNodes() {
        if (this.subtaskTreeNodes != null) {
            return this.subtaskTreeNodes;
        }
        this.subtaskTreeNodes = initSubtaskFactorMap(this.downStreamOperatorParallelism);
        return this.subtaskTreeNodes;
    }

    private Map<Integer, Set<DataTreeNode>> initSubtaskFactorMap(int i) {
        HashMap hashMap = new HashMap(i);
        if (this.distributionHashMode.isSupportPrimaryKey()) {
            this.factor = this.fileSplit;
            if (this.distributionHashMode.isSupportPartition()) {
                IntStream.range(0, i).forEach(i2 -> {
                    hashMap.put(Integer.valueOf(i2), IntStream.range(0, this.factor).mapToObj(i2 -> {
                        return DataTreeNode.of(this.factor - 1, i2);
                    }).collect(Collectors.toSet()));
                });
            } else {
                if (this.factor < i) {
                    this.factor = (int) Math.pow(2.0d, getActualDepth(i) - 1);
                }
                int i3 = this.factor - 1;
                IntStream.range(0, this.factor).forEach(i4 -> {
                    int subtaskId = getSubtaskId(i4, i);
                    if (hashMap.containsKey(Integer.valueOf(subtaskId))) {
                        ((Set) hashMap.get(Integer.valueOf(subtaskId))).add(DataTreeNode.of(i3, i4));
                        return;
                    }
                    HashSet hashSet = new HashSet();
                    hashSet.add(DataTreeNode.of(i3, i4));
                    hashMap.put(Integer.valueOf(subtaskId), hashSet);
                });
            }
        } else {
            IntStream.range(0, i).forEach(i5 -> {
                hashMap.put(Integer.valueOf(i5), Sets.newHashSet(new DataTreeNode[]{DataTreeNode.of(0L, 0L)}));
            });
        }
        hashMap.forEach((num, set) -> {
            LOG.info("subtaskId={}, treeNodes={}.", num, set);
        });
        return hashMap;
    }

    private static int getActualDepth(int i) {
        return ((int) Math.ceil(Math.log(i) / Math.log(2.0d))) + 1;
    }

    private static int getSubtaskId(int i, int i2) {
        return i % i2;
    }
}
