package com.alibaba.alink.operator.batch.graph;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.params.graph.EdgeClusterCoefficientParams;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(value = PortType.DATA, opType = PortSpec.OpType.BATCH, desc = PortDesc.GRPAH_EDGES)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "edgeSourceCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES}), @ParamSelectColumnSpec(name = "edgeTargetCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES})})
@NameCn("边聚类系数")
@NameEn("Edge Cluster Coefficient")
/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/EdgeClusterCoefficientBatchOp.class */
public class EdgeClusterCoefficientBatchOp extends BatchOperator<EdgeClusterCoefficientBatchOp> implements EdgeClusterCoefficientParams<EdgeClusterCoefficientBatchOp> {
    private static final long serialVersionUID = 7552939759062806711L;

    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/EdgeClusterCoefficientBatchOp$EdgeClusterCoefficient.class */
    public static class EdgeClusterCoefficient {

        /**
         * Gather step of the scatter-gather iteration: copies every received message into the
         * vertex's pre-allocated array, sorts that array and publishes it as the new vertex value.
         */
        public static class GatherGraphTemp extends GatherFunction<Long, Long[], Long> {
            private static final long serialVersionUID = 465153561839511086L;

            /**
             * Stores the incoming messages in the vertex array (in arrival order, starting at
             * index 0), sorts the array and sets it as the updated vertex value.
             *
             * @param vertex          vertex whose value array receives the messages
             * @param messageIterator messages delivered to this vertex
             */
            public void updateVertex(Vertex<Long, Long[]> vertex, MessageIterator<Long> messageIterator) {
                Long[] received = vertex.f1;
                int slot = 0;
                for (Long message : messageIterator) {
                    received[slot] = message;
                    slot++;
                }
                // Sorted so that two such arrays can later be intersected with a linear merge.
                Arrays.sort(received);
                setNewVertexValue(received);
            }
        }

        /** Edge mapper that ignores the incoming edge and assigns every edge the value {@code 0L}. */
        public static class GraphTempMapEdge implements MapFunction<Edge<Long, Double>, Long> {
            private static final long serialVersionUID = 5872715320371423887L;

            /**
             * @param edge the edge being mapped; its content is not read
             * @return always {@code 0L}
             */
            public Long map(Edge<Long, Double> edge) throws Exception {
                final long placeholder = 0L;
                return placeholder;
            }
        }

        /**
         * Vertex mapper that replaces a vertex's {@code Double} value with a new, unfilled
         * {@code Long[]} whose length is that value converted to {@code int}.
         */
        public static class GraphTempMapVertex implements MapFunction<Vertex<Long, Double>, Long[]> {
            private static final long serialVersionUID = 897641610324504285L;

            /**
             * @param vertex vertex whose value gives the array length
             * @return a new array of {@code vertex.f1.intValue()} elements, all {@code null}
             */
            public Long[] map(Vertex<Long, Double> vertex) throws Exception {
                int capacity = vertex.f1.intValue();
                return new Long[capacity];
            }
        }

        /** Converts a {@code (Long, LongValue)} pair into a {@code (Long, Long)} pair. */
        public static class Longvalue2Long implements MapFunction<Tuple2<Long, LongValue>, Tuple2<Long, Long>> {
            private static final long serialVersionUID = -8499849561757464697L;

            /**
             * @param tuple2 pair whose second field is a {@link LongValue}
             * @return a new pair with the same first field and the unwrapped {@code long} as second field
             */
            public Tuple2<Long, Long> map(Tuple2<Long, LongValue> tuple2) throws Exception {
                long unwrapped = tuple2.f1.getValue();
                return new Tuple2<>(tuple2.f0, Long.valueOf(unwrapped));
            }
        }

        /**
         * Maps an edge triplet, whose two vertex values are {@code Long[]} arrays, to a 6-tuple of
         * (f0, f1, length of f2, length of f3, number of common elements, coefficient).
         *
         * <p>The coefficient is the number of common elements divided by the length of the shorter
         * of the two arrays.
         */
        public static class MapTriplet implements MapFunction<Triplet<Long, Long[], Long>, Tuple6<Long, Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = -1837449716800435246L;

            /**
             * Computes the output tuple for one triplet.
             *
             * <p>If either array is empty the common count is 0 and the denominator is 0, so the
             * coefficient is {@code NaN} (floating-point {@code 0.0 / 0.0}).
             *
             * @param triplet triplet whose {@code f2} and {@code f3} fields are the arrays to compare;
             *                both are assumed to be sorted ascending (the linear merge relies on it)
             * @return (f0, f1, f2.length, f3.length, common count, common count / min(f2.length, f3.length))
             */
            public Tuple6<Long, Long, Long, Long, Long, Double> map(Triplet<Long, Long[], Long> triplet) throws Exception {
                Long[] firstNeighbors = triplet.f2;
                Long[] secondNeighbors = triplet.f3;
                long common = countCommon(firstNeighbors, secondNeighbors);
                // The decompiled source referenced an undefined register here; the denominator is
                // the smaller of the two array lengths.
                int smallerSize = Math.min(firstNeighbors.length, secondNeighbors.length);
                return new Tuple6<>(
                    triplet.f0,
                    triplet.f1,
                    Long.valueOf(firstNeighbors.length),
                    Long.valueOf(secondNeighbors.length),
                    Long.valueOf(common),
                    Double.valueOf((common * 1.0d) / smallerSize));
            }

            /** Counts the elements two ascending-sorted arrays have in common, using a linear merge. */
            private static long countCommon(Long[] left, Long[] right) {
                int li = 0;
                int ri = 0;
                long common = 0;
                while (li < left.length && ri < right.length) {
                    if (left[li].equals(right[ri])) {
                        li++;
                        ri++;
                        common++;
                    } else if (left[li].longValue() > right[ri].longValue()) {
                        // Right element is smaller: advance the right cursor to catch up.
                        ri++;
                    } else {
                        li++;
                    }
                }
                return common;
            }
        }

        /**
         * Scatter step of the scatter-gather iteration: each vertex sends its own id to the target
         * of every edge returned by {@code getEdges()}.
         */
        public static class ScatterGraphTemp extends ScatterFunction<Long, Long[], Long, Long> {
            private static final long serialVersionUID = -7538585087831930835L;

            /**
             * Sends this vertex's id along each of its edges.
             *
             * @param vertex the vertex currently scattering; only its id ({@code f0}) is used
             */
            public void sendMessages(Vertex<Long, Long[]> vertex) {
                // Typed iteration: a raw Edge would make getTarget() return Object, which does not
                // match sendMessageTo(Long, Long).
                for (Edge<Long, Long> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), vertex.f0);
                }
            }
        }

        /**
         * Vertex join function that discards the current vertex value and replaces it with the
         * joined {@code Long}, converted to {@code Double}.
         */
        public static class VerticesJoin implements VertexJoinFunction<Double, Long> {
            private static final long serialVersionUID = 3413134536200006612L;

            /**
             * @param d current vertex value; not read
             * @param l joined value
             * @return {@code l} as a {@code Double}
             */
            public Double vertexJoin(Double d, Long l) {
                double joined = l.doubleValue();
                return Double.valueOf(joined);
            }
        }

        /**
         * Builds the per-edge result tuples for the given graph.
         *
         * <p>Steps: (1) store each vertex's in-degree as its value; (2) reset all edge values and
         * turn each vertex value into an empty array of in-degree length; (3) run one
         * scatter-gather superstep in which vertices send their id along their edges and receivers
         * collect and sort the ids; (4) map every resulting triplet with {@link MapTriplet}.
         *
         * @param graph input graph
         * @return one 6-tuple per triplet of the processed graph, as produced by {@link MapTriplet}
         */
        public DataSet<Tuple6<Long, Long, Long, Long, Long, Double>> run(Graph<Long, Double, Double> graph) {
            DataSet<Tuple2<Long, Long>> inDegrees = graph.inDegrees().map(new Longvalue2Long());
            Graph<Long, Double, Double> degreeGraph = graph.joinWithVertices(inDegrees, new VerticesJoin());

            Graph<Long, Long[], Long> emptyNeighborGraph = degreeGraph
                .mapEdges(new GraphTempMapEdge())
                .mapVertices(new GraphTempMapVertex());

            // A single superstep is enough: every vertex only needs the ids sent by its direct neighbors.
            Graph<Long, Long[], Long> neighborGraph =
                emptyNeighborGraph.runScatterGatherIteration(new ScatterGraphTemp(), new GatherGraphTemp(), 1);

            return neighborGraph.getTriplets().map(new MapTriplet());
        }
    }

    /** Vertex mapper that gives every vertex the constant value {@code 1.0}. */
    public static class MapVertices implements MapFunction<Vertex<Long, NullValue>, Double> {
        private static final long serialVersionUID = -2462169842349628527L;

        /**
         * @param vertex the vertex being mapped; its content is not read
         * @return always {@code 1.0}
         */
        public Double map(Vertex<Long, NullValue> vertex) throws Exception {
            final double initialValue = 1.0d;
            return Double.valueOf(initialValue);
        }
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params parameters handed to the {@link BatchOperator} constructor
     */
    public EdgeClusterCoefficientBatchOp(Params params) {
        super(params);
    }

    /** Creates the operator with a fresh, empty {@link Params} instance. */
    public EdgeClusterCoefficientBatchOp() {
        this(new Params());
    }

    /**
     * Computes the edge cluster coefficient for the edges of the first input operator and sets
     * the result as this operator's output.
     *
     * <p>The output columns are {@code node1}, {@code node2}, {@code neighbor1}, {@code neighbor2},
     * {@code commonNeighbor} and {@code edgeClusterCoefficient}. The two node columns use the type
     * of the edge source column; the counts are {@code LONG} and the coefficient is {@code DOUBLE}.
     * Only result rows whose first (Long-encoded) node id is greater than the second are emitted.
     *
     * @param batchOperatorArr input operators; the first one supplies the edge table
     * @return this operator
     */
    @Override
    public EdgeClusterCoefficientBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> in = checkAndGetFirst(batchOperatorArr);
        String sourceCol = getEdgeSourceCol();
        String targetCol = getEdgeTargetCol();
        String[] outputCols = {"node1", "node2", "neighbor1", "neighbor2", "commonNeighbor", "edgeClusterCoefficient"};
        Boolean asUndirected = getAsUndirectedGraph();
        String[] edgeCols = {sourceCol, targetCol};

        // The node columns of the output reuse the type of the source column.
        int sourceColIndex = TableUtil.findColIndexWithAssertAndHint(in.getColNames(), sourceCol);
        TypeInformation<?> nodeType = in.getColTypes()[sourceColIndex];

        DataSet<Row> jsonEdges = GraphUtilsWithString.input2json(in, edgeCols, 2, false);
        GraphUtilsWithString converter = new GraphUtilsWithString(jsonEdges, nodeType);
        DataSet<Edge<Long, Double>> longEdges = converter.inputType2longEdge(jsonEdges, false);

        boolean undirected = asUndirected.booleanValue();
        Graph<Long, Double, Double> graph = Graph
            .fromDataSet(longEdges, MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment())
            .mapVertices(new MapVertices());
        if (undirected) {
            graph = graph.getUndirected();
        }

        DataSet<Tuple6<Long, Long, Long, Long, Long, Double>> coefficients = new EdgeClusterCoefficient().run(graph);

        // Keep only rows with f0 > f1 — presumably so an undirected edge is reported once; confirm.
        DataSet<Tuple6<Long, Long, Long, Long, Long, Double>> filtered = coefficients
            .groupBy(0)
            .reduceGroup(new GroupReduceFunction<Tuple6<Long, Long, Long, Long, Long, Double>, Tuple6<Long, Long, Long, Long, Long, Double>>() {
                private static final long serialVersionUID = 2757693152062829979L;

                public void reduce(Iterable<Tuple6<Long, Long, Long, Long, Long, Double>> iterable, Collector<Tuple6<Long, Long, Long, Long, Long, Double>> collector) throws Exception {
                    for (Tuple6<Long, Long, Long, Long, Long, Double> record : iterable) {
                        long first = record.f0.longValue();
                        long second = record.f1.longValue();
                        if (first > second) {
                            collector.collect(record);
                        }
                    }
                }
            });

        setOutput(
            converter.long2outputTypeECC(filtered),
            outputCols,
            new TypeInformation[]{nodeType, nodeType, Types.LONG, Types.LONG, Types.LONG, Types.DOUBLE});
        return this;
    }

    /**
     * Bridge overload (marked bridge/synthetic by the decompiler) that forwards to
     * {@code linkFrom(BatchOperator<?>...)} after casting the array argument.
     *
     * <p>NOTE(review): this method has the same erasure as the varargs overload, so it looks like
     * a compiler-generated bridge surfaced by decompilation rather than hand-written code —
     * confirm before keeping it in a source tree.
     *
     * @param batchOperatorArr input operators, passed through unchanged
     * @return the result of the varargs {@code linkFrom} overload
     */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ EdgeClusterCoefficientBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
