package com.alibaba.alink.operator.batch.graph;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.comqueue.IterTaskObjKeeper;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.linalg.VectorUtil;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.graph.RandomWalkBatchOp;
import com.alibaba.alink.operator.batch.graph.storage.GraphEdge;
import com.alibaba.alink.operator.batch.graph.storage.HeteGraphEngine;
import com.alibaba.alink.operator.batch.graph.utils.ComputeGraphStatistics;
import com.alibaba.alink.operator.batch.graph.utils.ConstructHeteEdge;
import com.alibaba.alink.operator.batch.graph.utils.EndWritingRandomWalks;
import com.alibaba.alink.operator.batch.graph.utils.GraphPartition;
import com.alibaba.alink.operator.batch.graph.utils.GraphStatistics;
import com.alibaba.alink.operator.batch.graph.utils.HandleReceivedMessage;
import com.alibaba.alink.operator.batch.graph.utils.IDMappingUtils;
import com.alibaba.alink.operator.batch.graph.utils.LongArrayToRow;
import com.alibaba.alink.operator.batch.graph.utils.ParseGraphData;
import com.alibaba.alink.operator.batch.graph.utils.RandomWalkMemoryBuffer;
import com.alibaba.alink.operator.batch.graph.utils.ReadFromBufferAndRemoveStaticObject;
import com.alibaba.alink.operator.batch.graph.utils.RecvRequestKeySelector;
import com.alibaba.alink.operator.batch.graph.utils.SendRequestKeySelector;
import com.alibaba.alink.operator.batch.graph.walkpath.MetaPathWalkPathEngine;
import com.alibaba.alink.params.nlp.walk.MetaPathWalkParams;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NumberSequenceIterator;

@InputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.GRAPH), @PortSpec(value = PortType.DATA, desc = PortDesc.NODE_TYPE)})
@OutputPorts(values = {@PortSpec(PortType.DATA)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "sourceCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.INT_LONG_TYPES, TypeCollections.STRING_TYPES}), @ParamSelectColumnSpec(name = "targetCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.INT_LONG_TYPES, TypeCollections.STRING_TYPES}), @ParamSelectColumnSpec(name = "weightCol", portIndices = {VectorUtil.VectorSerialType.DENSE_VECTOR}, allowedTypeCollections = {TypeCollections.NUMERIC_TYPES}), @ParamSelectColumnSpec(name = "vertexCol", portIndices = {1}, allowedTypeCollections = {TypeCollections.INT_LONG_TYPES, TypeCollections.STRING_TYPES}), @ParamSelectColumnSpec(name = "typeCol", portIndices = {1}, allowedTypeCollections = {TypeCollections.STRING_TYPES})})
@NameCn("MetaPath游走")
@NameEn("MetaPath Walk")
/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp.class */
public class MetaPathWalkBatchOp extends BatchOperator<MetaPathWalkBatchOp> implements MetaPathWalkParams<MetaPathWalkBatchOp> {
    // Name of the output column. linkFrom declares its output schema with a single STRING column
    // spelled "path" (the literal there is presumably this constant, inlined by the decompiler).
    public static final String PATH_COL_NAME = "path";
    // Same text as the broadcast-set name under which linkFrom ships the per-partition graph
    // statistics and CacheGraphAndRandomWalk reads them back.
    public static final String GRAPH_STATISTICS = "graphStatistics";
    private static final long serialVersionUID = -4645013343119976771L;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$CacheGraphAndRandomWalk.class */
    /**
     * Per-subtask set-up step that runs inside the bulk iteration.
     *
     * <ul>
     *   <li>Superstep 1: builds a {@link HeteGraphEngine} from the edges of this partition, creates the
     *       {@link MetaPathWalkPathEngine} and one {@link RandomWalkMemoryBuffer}, registers all of them in
     *       {@link IterTaskObjKeeper}, and emits at most one unit that pairs the partition-function value of
     *       the first source vertex with this subtask's index.</li>
     *   <li>Superstep 2: reads the units from the "loop" broadcast set and installs the resulting
     *       source-partition-id to destination-partition-id map on the cached graph engine.</li>
     *   <li>Any later superstep: emits nothing.</li>
     * </ul>
     */
    public static class CacheGraphAndRandomWalk extends RichMapPartitionFunction<GraphEdge, MetaPathCommunicationUnit> {
        long graphStorageHandler;
        long randomWalkStorageHandler;
        long walkWriteBufferHandler;
        long walkReadBufferHandler;
        int numWalkPerVertex;
        int walkLen;
        boolean isWeighted;
        String samplingMethod;
        List<char[]> metaPaths;
        GraphPartition.GraphPartitionFunction graphPartitionFunction;

        /**
         * @param graphStorageHandler      IterTaskObjKeeper handle under which the graph engine is stored
         * @param randomWalkStorageHandler IterTaskObjKeeper handle under which the walk-path engine is stored
         * @param walkWriteBufferHandler   IterTaskObjKeeper handle under which the walk buffer is stored
         * @param walkReadBufferHandler    second handle under which the very same walk buffer is stored
         * @param numWalkPerVertex         number of walks per vertex, forwarded to the walk-path engine
         * @param walkLen                  walk length, forwarded to the walk-path engine
         * @param isWeighted               forwarded to the graph engine
         * @param samplingMethod           compared case-insensitively with "ALIAS" when building the graph engine
         * @param metaPaths                meta paths, forwarded to the walk-path engine
         * @param graphPartitionFunction   maps a vertex id and the number of partitions to a partition id
         */
        public CacheGraphAndRandomWalk(long graphStorageHandler, long randomWalkStorageHandler, long walkWriteBufferHandler, long walkReadBufferHandler, int numWalkPerVertex, int walkLen, boolean isWeighted, String samplingMethod, List<char[]> metaPaths, GraphPartition.GraphPartitionFunction graphPartitionFunction) {
            this.graphStorageHandler = graphStorageHandler;
            this.randomWalkStorageHandler = randomWalkStorageHandler;
            this.walkWriteBufferHandler = walkWriteBufferHandler;
            this.walkReadBufferHandler = walkReadBufferHandler;
            this.numWalkPerVertex = numWalkPerVertex;
            this.walkLen = walkLen;
            this.isWeighted = isWeighted;
            this.samplingMethod = samplingMethod;
            this.metaPaths = metaPaths;
            this.graphPartitionFunction = graphPartitionFunction;
        }

        public void mapPartition(Iterable<GraphEdge> iterable, Collector<MetaPathCommunicationUnit> collector) throws Exception {
            int superStep = getIterationRuntimeContext().getSuperstepNumber();
            int partitionId = getRuntimeContext().getIndexOfThisSubtask();
            int numPartitions = getRuntimeContext().getNumberOfParallelSubtasks();
            if (superStep == 1) {
                cacheGraphAndInitWalks(iterable, collector, partitionId, numPartitions);
            } else if (superStep == 2) {
                installWorkerMapping(partitionId);
            }
            // Later supersteps have nothing to do here and emit nothing.
        }

        /** Superstep 1: builds and registers the per-subtask engines and announces this subtask. */
        private void cacheGraphAndInitWalks(Iterable<GraphEdge> edges, Collector<MetaPathCommunicationUnit> collector, int partitionId, int numPartitions) throws Exception {
            List<GraphStatistics> allStatistics = getRuntimeContext().getBroadcastVariable("graphStatistics");
            GraphStatistics statistics = null;
            for (GraphStatistics candidate : allStatistics) {
                if (candidate.getPartitionId() == partitionId) {
                    statistics = candidate;
                    break;
                }
            }
            // Fail with a descriptive message instead of a bare NullPointerException a few lines below.
            AkPreconditions.checkNotNull(statistics, "graphStatistics for partition " + partitionId + " is null");

            HeteGraphEngine graph = new HeteGraphEngine(edges, statistics.getVertexNum(), statistics.getEdgeNum(), statistics.getNodeTypes(), this.isWeighted, this.samplingMethod.equalsIgnoreCase("ALIAS"));
            IterTaskObjKeeper.put(this.graphStorageHandler, partitionId, graph);

            // First constructor argument: 16777216 (2^24) divided by the number of vertex slots one start
            // vertex needs, capped by the number of vertices in this partition.
            IterTaskObjKeeper.put(this.randomWalkStorageHandler, partitionId, new MetaPathWalkPathEngine(Math.min(16777216 / (this.numWalkPerVertex * this.walkLen), statistics.getVertexNum()), this.numWalkPerVertex, this.walkLen, graph.getAllSrcVerticesWithTypes(), this.metaPaths));

            // One buffer instance is registered under both the write handle and the read handle.
            RandomWalkMemoryBuffer walkBuffer = new RandomWalkMemoryBuffer(67108864 / ((this.walkLen * 8) + 16));
            IterTaskObjKeeper.put(this.walkWriteBufferHandler, partitionId, walkBuffer);
            IterTaskObjKeeper.put(this.walkReadBufferHandler, partitionId, walkBuffer);

            // Announce (partition-function value of the first source vertex, this subtask's index).
            // Superstep 2 turns these pairs into the logical-to-physical worker mapping.
            Iterator<Long> srcVertices = graph.getAllSrcVertices();
            if (srcVertices.hasNext()) {
                collector.collect(new MetaPathCommunicationUnit(this.graphPartitionFunction.apply(srcVertices.next().longValue(), numPartitions), partitionId, null, null, null));
            }
        }

        /** Superstep 2: installs the worker-id mapping collected from the "loop" broadcast set. */
        private void installWorkerMapping(int partitionId) {
            List<MetaPathCommunicationUnit> announcements = getRuntimeContext().getBroadcastVariable("loop");
            HashMap<Integer, Integer> logicalToPhysical = new HashMap<Integer, Integer>(announcements.size());
            for (MetaPathCommunicationUnit announcement : announcements) {
                logicalToPhysical.put(Integer.valueOf(announcement.getSrcPartitionId()), Integer.valueOf(announcement.getDstPartitionId()));
            }
            HeteGraphEngine graph = (HeteGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, partitionId);
            AkPreconditions.checkNotNull(graph, "heteGraphEngine is null");
            graph.setLogicalWorkerIdToPhysicalWorkerId(logicalToPhysical);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$ConcatDstNodeType.class */
    /**
     * Co-group function that appends the node type of the target vertex to every edge of the group.
     *
     * <p>Input 1: edges as (f0, f1, f2); input 2: (vertex id, node type) pairs for the same key.
     * Output: (f0, f1, f2, node type). Only the first node-type record of the group is used.
     */
    public static class ConcatDstNodeType implements CoGroupFunction<Tuple3<Long, Long, Double>, Tuple2<Long, Character>, Tuple4<Long, Long, Double, Character>> {
        ConcatDstNodeType() {
        }

        /**
         * @throws java.util.NoSuchElementException if the group contains no node-type record
         */
        public void coGroup(Iterable<Tuple3<Long, Long, Double>> iterable, Iterable<Tuple2<Long, Character>> iterable2, Collector<Tuple4<Long, Long, Double, Character>> collector) throws Exception {
            Iterator<Tuple2<Long, Character>> types = iterable2.iterator();
            if (!types.hasNext()) {
                // Same exception type the unchecked next() call would raise, but naming the offending vertex.
                Iterator<Tuple3<Long, Long, Double>> orphanEdges = iterable.iterator();
                String vertex = orphanEdges.hasNext() ? String.valueOf(orphanEdges.next().f1) : "<unknown>";
                throw new java.util.NoSuchElementException("No node type found for target vertex id " + vertex + ". Every vertex that appears in an edge needs a node type.");
            }
            Character dstType = types.next().f1;
            for (Tuple3<Long, Long, Double> edge : iterable) {
                collector.collect(Tuple4.of(edge.f0, edge.f1, edge.f2, dstType));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$ConcatSrcNodeType.class */
    /**
     * Co-group function that inserts the node type of the source vertex into every edge of the group.
     *
     * <p>Input 1: edges as (f0, f1, f2, f3); input 2: (vertex id, node type) pairs for the same key.
     * Output: (f0, f1, f2, node type, f3), i.e. the new type is placed before the incoming f3.
     * Only the first node-type record of the group is used.
     */
    public static class ConcatSrcNodeType implements CoGroupFunction<Tuple4<Long, Long, Double, Character>, Tuple2<Long, Character>, Tuple5<Long, Long, Double, Character, Character>> {
        ConcatSrcNodeType() {
        }

        /**
         * @throws java.util.NoSuchElementException if the group contains no node-type record
         */
        public void coGroup(Iterable<Tuple4<Long, Long, Double, Character>> iterable, Iterable<Tuple2<Long, Character>> iterable2, Collector<Tuple5<Long, Long, Double, Character, Character>> collector) throws Exception {
            Iterator<Tuple2<Long, Character>> types = iterable2.iterator();
            if (!types.hasNext()) {
                // Same exception type the unchecked next() call would raise, but naming the offending vertex.
                Iterator<Tuple4<Long, Long, Double, Character>> orphanEdges = iterable.iterator();
                String vertex = orphanEdges.hasNext() ? String.valueOf(orphanEdges.next().f0) : "<unknown>";
                throw new java.util.NoSuchElementException("No node type found for source vertex id " + vertex + ". Every vertex that appears in an edge needs a node type.");
            }
            Character srcType = types.next().f1;
            for (Tuple4<Long, Long, Double, Character> edge : iterable) {
                collector.collect(Tuple5.of(edge.f0, edge.f1, edge.f2, srcType, edge.f3));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$DoRemoteProcessing.class */
    /**
     * Answers sampling requests that were routed to this subtask.
     *
     * <p>In superstep 1 every incoming unit is forwarded unchanged. In later supersteps each requested
     * vertex id is overwritten in place with one sampled neighbor from the cached graph engine, or
     * with -1 when the engine does not contain the vertex; the unit's vertex types are then dropped
     * and the unit is emitted.
     */
    public static class DoRemoteProcessing extends RichMapPartitionFunction<MetaPathCommunicationUnit, MetaPathCommunicationUnit> {
        long graphStorageHandler;

        /**
         * @param graphStorageHandler IterTaskObjKeeper handle of the cached graph engine
         */
        public DoRemoteProcessing(long graphStorageHandler) {
            this.graphStorageHandler = graphStorageHandler;
        }

        public void mapPartition(Iterable<MetaPathCommunicationUnit> iterable, Collector<MetaPathCommunicationUnit> collector) throws Exception {
            final int partitionId = getRuntimeContext().getIndexOfThisSubtask();
            final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;

            if (firstSuperstep) {
                // Nothing to sample yet: hand the units through untouched.
                for (MetaPathCommunicationUnit unit : iterable) {
                    collector.collect(unit);
                }
            } else {
                HeteGraphEngine graph = (HeteGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, partitionId);
                AkPreconditions.checkNotNull(graph, "heteGraphEngine is null");

                for (MetaPathCommunicationUnit request : iterable) {
                    AkPreconditions.checkState(request.getDstPartitionId() == partitionId, "The task id is incorrect. It should be " + request.getDstPartitionId() + " but is is " + partitionId);

                    Long[] vertexIds = request.getRequestedVertexIds();
                    Character[] types = request.getVertexTypes();
                    int slot = 0;
                    while (slot < vertexIds.length) {
                        long requested = vertexIds[slot].longValue();
                        // The request array doubles as the response array.
                        vertexIds[slot] = graph.containsVertex(requested)
                            ? Long.valueOf(graph.sampleOneNeighbor(requested, types[slot].charValue()))
                            : Long.valueOf(-1L);
                        slot++;
                    }

                    request.deleteVertexTypes();
                    collector.collect(request);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$GetMessageToSend.class */
    /**
     * Advances the walks owned by this subtask as far as the local graph partition allows and emits
     * one sampling request per worker for the vertices that have to be sampled elsewhere.
     *
     * <p>In superstep 1 every incoming unit is forwarded unchanged. In later supersteps the input is
     * only drained; the work is driven by the engines cached in {@link IterTaskObjKeeper}.
     */
    public static class GetMessageToSend extends RichMapPartitionFunction<MetaPathCommunicationUnit, MetaPathCommunicationUnit> {
        long graphStorageHandler;
        long randomWalkStorageHandler;
        long walkWriteBufferHandler;
        GraphPartition.GraphPartitionFunction graphPartitionFunction;

        /**
         * @param graphStorageHandler      IterTaskObjKeeper handle of the cached graph engine
         * @param randomWalkStorageHandler IterTaskObjKeeper handle of the cached walk-path engine
         * @param walkWriteBufferHandler   IterTaskObjKeeper handle of the buffer finished walks are written to
         * @param graphPartitionFunction   maps a vertex id and the number of partitions to a partition id
         */
        public GetMessageToSend(long graphStorageHandler, long randomWalkStorageHandler, long walkWriteBufferHandler, GraphPartition.GraphPartitionFunction graphPartitionFunction) {
            this.graphStorageHandler = graphStorageHandler;
            this.randomWalkStorageHandler = randomWalkStorageHandler;
            this.walkWriteBufferHandler = walkWriteBufferHandler;
            this.graphPartitionFunction = graphPartitionFunction;
        }

        public void mapPartition(Iterable<MetaPathCommunicationUnit> iterable, Collector<MetaPathCommunicationUnit> collector) throws Exception {
            int partitionId = getRuntimeContext().getIndexOfThisSubtask();
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                for (MetaPathCommunicationUnit unit : iterable) {
                    collector.collect(unit);
                }
                return;
            }

            // The incoming units are consumed but deliberately ignored.
            // NOTE(review): presumably this only forces the upstream operator to finish first - confirm.
            for (MetaPathCommunicationUnit ignored : iterable) {
                // intentionally empty
            }

            HeteGraphEngine heteGraphEngine = (HeteGraphEngine) IterTaskObjKeeper.get(this.graphStorageHandler, partitionId);
            MetaPathWalkPathEngine walkPathEngine = (MetaPathWalkPathEngine) IterTaskObjKeeper.get(this.randomWalkStorageHandler, partitionId);
            RandomWalkMemoryBuffer walkBuffer = (RandomWalkMemoryBuffer) IterTaskObjKeeper.get(this.walkWriteBufferHandler, partitionId);
            AkPreconditions.checkNotNull(heteGraphEngine, "heteGraphEngine is null");
            AkPreconditions.checkNotNull(walkPathEngine, "heteWalkPath is null");
            AkPreconditions.checkNotNull(walkBuffer, "randomWalkMemoryBuffer is null");

            Tuple2<long[], Character[]> batch = walkPathEngine.getNextBatchOfVerticesToSampleFrom();
            long[] vertexIds = batch.f0;
            Character[] vertexTypes = batch.f1;

            for (int walkId = 0; walkId < vertexIds.length; walkId++) {
                // Flush a walk that is already complete and fetch the start of its replacement.
                if (walkPathEngine.canOutput(walkId)) {
                    long[] finishedWalk = walkPathEngine.getOneWalkAndAddNewWalk(walkId);
                    Tuple2<Long, Character> replacement = walkPathEngine.getNextVertexToSampleFrom(walkId);
                    if (replacement == null) {
                        vertexIds[walkId] = -1;
                        vertexTypes[walkId] = null;
                    } else {
                        vertexIds[walkId] = replacement.f0.longValue();
                        vertexTypes[walkId] = replacement.f1;
                    }
                    walkBuffer.writeOneWalk(finishedWalk);
                }

                // Keep sampling for as long as the next vertex lives in the local graph partition.
                // (The loop must be guarded by containsVertex; an unconditional loop would never end.)
                long vertexId = vertexIds[walkId];
                Character vertexType = vertexTypes[walkId];
                while (heteGraphEngine.containsVertex(vertexId)) {
                    walkPathEngine.updatePath(walkId, heteGraphEngine.sampleOneNeighbor(vertexId, vertexType.charValue()));
                    if (walkPathEngine.canOutput(walkId)) {
                        walkBuffer.writeOneWalk(walkPathEngine.getOneWalkAndAddNewWalk(walkId));
                    }
                    Tuple2<Long, Character> next = walkPathEngine.getNextVertexToSampleFrom(walkId);
                    if (next != null) {
                        vertexId = next.f0.longValue();
                        vertexType = next.f1;
                    } else {
                        vertexId = -1;
                        vertexType = null;
                    }
                }

                // Whatever is left is either -1 (nothing to sample) or a vertex stored on another worker.
                vertexIds[walkId] = vertexId;
                vertexTypes[walkId] = vertexType;
            }

            // Bucket the outstanding requests by the physical worker that stores the vertex.
            int numPartitions = getRuntimeContext().getNumberOfParallelSubtasks();
            List<List<Long>> idsByWorker = new ArrayList<>(numPartitions);
            List<List<Character>> typesByWorker = new ArrayList<>(numPartitions);
            List<List<Integer>> walkIdsByWorker = new ArrayList<>(numPartitions);
            for (int worker = 0; worker < numPartitions; worker++) {
                idsByWorker.add(new ArrayList<Long>());
                typesByWorker.add(new ArrayList<Character>());
                walkIdsByWorker.add(new ArrayList<Integer>());
            }
            for (int walkId = 0; walkId < vertexIds.length; walkId++) {
                if (vertexIds[walkId] != -1) {
                    int worker = heteGraphEngine.getPhysicalWorkerIdByLogicalWorkerId(this.graphPartitionFunction.apply(vertexIds[walkId], numPartitions));
                    idsByWorker.get(worker).add(Long.valueOf(vertexIds[walkId]));
                    typesByWorker.get(worker).add(vertexTypes[walkId]);
                    walkIdsByWorker.get(worker).add(Integer.valueOf(walkId));
                }
            }

            // Emit one request unit per worker that has at least one outstanding vertex.
            for (int worker = 0; worker < numPartitions; worker++) {
                if (!walkIdsByWorker.get(worker).isEmpty()) {
                    collector.collect(new MetaPathCommunicationUnit(partitionId, worker, walkIdsByWorker.get(worker).toArray(new Integer[0]), idsByWorker.get(worker).toArray(new Long[0]), typesByWorker.get(worker).toArray(new Character[0])));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$MetaPathCommunicationUnit.class */
    /**
     * Message exchanged between subtasks during the walk iteration: a
     * {@link RandomWalkBatchOp.RandomWalkCommunicationUnit} that additionally carries one node type
     * per requested vertex.
     */
    public static class MetaPathCommunicationUnit extends RandomWalkBatchOp.RandomWalkCommunicationUnit implements Serializable {
        /** Node types that accompany the requested vertex ids; null once {@link #deleteVertexTypes()} ran. */
        Character[] vertexTypes;

        /**
         * @param srcPartitionId     first partition id, passed to the parent class
         * @param dstPartitionId     second partition id, passed to the parent class
         * @param walkIds            walk indices, passed to the parent class (may be null)
         * @param requestedVertexIds vertex ids, passed to the parent class (may be null)
         * @param vertexTypes        node types kept by this class (may be null)
         */
        public MetaPathCommunicationUnit(int srcPartitionId, int dstPartitionId, Integer[] walkIds, Long[] requestedVertexIds, Character[] vertexTypes) {
            super(srcPartitionId, dstPartitionId, walkIds, requestedVertexIds);
            this.vertexTypes = vertexTypes;
        }

        /** Returns the stored node types, or null if they were never set or have been deleted. */
        public Character[] getVertexTypes() {
            return vertexTypes;
        }

        /** Drops the reference to the node-type array. */
        public void deleteVertexTypes() {
            vertexTypes = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/MetaPathWalkBatchOp$ParseNodeType.class */
    /**
     * Converts one row of the node-type input into a (vertex id, node type) pair. The vertex field
     * is read as a {@link Number}; the node type is the first character of the type field.
     */
    public static class ParseNodeType implements MapFunction<Row, Tuple2<Long, Character>> {
        int vertexIdx;
        int typeIdx;

        /**
         * @param vertexIdx index of the vertex id field within the row
         * @param typeIdx   index of the type field within the row
         */
        public ParseNodeType(int vertexIdx, int typeIdx) {
            this.vertexIdx = vertexIdx;
            this.typeIdx = typeIdx;
        }

        public Tuple2<Long, Character> map(Row row) throws Exception {
            Object rawVertex = row.getField(vertexIdx);
            Long vertexId = Long.valueOf(((Number) rawVertex).longValue());

            Object rawType = row.getField(typeIdx);
            Character nodeType = Character.valueOf(((String) rawType).charAt(0));

            return new Tuple2<Long, Character>(vertexId, nodeType);
        }
    }

    /**
     * Creates the operator with a newly constructed {@link Params} instance.
     */
    public MetaPathWalkBatchOp() {
        this(new Params());
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params parameter set, handed to the {@link BatchOperator} constructor unchanged
     */
    public MetaPathWalkBatchOp(Params params) {
        super(params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Builds the Flink batch plan that samples meta-path-guided random walks.
     *
     * <p>Input 0 supplies the edges (source, target and optionally weight column); input 1 supplies
     * the node types (vertex and type column). Unless the source and target columns are both LONG
     * or both INT, vertex ids are first run through {@link IDMappingUtils} and the same mapping is
     * used again when the walks are rendered. The walks themselves are produced by a bulk iteration
     * whose per-subtask state lives in {@link IterTaskObjKeeper}.
     *
     * @param batchOperatorArr the two inputs: edge table first, node-type table second
     * @return this operator; its output has a single STRING column named "path"
     */
    @Override
    public MetaPathWalkBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        checkOpSize(2, batchOperatorArr);
        BatchOperator<?> edgeOp = batchOperatorArr[0];
        BatchOperator<?> nodeTypeOp = batchOperatorArr[1];

        Integer walkNum = getWalkNum();
        Integer walkLength = getWalkLength();
        String delimiter = getDelimiter();
        String sourceCol = getSourceCol();
        String targetCol = getTargetCol();
        String weightCol = getWeightCol();
        Boolean isToUndigraph = getIsToUndigraph();
        // A null or empty weight column name both mean "unweighted"; only look the column up when
        // there is actually a name to look up.
        boolean isWeighted = weightCol != null && !weightCol.isEmpty();
        int sourceIdx = TableUtil.findColIndexWithAssertAndHint(edgeOp.getColNames(), sourceCol);
        int targetIdx = TableUtil.findColIndexWithAssertAndHint(edgeOp.getColNames(), targetCol);
        int weightIdx = isWeighted ? TableUtil.findColIndexWithAssertAndHint(edgeOp.getColNames(), weightCol) : -1;
        String vertexCol = getVertexCol();
        String typeCol = getTypeCol();
        int vertexIdx = TableUtil.findColIndexWithAssertAndHint(nodeTypeOp.getColNames(), vertexCol);
        int typeIdx = TableUtil.findColIndexWithAssertAndHint(nodeTypeOp.getColNames(), typeCol);
        String metaPath = getMetaPath();

        // "a, b ,c" -> one char[] per comma-separated, trimmed meta path.
        String[] metaPathTokens = metaPath.split(",");
        List<char[]> metaPaths = new ArrayList<>(metaPathTokens.length);
        for (String token : metaPathTokens) {
            metaPaths.add(token.trim().toCharArray());
        }

        // Ids are used as they are only when both endpoint columns are LONG or both are INT.
        TypeInformation<?> sourceType = edgeOp.getColTypes()[sourceIdx];
        TypeInformation<?> targetType = edgeOp.getColTypes()[targetIdx];
        boolean bothLong = sourceType == BasicTypeInfo.LONG_TYPE_INFO && targetType == BasicTypeInfo.LONG_TYPE_INFO;
        boolean bothInt = sourceType == BasicTypeInfo.INT_TYPE_INFO && targetType == BasicTypeInfo.INT_TYPE_INFO;
        boolean needIdMapping = !(bothLong || bothInt);

        DataSet<Tuple2<String, Long>> idMapping = null;
        DataSet<Row> edgeData;
        DataSet<Row> nodeTypeData;
        if (needIdMapping) {
            idMapping = IDMappingUtils.computeIdMapping(edgeOp.getDataSet(), new int[]{sourceIdx, targetIdx});
            edgeData = IDMappingUtils.mapDataSetWithIdMapping(edgeOp.getDataSet(), idMapping, new int[]{sourceIdx, targetIdx});
            nodeTypeData = IDMappingUtils.mapDataSetWithIdMapping(nodeTypeOp.getDataSet(), idMapping, new int[]{vertexIdx});
        } else {
            edgeData = edgeOp.getDataSet();
            nodeTypeData = nodeTypeOp.getDataSet();
        }

        // Raw operator types are kept below because the generic signatures of the helper functions
        // are not visible from this file.
        GraphPartition.GraphPartitionHashFunction partitionFunction = new GraphPartition.GraphPartitionHashFunction();
        Operator parsedEdges = edgeData
            .flatMap(new ParseGraphData(sourceIdx, targetIdx, weightIdx, isToUndigraph.booleanValue()))
            .name("parsedEdge");
        DataSet<Tuple2<Long, Character>> nodeTypes = nodeTypeData
            .map(new ParseNodeType(vertexIdx, typeIdx))
            .name("parsedVertexType");

        // Attach the target type, then the source type, partition by field 0 and sort each
        // partition by field 0 and then field 4.
        Operator partitionedGraph = parsedEdges
            .coGroup(nodeTypes).where(1).equalTo(0).with(new ConcatDstNodeType()).name("edgeWithDstType")
            .coGroup(nodeTypes).where(0).equalTo(0).with(new ConcatSrcNodeType())
            .partitionCustom(new GraphPartition.GraphPartitioner(partitionFunction), 0)
            .sortPartition(0, Order.ASCENDING)
            .sortPartition(4, Order.ASCENDING)
            .map(new ConstructHeteEdge())
            .name("parsedAndPartitionedAndSortedGraph");
        Operator graphStatistics = partitionedGraph.mapPartition(new ComputeGraphStatistics()).name("graphStatistics");

        // Handles under which the per-subtask objects are kept in IterTaskObjKeeper.
        long graphHandle = IterTaskObjKeeper.getNewHandle();
        long walkPathHandle = IterTaskObjKeeper.getNewHandle();
        long walkWriteBufferHandle = IterTaskObjKeeper.getNewHandle();
        long walkReadBufferHandle = IterTaskObjKeeper.getNewHandle();

        ExecutionEnvironment env = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
        IterativeDataSet loop = env
            .fromParallelCollection(new NumberSequenceIterator(1L, env.getParallelism()), BasicTypeInfo.LONG_TYPE_INFO)
            .map(new MapFunction<Long, MetaPathCommunicationUnit>() {
                public MetaPathCommunicationUnit map(Long l) throws Exception {
                    return new MetaPathCommunicationUnit(1, 1, null, null, null);
                }
            })
            .name("initData")
            .iterate(Integer.MAX_VALUE)
            .name("loop");

        Operator requests = partitionedGraph
            .mapPartition(new CacheGraphAndRandomWalk(graphHandle, walkPathHandle, walkWriteBufferHandle, walkReadBufferHandle, walkNum.intValue(), walkLength.intValue(), isWeighted, getSamplingMethod(), metaPaths, partitionFunction))
            .withBroadcastSet(graphStatistics, "graphStatistics")
            .withBroadcastSet(loop, "loop")
            .name("cachedGraphAndRandomWalk")
            .mapPartition(new GetMessageToSend(graphHandle, walkPathHandle, walkWriteBufferHandle, partitionFunction))
            .name("sendCommunicationUnit");

        DataSet nextRound = requests
            .partitionCustom(new GraphPartition.GraphPartitioner(partitionFunction), new SendRequestKeySelector())
            .mapPartition(new DoRemoteProcessing(graphHandle))
            .partitionCustom(new GraphPartition.GraphPartitioner(partitionFunction), new RecvRequestKeySelector())
            .name("recvCommunicationUnit")
            .mapPartition(new HandleReceivedMessage(walkPathHandle))
            .name("finishedOneStep");
        // Termination criterion: one record per emitted request, so the iteration stops once a
        // superstep emits no requests.
        DataSet termination = requests
            .map(new MapFunction<MetaPathCommunicationUnit, Object>() {
                public Object map(MetaPathCommunicationUnit metaPathCommunicationUnit) throws Exception {
                    return new Object();
                }
            })
            .name("termination");
        DataSet iterationResult = loop.closeWith(nextRound, termination);

        // withBroadcastSet is applied before name(...): on a raw operator name(...) is typed as
        // Operator, which does not declare withBroadcastSet. Both calls configure the same operator.
        Operator walks = env
            .fromParallelCollection(new NumberSequenceIterator(0L, env.getParallelism()), BasicTypeInfo.LONG_TYPE_INFO)
            .mapPartition(new ReadFromBufferAndRemoveStaticObject(graphHandle, walkPathHandle, walkReadBufferHandle, delimiter))
            .name("memoryOut")
            .union(iterationResult
                .mapPartition(new EndWritingRandomWalks(walkWriteBufferHandle))
                .withBroadcastSet(iterationResult, "output")
                .name("emptyOut"))
            .name("mergedOutput");

        setOutput(
            needIdMapping
                ? IDMappingUtils.mapWalkToStringWithIdMapping(walks, idMapping, walkLength.intValue(), delimiter)
                : walks.map(new LongArrayToRow(delimiter)).name("finalOutput"),
            new TableSchema(new String[]{"path"}, new TypeInformation[]{Types.STRING}));
        return this;
    }

    // Decompiler-emitted bridge method (see the "bridge" / "synthetic" markers below): it only
    // forwards to the varargs overload linkFrom(BatchOperator<?>...).
    // NOTE(review): BatchOperator[] and BatchOperator<?>... have the same erasure, so this method
    // presumably has to be dropped before the file compiles from source - confirm.
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ MetaPathWalkBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        // Cast to the parameterized array type and delegate.
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
