package com.alibaba.alink.common.comqueue.communication;

import com.alibaba.alink.common.comqueue.ComContext;
import com.alibaba.alink.common.comqueue.CommunicateFunction;
import com.alibaba.alink.common.io.directreader.DefaultDistributedInfo;
import com.alibaba.alink.common.linalg.VectorUtil;
import com.alibaba.alink.common.utils.JsonConverter;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/comqueue/communication/AllReduce.class */
public class AllReduce extends CommunicateFunction {
    private static final int TRANSFER_BUFFER_SIZE = 4096;
    private static final long serialVersionUID = 2878350590317507159L;
    private final String bufferName;
    private final String lengthName;
    private final SerializableBiConsumer<double[], double[]> op;
    private static final Logger LOG = LoggerFactory.getLogger(AllReduce.class);
    public static final SerializableBiConsumer<double[], double[]> SUM = new SerializableBiConsumer<double[], double[]>() { // from class: com.alibaba.alink.common.comqueue.communication.AllReduce.3
        private static final long serialVersionUID = 1674418885589623933L;

        @Override // java.util.function.BiConsumer
        public void accept(double[] dArr, double[] dArr2) {
            for (int i = 0; i < dArr.length; i++) {
                int i2 = i;
                dArr[i2] = dArr[i2] + dArr2[i];
            }
        }
    };
    public static final SerializableBiConsumer<double[], double[]> MAX = new SerializableBiConsumer<double[], double[]>() { // from class: com.alibaba.alink.common.comqueue.communication.AllReduce.4
        private static final long serialVersionUID = -7642209703460263383L;

        @Override // java.util.function.BiConsumer
        public void accept(double[] dArr, double[] dArr2) {
            for (int i = 0; i < dArr.length; i++) {
                dArr[i] = Math.max(dArr[i], dArr2[i]);
            }
        }
    };
    public static final SerializableBiConsumer<double[], double[]> MIN = new SerializableBiConsumer<double[], double[]>() { // from class: com.alibaba.alink.common.comqueue.communication.AllReduce.5
        private static final long serialVersionUID = -5361253243150270428L;

        @Override // java.util.function.BiConsumer
        public void accept(double[] dArr, double[] dArr2) {
            for (int i = 0; i < dArr.length; i++) {
                dArr[i] = Math.min(dArr[i], dArr2[i]);
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/common/comqueue/communication/AllReduce$AllReduceRecv.class */
    public static class AllReduceRecv<T> extends RichMapPartitionFunction<Tuple3<Integer, Integer, double[]>, T> {
        private static final long serialVersionUID = -2596118119896911087L;
        private final String bufferName;
        private final String lengthName;
        private final int sessionId;

        AllReduceRecv(String str, String str2, int i) {
            this.bufferName = str;
            this.lengthName = str2;
            this.sessionId = i;
        }

        public void mapPartition(Iterable<Tuple3<Integer, Integer, double[]>> iterable, Collector<T> collector) throws Exception {
            ComContext comContext = new ComContext(this.sessionId, getIterationRuntimeContext());
            Iterator<Tuple3<Integer, Integer, double[]>> it = iterable.iterator();
            if (it.hasNext()) {
                double[] dArr = (double[]) comContext.getObj(this.bufferName);
                int intValue = this.lengthName != null ? ((Integer) comContext.getObj(this.lengthName)).intValue() : dArr.length;
                int pieces = AllReduce.pieces(intValue);
                do {
                    Tuple3<Integer, Integer, double[]> next = it.next();
                    if (((Integer) next.f1).intValue() == pieces - 1) {
                        System.arraycopy(next.f2, 0, dArr, ((Integer) next.f1).intValue() * AllReduce.TRANSFER_BUFFER_SIZE, AllReduce.lastLen(intValue));
                    } else {
                        System.arraycopy(next.f2, 0, dArr, ((Integer) next.f1).intValue() * AllReduce.TRANSFER_BUFFER_SIZE, AllReduce.TRANSFER_BUFFER_SIZE);
                    }
                } while (it.hasNext());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/common/comqueue/communication/AllReduce$AllReduceSend.class */
    public static class AllReduceSend<T> extends RichMapPartitionFunction<T, Tuple3<Integer, Integer, double[]>> {
        private static final long serialVersionUID = 762861369253958025L;
        private final String bufferName;
        private final String lengthName;
        private final String transferBufferName;
        private final int sessionId;

        AllReduceSend(String str, String str2, String str3, int i) {
            this.bufferName = str;
            this.lengthName = str2;
            this.transferBufferName = str3;
            this.sessionId = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            AllReduce.LOG.info("taskId: {}, collect open", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        }

        public void close() throws Exception {
            super.close();
            AllReduce.LOG.info("taskId: {}, collect end", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        }

        public void mapPartition(Iterable<T> iterable, Collector<Tuple3<Integer, Integer, double[]>> collector) throws Exception {
            double[] dArr;
            ComContext comContext = new ComContext(this.sessionId, getIterationRuntimeContext());
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int superstepNumber = getIterationRuntimeContext().getSuperstepNumber();
            AllReduce.LOG.info("taskId: {}, collect start", Integer.valueOf(indexOfThisSubtask));
            double[] dArr2 = (double[]) comContext.getObj(this.bufferName);
            int intValue = this.lengthName != null ? ((Integer) comContext.getObj(this.lengthName)).intValue() : dArr2.length;
            if (superstepNumber == 1) {
                dArr = new double[AllReduce.TRANSFER_BUFFER_SIZE];
                comContext.putObj(this.transferBufferName, dArr);
            } else {
                dArr = (double[]) comContext.getObj(this.transferBufferName);
            }
            int pieces = AllReduce.pieces(intValue);
            AllReduce.LOG.info("taskId: {}, len: {}, pieces: {}", new Object[]{Integer.valueOf(indexOfThisSubtask), Integer.valueOf(intValue), Integer.valueOf(pieces)});
            DefaultDistributedInfo defaultDistributedInfo = new DefaultDistributedInfo();
            int i = 0;
            for (int i2 = 0; i2 < numberOfParallelSubtasks; i2++) {
                int startPos = (int) defaultDistributedInfo.startPos(i2, numberOfParallelSubtasks, pieces);
                int localRowCnt = (int) defaultDistributedInfo.localRowCnt(i2, numberOfParallelSubtasks, pieces);
                for (int i3 = 0; i3 < localRowCnt; i3++) {
                    int i4 = (startPos + i3) * AllReduce.TRANSFER_BUFFER_SIZE;
                    if (startPos + i3 == pieces - 1) {
                        System.arraycopy(dArr2, i4, dArr, 0, AllReduce.lastLen(intValue));
                    } else {
                        System.arraycopy(dArr2, i4, dArr, 0, AllReduce.TRANSFER_BUFFER_SIZE);
                    }
                    i++;
                    collector.collect(Tuple3.of(Integer.valueOf(i2), Integer.valueOf(startPos + i3), dArr));
                }
            }
            AllReduce.LOG.info("taskId: {}, collect end, agg: {}", Integer.valueOf(indexOfThisSubtask), Integer.valueOf(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/common/comqueue/communication/AllReduce$AllReduceSum.class */
    public static class AllReduceSum extends RichMapPartitionFunction<Tuple3<Integer, Integer, double[]>, Tuple3<Integer, Integer, double[]>> {
        private static final long serialVersionUID = -6816714426472711188L;
        private final String bufferName;
        private final String lengthName;
        private final int sessionId;
        private final SerializableBiConsumer<double[], double[]> op;

        AllReduceSum(String str, String str2, int i, SerializableBiConsumer<double[], double[]> serializableBiConsumer) {
            this.bufferName = str;
            this.lengthName = str2;
            this.sessionId = i;
            this.op = serializableBiConsumer;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void mapPartition(Iterable<Tuple3<Integer, Integer, double[]>> iterable, Collector<Tuple3<Integer, Integer, double[]>> collector) {
            ComContext comContext = new ComContext(this.sessionId, getIterationRuntimeContext());
            Iterator<Tuple3<Integer, Integer, double[]>> it = iterable.iterator();
            if (it.hasNext()) {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
                AllReduce.LOG.info("taskId: {}, AllReduceSum start", Integer.valueOf(indexOfThisSubtask));
                int intValue = this.lengthName != null ? ((Integer) comContext.getObj(this.lengthName)).intValue() : ((double[]) comContext.getObj(this.bufferName)).length;
                AllReduce.LOG.info("taskId: {}, AllReduceSum sendLen: {}", Integer.valueOf(indexOfThisSubtask), Integer.valueOf(intValue));
                int pieces = AllReduce.pieces(intValue);
                DefaultDistributedInfo defaultDistributedInfo = new DefaultDistributedInfo();
                int startPos = (int) defaultDistributedInfo.startPos(indexOfThisSubtask, numberOfParallelSubtasks, pieces);
                int localRowCnt = (int) defaultDistributedInfo.localRowCnt(indexOfThisSubtask, numberOfParallelSubtasks, pieces);
                AllReduce.LOG.info("taskId: {}, AllReduceSum cnt: {}", Integer.valueOf(indexOfThisSubtask), Integer.valueOf(localRowCnt));
                double[] dArr = new double[localRowCnt];
                double[] dArr2 = new double[localRowCnt];
                do {
                    Tuple3<Integer, Integer, double[]> next = it.next();
                    int intValue2 = ((Integer) next.f1).intValue() - startPos;
                    if (dArr[intValue2] == 0) {
                        dArr[intValue2] = (double[]) next.f2;
                        dArr2[intValue2] = dArr2[intValue2] + 1.0d;
                    } else {
                        this.op.accept(dArr[intValue2], next.f2);
                    }
                } while (it.hasNext());
                for (int i = 0; i < numberOfParallelSubtasks; i++) {
                    for (int i2 = 0; i2 < localRowCnt; i2++) {
                        collector.collect(Tuple3.of(Integer.valueOf(i), Integer.valueOf(startPos + i2), dArr[i2]));
                    }
                }
                AllReduce.LOG.info("taskId: {} AllReduceSum agg: {}", Integer.valueOf(indexOfThisSubtask), JsonConverter.toJson(dArr2));
                AllReduce.LOG.info("taskId: {}, AllReduceSum end", Integer.valueOf(indexOfThisSubtask));
            }
        }
    }

    /* loaded from: input_file:com/alibaba/alink/common/comqueue/communication/AllReduce$SerializableBiConsumer.class */
    public interface SerializableBiConsumer<T, U> extends BiConsumer<T, U>, Serializable {
        default SerializableBiConsumer<T, U> andThen(SerializableBiConsumer<? super T, ? super U> serializableBiConsumer) {
            Objects.requireNonNull(serializableBiConsumer);
            return (obj, obj2) -> {
                accept(obj, obj2);
                serializableBiConsumer.accept(obj, obj2);
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 2040148716:
                    if (implMethodName.equals("lambda$andThen$d59f2f88$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case VectorUtil.VectorSerialType.DENSE_VECTOR /* 0 */:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/alibaba/alink/common/comqueue/communication/AllReduce$SerializableBiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/alibaba/alink/common/comqueue/communication/AllReduce$SerializableBiConsumer") && serializedLambda.getImplMethodSignature().equals("(Lcom/alibaba/alink/common/comqueue/communication/AllReduce$SerializableBiConsumer;Ljava/lang/Object;Ljava/lang/Object;)V")) {
                        SerializableBiConsumer serializableBiConsumer = (SerializableBiConsumer) serializedLambda.getCapturedArg(0);
                        SerializableBiConsumer serializableBiConsumer2 = (SerializableBiConsumer) serializedLambda.getCapturedArg(1);
                        return (obj, obj2) -> {
                            accept(obj, obj2);
                            serializableBiConsumer2.accept(obj, obj2);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    public AllReduce(String str) {
        this(str, null);
    }

    public AllReduce(String str, String str2) {
        this(str, str2, SUM);
    }

    public AllReduce(String str, String str2, SerializableBiConsumer<double[], double[]> serializableBiConsumer) {
        this.bufferName = str;
        this.lengthName = str2;
        this.op = serializableBiConsumer;
    }

    public static <T> DataSet<T> allReduce(DataSet<T> dataSet, String str, String str2, SerializableBiConsumer<double[], double[]> serializableBiConsumer, int i) {
        return dataSet.mapPartition(new AllReduceSend(str, str2, UUID.randomUUID().toString(), i)).withBroadcastSet(dataSet, "barrier").returns(new TupleTypeInfo(new TypeInformation[]{Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO})).name("AllReduceSend").partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.comqueue.communication.AllReduce.2
            private static final long serialVersionUID = -5584126092583517829L;

            public int partition(Integer num, int i2) {
                return num.intValue();
            }
        }, 0).name("AllReduceBroadcastRaw").mapPartition(new AllReduceSum(str, str2, i, serializableBiConsumer)).returns(new TupleTypeInfo(new TypeInformation[]{Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO})).name("AllReduceSum").partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.comqueue.communication.AllReduce.1
            private static final long serialVersionUID = -2088778990924431340L;

            public int partition(Integer num, int i2) {
                return num.intValue();
            }
        }, 0).name("AllReduceBroadcastSum").mapPartition(new AllReduceRecv(str, str2, i)).returns(dataSet.getType()).name("AllReduceRecv");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int pieces(int i) {
        int i2 = i / TRANSFER_BUFFER_SIZE;
        return i % TRANSFER_BUFFER_SIZE == 0 ? i2 : i2 + 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int lastLen(int i) {
        int i2 = i % TRANSFER_BUFFER_SIZE;
        return i2 == 0 ? TRANSFER_BUFFER_SIZE : i2;
    }

    @Override // com.alibaba.alink.common.comqueue.CommunicateFunction
    public <T> DataSet<T> communicateWith(DataSet<T> dataSet, int i) {
        return allReduce(dataSet, this.bufferName, this.lengthName, this.op, i);
    }
}
