package com.alibaba.alink.operator.batch.graph;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.graph.memory.MemoryComputeFunction;
import com.alibaba.alink.operator.batch.graph.memory.MemoryVertexCentricIteration;
import com.alibaba.alink.operator.common.tree.Criteria;
import com.alibaba.alink.params.graph.KCoreParams;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.graph.Edge;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

@InputPorts(values = {@PortSpec(value = PortType.DATA, opType = PortSpec.OpType.BATCH, desc = PortDesc.GRPAH_EDGES)})
@OutputPorts(values = {@PortSpec(PortType.DATA)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "edgeSourceCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES}), @ParamSelectColumnSpec(name = "edgeTargetCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES})})
@NameCn("KCore算法")
@NameEn("KCore")
/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp.class */
public class KCoreBatchOp extends BatchOperator<KCoreBatchOp> implements KCoreParams<KCoreBatchOp> {
    private static final long serialVersionUID = -7537644695230031028L;

    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore.class */
    public static class KCore {
        public static int k;
        public int maxIter;

        /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore$FilterLargeOnesOnThiState.class */
        public static class FilterLargeOnesOnThiState implements FilterFunction<Tuple5<Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = 1257898737107879380L;
            private long k;

            private FilterLargeOnesOnThiState(long j) {
                this.k = j;
            }

            public boolean filter(Tuple5<Long, Long, Long, Long, Double> tuple5) throws Exception {
                return ((Long) tuple5.f3).longValue() <= this.k;
            }
        }

        /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore$FilterSmallOnesOnFirstField.class */
        public static class FilterSmallOnesOnFirstField implements FilterFunction<Tuple5<Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = -4414815465890029511L;
            private long k;

            private FilterSmallOnesOnFirstField(long j) {
                this.k = j;
            }

            public boolean filter(Tuple5<Long, Long, Long, Long, Double> tuple5) throws Exception {
                return ((Long) tuple5.f2).longValue() > this.k;
            }
        }

        /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore$FilterSmallOnesOnSecondField.class */
        public static class FilterSmallOnesOnSecondField implements FilterFunction<Tuple5<Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = 156799354134467716L;
            private long k;

            private FilterSmallOnesOnSecondField(long j) {
                this.k = j;
            }

            public boolean filter(Tuple5<Long, Long, Long, Long, Double> tuple5) throws Exception {
                return ((Long) tuple5.f3).longValue() > this.k;
            }
        }

        /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore$ReduceOnFirstField.class */
        public static class ReduceOnFirstField implements GroupReduceFunction<Tuple5<Long, Long, Long, Long, Double>, Tuple5<Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = 263920722211539724L;

            public void reduce(Iterable<Tuple5<Long, Long, Long, Long, Double>> iterable, Collector<Tuple5<Long, Long, Long, Long, Double>> collector) throws Exception {
                long j = 0;
                ArrayList<Tuple5> arrayList = new ArrayList();
                Iterator<Tuple5<Long, Long, Long, Long, Double>> it = iterable.iterator();
                while (it.hasNext()) {
                    j++;
                    arrayList.add(it.next());
                }
                for (Tuple5 tuple5 : arrayList) {
                    collector.collect(new Tuple5(tuple5.f0, tuple5.f1, Long.valueOf(j), tuple5.f3, tuple5.f4));
                }
            }
        }

        /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KCore$ReduceOnSecondField.class */
        public static class ReduceOnSecondField implements GroupReduceFunction<Tuple5<Long, Long, Long, Long, Double>, Tuple5<Long, Long, Long, Long, Double>> {
            private static final long serialVersionUID = 7840099990204577056L;

            public void reduce(Iterable<Tuple5<Long, Long, Long, Long, Double>> iterable, Collector<Tuple5<Long, Long, Long, Long, Double>> collector) throws Exception {
                long j = 0;
                ArrayList<Tuple5> arrayList = new ArrayList();
                Iterator<Tuple5<Long, Long, Long, Long, Double>> it = iterable.iterator();
                while (it.hasNext()) {
                    j++;
                    arrayList.add(it.next());
                }
                for (Tuple5 tuple5 : arrayList) {
                    collector.collect(new Tuple5(tuple5.f0, tuple5.f1, tuple5.f2, Long.valueOf(j), tuple5.f4));
                }
            }
        }

        public KCore(int i, int i2) {
            k = i;
            this.maxIter = i2;
        }

        public DataSet<Edge<Long, Double>> run(DataSet<Edge<Long, Double>> dataSet, Boolean bool) {
            return operation(bool.booleanValue() ? dataSet.flatMap(new FlatMapFunction<Edge<Long, Double>, Tuple5<Long, Long, Long, Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.KCoreBatchOp.KCore.1
                private static final long serialVersionUID = 3415098877090917677L;

                public void flatMap(Edge<Long, Double> edge, Collector<Tuple5<Long, Long, Long, Long, Double>> collector) {
                    Tuple5 tuple5 = new Tuple5();
                    tuple5.f0 = edge.f0;
                    tuple5.f1 = edge.f1;
                    tuple5.f2 = -1L;
                    tuple5.f3 = -1L;
                    tuple5.f4 = Double.valueOf(Criteria.INVALID_GAIN);
                    collector.collect(tuple5);
                    tuple5.f0 = edge.f1;
                    tuple5.f1 = edge.f0;
                    collector.collect(tuple5);
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((Edge<Long, Double>) obj, (Collector<Tuple5<Long, Long, Long, Long, Double>>) collector);
                }
            }) : dataSet.flatMap(new FlatMapFunction<Edge<Long, Double>, Tuple5<Long, Long, Long, Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.KCoreBatchOp.KCore.2
                private static final long serialVersionUID = -1356257363097879387L;

                public void flatMap(Edge<Long, Double> edge, Collector<Tuple5<Long, Long, Long, Long, Double>> collector) {
                    Tuple5 tuple5 = new Tuple5();
                    tuple5.f0 = edge.f0;
                    tuple5.f1 = edge.f1;
                    tuple5.f2 = -1L;
                    tuple5.f3 = -1L;
                    tuple5.f4 = Double.valueOf(Criteria.INVALID_GAIN);
                    collector.collect(tuple5);
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((Edge<Long, Double>) obj, (Collector<Tuple5<Long, Long, Long, Long, Double>>) collector);
                }
            })).map(new MapFunction<Tuple5<Long, Long, Long, Long, Double>, Edge<Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.KCoreBatchOp.KCore.3
                private static final long serialVersionUID = -1652848589684719913L;

                public Edge<Long, Double> map(Tuple5<Long, Long, Long, Long, Double> tuple5) throws Exception {
                    return new Edge<>(tuple5.f0, tuple5.f1, Double.valueOf(1.0d));
                }
            });
        }

        public DataSet<Tuple5<Long, Long, Long, Long, Double>> operation(DataSet<Tuple5<Long, Long, Long, Long, Double>> dataSet) {
            IterativeDataSet iterate = dataSet.iterate(this.maxIter);
            Operator name = iterate.groupBy(new int[]{0}).reduceGroup(new ReduceOnFirstField()).filter(new FilterSmallOnesOnFirstField(k)).name("firstStep").groupBy(new int[]{1}).reduceGroup(new ReduceOnSecondField()).name("secondStep");
            return iterate.closeWith(name.filter(new FilterSmallOnesOnSecondField(k)).name("filterSmallOne"), name.filter(new FilterLargeOnesOnThiState(k)).name("filterLargeOne"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/KCoreBatchOp$KcoreComputeFunction.class */
    public static class KcoreComputeFunction extends MemoryComputeFunction {
        private final int k;
        private static final double VALID_EDGE = 1.0d;
        private static final double INVALID_EDGE = -1.0d;
        private static final double INVALID_VERTEX = -1.0d;
        private static final double doubleEpsilon = 1.0E-7d;

        public KcoreComputeFunction(int i) {
            this.k = i;
        }

        @Override // com.alibaba.alink.operator.batch.graph.memory.MemoryComputeFunction
        public void gatherMessage(long j, double d) {
            incCurVertexValue(j, -1.0d);
            setEdgeValue(j, (long) d, -1.0d);
        }

        @Override // com.alibaba.alink.operator.batch.graph.memory.MemoryComputeFunction
        public void sendMessage(Iterator<Tuple2<Long, Double>> it, long j) {
            double curVertexValue = getCurVertexValue(j);
            if (!isValidVertex(curVertexValue) || curVertexValue > this.k + doubleEpsilon) {
                return;
            }
            long j2 = -1;
            while (it.hasNext()) {
                Tuple2<Long, Double> next = it.next();
                if (((Double) next.f1).doubleValue() > Criteria.INVALID_GAIN) {
                    sendMessageTo(((Long) next.f0).longValue(), j);
                    if (j2 != -1 && j2 != ((Long) next.f0).longValue()) {
                        setEdgeValue(j, j2, -1.0d);
                    }
                    j2 = ((Long) next.f0).longValue();
                }
            }
            if (j2 != -1) {
                setEdgeValue(j, j2, -1.0d);
            }
            setCurVertexValue(j, -1.0d);
        }

        private boolean isValidVertex(double d) {
            return d > -1.0d;
        }

        @Override // com.alibaba.alink.operator.batch.graph.memory.MemoryComputeFunction
        public void initVerticesValues() {
            setAllVertexValueByOutDegree();
        }

        @Override // com.alibaba.alink.operator.batch.graph.memory.MemoryComputeFunction
        public void initEdgesValues() {
            setAllEdgeValues(VALID_EDGE);
        }
    }

    public KCoreBatchOp(Params params) {
        super(params);
    }

    public KCoreBatchOp() {
        super(new Params());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public KCoreBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> checkAndGetFirst = checkAndGetFirst(batchOperatorArr);
        String[] strArr = {"node1", "node2"};
        String[] colNames = checkAndGetFirst.getColNames();
        int findColIndexWithAssertAndHint = TableUtil.findColIndexWithAssertAndHint(colNames, getEdgeTargetCol());
        int findColIndexWithAssertAndHint2 = TableUtil.findColIndexWithAssertAndHint(colNames, getEdgeTargetCol());
        TypeInformation<?> typeInformation = checkAndGetFirst.getColTypes()[findColIndexWithAssertAndHint];
        Preconditions.checkState(typeInformation == checkAndGetFirst.getColTypes()[findColIndexWithAssertAndHint2], "The source and target should be the same type.");
        setOutput(MemoryVertexCentricIteration.runAndGetEdges(checkAndGetFirst.select(new String[]{getEdgeSourceCol(), getEdgeTargetCol()}).getDataSet(), typeInformation, false, true, getMLEnvironmentId().longValue(), Integer.MAX_VALUE, new KcoreComputeFunction(getK().intValue())).flatMap(new FlatMapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.batch.graph.KCoreBatchOp.1
            public void flatMap(Row row, Collector<Row> collector) throws Exception {
                if (((Number) row.getField(2)).doubleValue() > Criteria.INVALID_GAIN) {
                    collector.collect(Row.of(new Object[]{row.getField(0), row.getField(1)}));
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Row) obj, (Collector<Row>) collector);
            }
        }), strArr, new TypeInformation[]{typeInformation, typeInformation});
        return this;
    }

    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ KCoreBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
