package com.alibaba.alink.operator.batch.graph.utils;

import com.alibaba.alink.common.comqueue.IterTaskObjKeeper;
import com.alibaba.alink.common.exceptions.AkIllegalStateException;
import com.alibaba.alink.operator.batch.graph.RandomWalkBatchOp;
import com.alibaba.alink.operator.batch.graph.RandomWalkBatchOp.RandomWalkCommunicationUnit;
import com.alibaba.alink.operator.batch.graph.walkpath.BaseWalkPathEngine;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/utils/HandleReceivedMessage.class */
public class HandleReceivedMessage<T extends RandomWalkBatchOp.RandomWalkCommunicationUnit> extends RichMapPartitionFunction<T, T> {
    long randomWalkStorageHandler;
    static final /* synthetic */ boolean $assertionsDisabled;

    public HandleReceivedMessage(long j) {
        this.randomWalkStorageHandler = j;
    }

    public void mapPartition(Iterable<T> iterable, Collector<T> collector) throws Exception {
        if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
            Iterator<T> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
            return;
        }
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        BaseWalkPathEngine baseWalkPathEngine = (BaseWalkPathEngine) IterTaskObjKeeper.get(this.randomWalkStorageHandler, indexOfThisSubtask);
        if (baseWalkPathEngine == null) {
            throw new AkIllegalStateException("baseWalkPathEngine is null");
        }
        for (T t : iterable) {
            int srcPartitionId = t.getSrcPartitionId();
            if (!$assertionsDisabled && srcPartitionId != indexOfThisSubtask) {
                throw new AssertionError();
            }
            Long[] requestedVertexIds = t.getRequestedVertexIds();
            Integer[] walkIds = t.getWalkIds();
            for (int i = 0; i < requestedVertexIds.length; i++) {
                baseWalkPathEngine.updatePath(walkIds[i].intValue(), requestedVertexIds[i].longValue());
            }
        }
    }

    static {
        $assertionsDisabled = !HandleReceivedMessage.class.desiredAssertionStatus();
    }
}
