package com.alibaba.alink.operator.batch.graph;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.comqueue.IterTaskObjKeeper;
import com.alibaba.alink.common.exceptions.AkIllegalStateException;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.linalg.VectorUtil;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.graph.RandomWalkBatchOp;
import com.alibaba.alink.operator.batch.graph.storage.GraphEdge;
import com.alibaba.alink.operator.batch.graph.storage.HomoGraphEngine;
import com.alibaba.alink.operator.batch.graph.utils.ComputeGraphStatistics;
import com.alibaba.alink.operator.batch.graph.utils.ConstructHomoEdge;
import com.alibaba.alink.operator.batch.graph.utils.EndWritingRandomWalks;
import com.alibaba.alink.operator.batch.graph.utils.GraphPartition;
import com.alibaba.alink.operator.batch.graph.utils.GraphStatistics;
import com.alibaba.alink.operator.batch.graph.utils.IDMappingUtils;
import com.alibaba.alink.operator.batch.graph.utils.LongArrayToRow;
import com.alibaba.alink.operator.batch.graph.utils.ParseGraphData;
import com.alibaba.alink.operator.batch.graph.utils.RandomWalkMemoryBuffer;
import com.alibaba.alink.operator.batch.graph.utils.ReadFromBufferAndRemoveStaticObject;
import com.alibaba.alink.operator.batch.graph.utils.RecvRequestKeySelector;
import com.alibaba.alink.operator.batch.graph.utils.SendRequestKeySelector;
import com.alibaba.alink.operator.batch.graph.walkpath.Node2VecWalkPathEngine;
import com.alibaba.alink.params.nlp.Node2VecParams;
import com.alibaba.alink.params.nlp.walk.Node2VecWalkParams;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NumberSequenceIterator;

@InputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.GRAPH)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "sourceCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.INT_LONG_TYPES, TypeCollections.STRING_TYPES}), @ParamSelectColumnSpec(name = "targetCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.INT_LONG_TYPES, TypeCollections.STRING_TYPES}), @ParamSelectColumnSpec(name = "weightCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.NUMERIC_TYPES})})
@NameCn("Node2Vec游走")
@NameEn("Node2Vec Walk")
/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp.class */
public final class Node2VecWalkBatchOp extends BatchOperator<Node2VecWalkBatchOp> implements Node2VecWalkParams<Node2VecWalkBatchOp> {
    public static final String PATH_COL_NAME = "path";
    public static final String GRAPH_STATISTICS = "graphStatistics";
    private static final long serialVersionUID = 5772364018494433734L;
    static final long PREV_IN_CUR_NEIGHBOR = -2021;
    static final long PREV_NOT_IN_CUR_NEIGHBOR = -2022;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$CacheGraphAndRandomWalk.class */
    public static class CacheGraphAndRandomWalk extends RichMapPartitionFunction<GraphEdge, Node2VecCommunicationUnit> {
        long graphStorageHandler;
        long randomWalkStorageHandler;
        long walkWriteBufferHandler;
        long walkReadBufferHandler;
        int numWalkPerVertex;
        int walkLen;
        boolean isWeighted;
        String samplingMethod;
        GraphPartition.GraphPartitionFunction graphPartitionFunction;

        public CacheGraphAndRandomWalk(long j, long j2, long j3, long j4, int i, int i2, boolean z, String str, GraphPartition.GraphPartitionFunction graphPartitionFunction) {
            this.graphStorageHandler = j;
            this.randomWalkStorageHandler = j2;
            this.walkWriteBufferHandler = j3;
            this.walkReadBufferHandler = j4;
            this.numWalkPerVertex = i;
            this.walkLen = i2;
            this.isWeighted = z;
            this.samplingMethod = str;
            this.graphPartitionFunction = graphPartitionFunction;
        }

        public void mapPartition(Iterable<GraphEdge> iterable, Collector<Node2VecCommunicationUnit> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int superstepNumber = getIterationRuntimeContext().getSuperstepNumber();
            if (superstepNumber != 1) {
                if (superstepNumber == 2) {
                    List<Node2VecCommunicationUnit> broadcastVariable = getRuntimeContext().getBroadcastVariable("loop");
                    HashMap hashMap = new HashMap(broadcastVariable.size());
                    for (Node2VecCommunicationUnit node2VecCommunicationUnit : broadcastVariable) {
                        hashMap.put(Integer.valueOf(node2VecCommunicationUnit.getSrcPartitionId()), Integer.valueOf(node2VecCommunicationUnit.getDstPartitionId()));
                    }
                    HomoGraphEngine homoGraphEngine = (HomoGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, indexOfThisSubtask);
                    AkPreconditions.checkNotNull(homoGraphEngine, "the graph engine is null");
                    homoGraphEngine.setLogicalWorkerIdToPhysicalWorkerId(hashMap);
                    return;
                }
                return;
            }
            GraphStatistics graphStatistics = null;
            Iterator it = getRuntimeContext().getBroadcastVariable("graphStatistics").iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                GraphStatistics graphStatistics2 = (GraphStatistics) it.next();
                if (graphStatistics2.getPartitionId() == indexOfThisSubtask) {
                    graphStatistics = graphStatistics2;
                    break;
                }
            }
            AkPreconditions.checkNotNull(graphStatistics, "The statistics is null.");
            HomoGraphEngine homoGraphEngine2 = new HomoGraphEngine(iterable, graphStatistics.getVertexNum(), graphStatistics.getEdgeNum(), this.isWeighted, this.samplingMethod.equalsIgnoreCase("ALIAS"));
            IterTaskObjKeeper.put(this.graphStorageHandler, indexOfThisSubtask, homoGraphEngine2);
            IterTaskObjKeeper.put(this.randomWalkStorageHandler, indexOfThisSubtask, new Node2VecWalkPathEngine(Math.min((16777216 / (this.numWalkPerVertex * this.walkLen)) / 2, graphStatistics.getVertexNum()), this.numWalkPerVertex, this.walkLen, homoGraphEngine2.getAllSrcVertices()));
            RandomWalkMemoryBuffer randomWalkMemoryBuffer = new RandomWalkMemoryBuffer(67108864 / ((this.walkLen * 8) + 16));
            IterTaskObjKeeper.put(this.walkWriteBufferHandler, indexOfThisSubtask, randomWalkMemoryBuffer);
            IterTaskObjKeeper.put(this.walkReadBufferHandler, indexOfThisSubtask, randomWalkMemoryBuffer);
            if (homoGraphEngine2.getAllSrcVertices().hasNext()) {
                collector.collect(new Node2VecCommunicationUnit(this.graphPartitionFunction.apply(homoGraphEngine2.getAllSrcVertices().next().longValue(), numberOfParallelSubtasks), indexOfThisSubtask, null, null, null, null));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$DoRemoteProcessing.class */
    public static class DoRemoteProcessing extends RichMapPartitionFunction<Node2VecCommunicationUnit, Node2VecCommunicationUnit> {
        long graphStorageHandler;

        public DoRemoteProcessing(long j) {
            this.graphStorageHandler = j;
        }

        public void mapPartition(Iterable<Node2VecCommunicationUnit> iterable, Collector<Node2VecCommunicationUnit> collector) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                Iterator<Node2VecCommunicationUnit> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
                return;
            }
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            HomoGraphEngine homoGraphEngine = (HomoGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, indexOfThisSubtask);
            AkPreconditions.checkNotNull(homoGraphEngine, "homoGraphEngine is null");
            for (Node2VecCommunicationUnit node2VecCommunicationUnit : iterable) {
                AkPreconditions.checkState(node2VecCommunicationUnit.getDstPartitionId() == indexOfThisSubtask, "The target task id is incorrect. It should be " + node2VecCommunicationUnit.getDstPartitionId() + ", but it is " + indexOfThisSubtask);
                Long[] requestedVertexIds = node2VecCommunicationUnit.getRequestedVertexIds();
                Long[] prevVertexIdsOrContainsPrevVertexIds = node2VecCommunicationUnit.getPrevVertexIdsOrContainsPrevVertexIds();
                for (int i = 0; i < requestedVertexIds.length; i++) {
                    if (homoGraphEngine.containsVertex(requestedVertexIds[i].longValue())) {
                        switch (node2VecCommunicationUnit.getMessageTypes()[i]) {
                            case GET_NUM_OF_NEIGHBORS:
                                requestedVertexIds[i] = Long.valueOf(homoGraphEngine.getNumNeighbors(requestedVertexIds[i].longValue()));
                                break;
                            case SAMPLE_A_NEIGHBOR:
                                requestedVertexIds[i] = Long.valueOf(homoGraphEngine.sampleOneNeighbor(requestedVertexIds[i].longValue()));
                                break;
                            case CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX:
                                if (homoGraphEngine.containsEdge(requestedVertexIds[i].longValue(), prevVertexIdsOrContainsPrevVertexIds[i].longValue())) {
                                    prevVertexIdsOrContainsPrevVertexIds[i] = Long.valueOf(Node2VecWalkBatchOp.PREV_IN_CUR_NEIGHBOR);
                                    break;
                                } else {
                                    prevVertexIdsOrContainsPrevVertexIds[i] = Long.valueOf(Node2VecWalkBatchOp.PREV_NOT_IN_CUR_NEIGHBOR);
                                    break;
                                }
                            default:
                                throw new AkIllegalStateException("Illegal state here: Remote state must be one of [GET_NUM_OF_NEIGHBORS, SAMPLE_A_NEIGHBOR and CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX]");
                        }
                    } else {
                        requestedVertexIds[i] = -1L;
                    }
                }
                collector.collect(node2VecCommunicationUnit);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$GetMessageToSend.class */
    public static class GetMessageToSend extends RichMapPartitionFunction<Node2VecCommunicationUnit, Node2VecCommunicationUnit> {
        long graphStorageHandler;
        long randomWalkStorageHandler;
        long walkWriteBufferHandler;
        GraphPartition.GraphPartitionFunction graphPartitionFunction;

        public GetMessageToSend(long j, long j2, long j3, GraphPartition.GraphPartitionFunction graphPartitionFunction) {
            this.graphStorageHandler = j;
            this.randomWalkStorageHandler = j2;
            this.walkWriteBufferHandler = j3;
            this.graphPartitionFunction = graphPartitionFunction;
        }

        public void mapPartition(Iterable<Node2VecCommunicationUnit> iterable, Collector<Node2VecCommunicationUnit> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            int superstepNumber = getIterationRuntimeContext().getSuperstepNumber();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            if (superstepNumber == 1) {
                Iterator<Node2VecCommunicationUnit> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
                return;
            }
            for (Node2VecCommunicationUnit node2VecCommunicationUnit : iterable) {
            }
            HomoGraphEngine homoGraphEngine = (HomoGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, indexOfThisSubtask);
            Node2VecWalkPathEngine node2VecWalkPathEngine = (Node2VecWalkPathEngine) IterTaskObjKeeper.get(this.randomWalkStorageHandler, indexOfThisSubtask);
            RandomWalkMemoryBuffer randomWalkMemoryBuffer = (RandomWalkMemoryBuffer) IterTaskObjKeeper.get(this.walkWriteBufferHandler, indexOfThisSubtask);
            AkPreconditions.checkNotNull(homoGraphEngine, "homoGraphEngine is null.");
            AkPreconditions.checkNotNull(node2VecWalkPathEngine, "node2VecWalkPathEngine is null");
            AkPreconditions.checkNotNull(randomWalkMemoryBuffer, "randomWalkMemoryBuffer is null");
            long[] nextBatchOfVerticesToSampleFrom = node2VecWalkPathEngine.getNextBatchOfVerticesToSampleFrom();
            for (int i = 0; i < nextBatchOfVerticesToSampleFrom.length; i++) {
                if (node2VecWalkPathEngine.canOutput(i)) {
                    long[] oneWalkAndAddNewWalk = node2VecWalkPathEngine.getOneWalkAndAddNewWalk(i);
                    nextBatchOfVerticesToSampleFrom[i] = node2VecWalkPathEngine.getNextVertexToSampleFrom(i);
                    randomWalkMemoryBuffer.writeOneWalk(oneWalkAndAddNewWalk);
                }
            }
            for (int i2 = 0; i2 < nextBatchOfVerticesToSampleFrom.length; i2++) {
                long j = nextBatchOfVerticesToSampleFrom[i2];
                if (j != -1 && node2VecWalkPathEngine.getPrevVertex(i2) == -1) {
                    long sampleOneNeighbor = homoGraphEngine.sampleOneNeighbor(j);
                    node2VecWalkPathEngine.updatePath(i2, sampleOneNeighbor);
                    nextBatchOfVerticesToSampleFrom[i2] = sampleOneNeighbor;
                    node2VecWalkPathEngine.setNode2VecState(i2, Node2VecState.INIT);
                    node2VecWalkPathEngine.setNode2VecState(i2, Node2VecState.GET_NUM_OF_NEIGHBORS);
                }
            }
            ArrayList[] arrayListArr = new ArrayList[numberOfParallelSubtasks];
            ArrayList[] arrayListArr2 = new ArrayList[numberOfParallelSubtasks];
            ArrayList[] arrayListArr3 = new ArrayList[numberOfParallelSubtasks];
            ArrayList[] arrayListArr4 = new ArrayList[numberOfParallelSubtasks];
            for (int i3 = 0; i3 < numberOfParallelSubtasks; i3++) {
                arrayListArr[i3] = new ArrayList();
                arrayListArr2[i3] = new ArrayList();
                arrayListArr3[i3] = new ArrayList();
                arrayListArr4[i3] = new ArrayList();
            }
            for (int i4 = 0; i4 < nextBatchOfVerticesToSampleFrom.length; i4++) {
                if (nextBatchOfVerticesToSampleFrom[i4] != -1) {
                    int physicalWorkerIdByLogicalWorkerId = homoGraphEngine.getPhysicalWorkerIdByLogicalWorkerId(this.graphPartitionFunction.apply(nextBatchOfVerticesToSampleFrom[i4], numberOfParallelSubtasks));
                    switch (node2VecWalkPathEngine.getNode2VecState(i4)) {
                        case GET_NUM_OF_NEIGHBORS:
                            arrayListArr[physicalWorkerIdByLogicalWorkerId].add(Long.valueOf(nextBatchOfVerticesToSampleFrom[i4]));
                            arrayListArr2[physicalWorkerIdByLogicalWorkerId].add(Integer.valueOf(i4));
                            arrayListArr3[physicalWorkerIdByLogicalWorkerId].add(Node2VecState.GET_NUM_OF_NEIGHBORS);
                            arrayListArr4[physicalWorkerIdByLogicalWorkerId].add(Long.valueOf(node2VecWalkPathEngine.getPrevVertex(i4)));
                            break;
                        case SAMPLE_A_NEIGHBOR:
                            arrayListArr[physicalWorkerIdByLogicalWorkerId].add(Long.valueOf(nextBatchOfVerticesToSampleFrom[i4]));
                            arrayListArr2[physicalWorkerIdByLogicalWorkerId].add(Integer.valueOf(i4));
                            arrayListArr3[physicalWorkerIdByLogicalWorkerId].add(Node2VecState.SAMPLE_A_NEIGHBOR);
                            arrayListArr4[physicalWorkerIdByLogicalWorkerId].add(Long.valueOf(node2VecWalkPathEngine.getPrevVertex(i4)));
                            break;
                        case CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX:
                            long sampledNeighbor = node2VecWalkPathEngine.getSampledNeighbor(i4);
                            int physicalWorkerIdByLogicalWorkerId2 = homoGraphEngine.getPhysicalWorkerIdByLogicalWorkerId(this.graphPartitionFunction.apply(sampledNeighbor, numberOfParallelSubtasks));
                            arrayListArr[physicalWorkerIdByLogicalWorkerId2].add(Long.valueOf(sampledNeighbor));
                            arrayListArr2[physicalWorkerIdByLogicalWorkerId2].add(Integer.valueOf(i4));
                            arrayListArr3[physicalWorkerIdByLogicalWorkerId2].add(Node2VecState.CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX);
                            arrayListArr4[physicalWorkerIdByLogicalWorkerId2].add(Long.valueOf(node2VecWalkPathEngine.getPrevVertex(i4)));
                            break;
                        default:
                            throw new AkUnclassifiedErrorException("Illegal state here: Remote state must be one of [GET_NUM_OF_NEIGHBORS, SAMPLE_A_NEIGHBOR and CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX]");
                    }
                }
            }
            for (int i5 = 0; i5 < numberOfParallelSubtasks; i5++) {
                if (arrayListArr2[i5].size() != 0) {
                    collector.collect(new Node2VecCommunicationUnit(indexOfThisSubtask, i5, (Integer[]) arrayListArr2[i5].toArray(new Integer[0]), (Long[]) arrayListArr[i5].toArray(new Long[0]), (Node2VecState[]) arrayListArr3[i5].toArray(new Node2VecState[0]), (Long[]) arrayListArr4[i5].toArray(new Long[0])));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$HandleReceivedMessage.class */
    public static class HandleReceivedMessage extends RichMapPartitionFunction<Node2VecCommunicationUnit, Node2VecCommunicationUnit> {
        long randomWalkStorageHandler;
        double invP;
        double invQ;
        Random random = new Random(2021);

        public HandleReceivedMessage(long j, double d, double d2) {
            this.randomWalkStorageHandler = j;
            this.invP = d;
            this.invQ = d2;
        }

        public void mapPartition(Iterable<Node2VecCommunicationUnit> iterable, Collector<Node2VecCommunicationUnit> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                Iterator<Node2VecCommunicationUnit> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
                return;
            }
            Node2VecWalkPathEngine node2VecWalkPathEngine = (Node2VecWalkPathEngine) IterTaskObjKeeper.get(this.randomWalkStorageHandler, indexOfThisSubtask);
            AkPreconditions.checkNotNull(node2VecWalkPathEngine, "node2VecWalkPathEngine is null");
            for (Node2VecCommunicationUnit node2VecCommunicationUnit : iterable) {
                int srcPartitionId = node2VecCommunicationUnit.getSrcPartitionId();
                AkPreconditions.checkState(srcPartitionId == indexOfThisSubtask, "The target task id is incorrect. It should be " + srcPartitionId + ", but it is " + indexOfThisSubtask);
                Long[] requestedVertexIds = node2VecCommunicationUnit.getRequestedVertexIds();
                Integer[] walkIds = node2VecCommunicationUnit.getWalkIds();
                node2VecCommunicationUnit.getMessageTypes();
                Long[] prevVertexIdsOrContainsPrevVertexIds = node2VecCommunicationUnit.getPrevVertexIdsOrContainsPrevVertexIds();
                for (int i = 0; i < requestedVertexIds.length; i++) {
                    int intValue = walkIds[i].intValue();
                    switch (r0[i]) {
                        case GET_NUM_OF_NEIGHBORS:
                            int longValue = (int) requestedVertexIds[i].longValue();
                            if (longValue == 0) {
                                node2VecWalkPathEngine.updatePath(intValue, -1L);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.FINISHED);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.INIT);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.GET_NUM_OF_NEIGHBORS);
                                break;
                            } else {
                                node2VecWalkPathEngine.setRejectionState(intValue, Node2VecWalkBatchOp.rejectionSample(this.invP, this.invQ, longValue));
                                node2VecWalkPathEngine.setProb(intValue, this.random.nextDouble() * node2VecWalkPathEngine.getUpperBound(intValue));
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.SAMPLE_A_NEIGHBOR);
                                break;
                            }
                        case SAMPLE_A_NEIGHBOR:
                            long longValue2 = requestedVertexIds[i].longValue();
                            double prob = node2VecWalkPathEngine.getProb(intValue);
                            double shatter = node2VecWalkPathEngine.getShatter(intValue);
                            double upperBound = node2VecWalkPathEngine.getUpperBound(intValue);
                            double lowerBound = node2VecWalkPathEngine.getLowerBound(intValue);
                            long prevVertex = node2VecWalkPathEngine.getPrevVertex(intValue);
                            if ((prob + shatter >= upperBound && longValue2 == prevVertex) || prob < lowerBound || (prob < this.invP && longValue2 == prevVertex)) {
                                node2VecWalkPathEngine.updatePath(intValue, longValue2);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.FINISHED);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.INIT);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.GET_NUM_OF_NEIGHBORS);
                                break;
                            } else {
                                node2VecWalkPathEngine.setSampledNeighbor(intValue, longValue2);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX);
                                break;
                            }
                        case CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX:
                            if (((prevVertexIdsOrContainsPrevVertexIds[i].longValue() > Node2VecWalkBatchOp.PREV_IN_CUR_NEIGHBOR ? 1 : (prevVertexIdsOrContainsPrevVertexIds[i].longValue() == Node2VecWalkBatchOp.PREV_IN_CUR_NEIGHBOR ? 0 : -1)) == 0 ? 1.0d : this.invQ) > node2VecWalkPathEngine.getProb(intValue)) {
                                node2VecWalkPathEngine.updatePath(intValue, requestedVertexIds[i].longValue());
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.FINISHED);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.INIT);
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.GET_NUM_OF_NEIGHBORS);
                                break;
                            } else {
                                node2VecWalkPathEngine.setProb(intValue, this.random.nextDouble() * node2VecWalkPathEngine.getUpperBound(intValue));
                                node2VecWalkPathEngine.setNode2VecState(intValue, Node2VecState.SAMPLE_A_NEIGHBOR);
                                break;
                            }
                        default:
                            throw new AkUnclassifiedErrorException("Illegal state here: Remote state must be one of [GET_NUM_OF_NEIGHBORS, SAMPLE_A_NEIGHBOR and CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX]");
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$Node2VecCommunicationUnit.class */
    public static class Node2VecCommunicationUnit extends RandomWalkBatchOp.RandomWalkCommunicationUnit implements Serializable {
        Long[] prevVertexIdsOrContainsPrevVertexIds;
        Node2VecState[] messageTypes;

        public Node2VecCommunicationUnit(int i, int i2, Integer[] numArr, Long[] lArr, Node2VecState[] node2VecStateArr, Long[] lArr2) {
            super(i, i2, numArr, lArr);
            this.messageTypes = node2VecStateArr;
            this.prevVertexIdsOrContainsPrevVertexIds = lArr2;
        }

        public Node2VecState[] getMessageTypes() {
            return this.messageTypes;
        }

        public Long[] getPrevVertexIdsOrContainsPrevVertexIds() {
            return this.prevVertexIdsOrContainsPrevVertexIds;
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/Node2VecWalkBatchOp$Node2VecState.class */
    public enum Node2VecState {
        INIT,
        GET_NUM_OF_NEIGHBORS,
        LOOP_START,
        SAMPLE_A_NEIGHBOR,
        CHECK_NEXT_NODE_NEIGHBOR_CONTAINS_PREV_VERTEX,
        FINISHED
    }

    public Node2VecWalkBatchOp() {
        super(new Params());
    }

    public Node2VecWalkBatchOp(Params params) {
        super(params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public Node2VecWalkBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        DataSet<Row> dataSet;
        BatchOperator<?> checkAndGetFirst = checkAndGetFirst(batchOperatorArr);
        double doubleValue = 1.0d / ((Double) getParams().get(Node2VecParams.P)).doubleValue();
        double doubleValue2 = 1.0d / ((Double) getParams().get(Node2VecParams.Q)).doubleValue();
        Integer walkNum = getWalkNum();
        Integer walkLength = getWalkLength();
        String delimiter = getDelimiter();
        String sourceCol = getSourceCol();
        String targetCol = getTargetCol();
        String weightCol = getWeightCol();
        Boolean isToUndigraph = getIsToUndigraph();
        int findColIndexWithAssertAndHint = TableUtil.findColIndexWithAssertAndHint(checkAndGetFirst.getColNames(), sourceCol);
        int findColIndexWithAssertAndHint2 = TableUtil.findColIndexWithAssertAndHint(checkAndGetFirst.getColNames(), targetCol);
        int findColIndexWithAssertAndHint3 = weightCol == null ? -1 : TableUtil.findColIndexWithAssertAndHint(checkAndGetFirst.getColNames(), weightCol);
        BasicTypeInfo basicTypeInfo = checkAndGetFirst.getColTypes()[findColIndexWithAssertAndHint];
        BasicTypeInfo basicTypeInfo2 = checkAndGetFirst.getColTypes()[findColIndexWithAssertAndHint2];
        DataSet<Tuple2<String, Long>> dataSet2 = null;
        boolean z = ((basicTypeInfo == BasicTypeInfo.LONG_TYPE_INFO && basicTypeInfo2 == BasicTypeInfo.LONG_TYPE_INFO) || (basicTypeInfo == BasicTypeInfo.INT_TYPE_INFO && basicTypeInfo2 == BasicTypeInfo.INT_TYPE_INFO)) ? false : true;
        if (z) {
            dataSet2 = IDMappingUtils.computeIdMapping(checkAndGetFirst.getDataSet(), new int[]{findColIndexWithAssertAndHint, findColIndexWithAssertAndHint2});
            dataSet = IDMappingUtils.mapDataSetWithIdMapping(checkAndGetFirst.getDataSet(), dataSet2, new int[]{findColIndexWithAssertAndHint, findColIndexWithAssertAndHint2});
        } else {
            dataSet = checkAndGetFirst.getDataSet();
        }
        GraphPartition.GraphPartitionHashFunction graphPartitionHashFunction = new GraphPartition.GraphPartitionHashFunction();
        boolean z2 = weightCol != null;
        Operator name = dataSet.flatMap(new ParseGraphData(findColIndexWithAssertAndHint, findColIndexWithAssertAndHint2, findColIndexWithAssertAndHint3, isToUndigraph.booleanValue())).partitionCustom(new GraphPartition.GraphPartitioner(graphPartitionHashFunction), 0).sortPartition(0, Order.ASCENDING).sortPartition(1, Order.ASCENDING).map(new ConstructHomoEdge()).name("parsedAndPartitionedAndSortedGraph");
        Operator name2 = name.mapPartition(new ComputeGraphStatistics()).name("graphStatistics");
        long newHandle = IterTaskObjKeeper.getNewHandle();
        long newHandle2 = IterTaskObjKeeper.getNewHandle();
        long newHandle3 = IterTaskObjKeeper.getNewHandle();
        long newHandle4 = IterTaskObjKeeper.getNewHandle();
        ExecutionEnvironment executionEnvironment = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
        IterativeDataSet name3 = executionEnvironment.fromParallelCollection(new NumberSequenceIterator(1L, executionEnvironment.getParallelism()), BasicTypeInfo.LONG_TYPE_INFO).map(new MapFunction<Long, Node2VecCommunicationUnit>() { // from class: com.alibaba.alink.operator.batch.graph.Node2VecWalkBatchOp.1
            public Node2VecCommunicationUnit map(Long l) {
                return new Node2VecCommunicationUnit(1, 1, null, null, null, null);
            }
        }).name("initData").iterate(Integer.MAX_VALUE).name("loop");
        Operator name4 = name.mapPartition(new CacheGraphAndRandomWalk(newHandle, newHandle2, newHandle3, newHandle4, walkNum.intValue(), walkLength.intValue(), z2, getSamplingMethod(), graphPartitionHashFunction)).withBroadcastSet(name2, "graphStatistics").withBroadcastSet(name3, "loop").name("cachedGraphAndRandomWalk").mapPartition(new GetMessageToSend(newHandle, newHandle2, newHandle3, graphPartitionHashFunction)).name("sendCommunicationUnit");
        DataSet closeWith = name3.closeWith(name4.partitionCustom(new GraphPartition.GraphPartitioner(graphPartitionHashFunction), new SendRequestKeySelector()).mapPartition(new DoRemoteProcessing(newHandle)).partitionCustom(new GraphPartition.GraphPartitioner(graphPartitionHashFunction), new RecvRequestKeySelector()).name("recvCommunicationUnit").mapPartition(new HandleReceivedMessage(newHandle2, doubleValue, doubleValue2)).name("finishedOneStep"), name4.map(new MapFunction<Node2VecCommunicationUnit, Object>() { // from class: com.alibaba.alink.operator.batch.graph.Node2VecWalkBatchOp.2
            public Object map(Node2VecCommunicationUnit node2VecCommunicationUnit) {
                return new Object();
            }
        }).name("termination"));
        Operator name5 = closeWith.mapPartition(new EndWritingRandomWalks(newHandle3)).name("emptyOut").withBroadcastSet(closeWith, "output").union(executionEnvironment.fromParallelCollection(new NumberSequenceIterator(0L, executionEnvironment.getParallelism()), BasicTypeInfo.LONG_TYPE_INFO).mapPartition(new ReadFromBufferAndRemoveStaticObject(newHandle, newHandle2, newHandle4, delimiter)).name("memoryOut")).name("mergedOutput");
        setOutput(z ? IDMappingUtils.mapWalkToStringWithIdMapping(name5, dataSet2, walkLength.intValue(), delimiter) : name5.map(new LongArrayToRow(delimiter)), new TableSchema(new String[]{"path"}, new TypeInformation[]{Types.STRING}));
        return this;
    }

    static Tuple3<Double, Double, Double> rejectionSample(double d, double d2, int i) {
        double max = Math.max(1.0d, Math.max(d, d2));
        double min = Math.min(1.0d, Math.min(d, d2));
        double d3 = 0.0d;
        double max2 = Math.max(1.0d, d2);
        if (d > max2) {
            d3 = max2 / i;
            max = max2 + d3;
        }
        return Tuple3.of(Double.valueOf(max), Double.valueOf(min), Double.valueOf(d3));
    }

    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ Node2VecWalkBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
