package com.alibaba.alink.operator.batch.graph;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.type.AlinkTypes;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.huge.line.ApsIteratorLine;
import com.alibaba.alink.operator.batch.huge.line.ApsSerializeDataLine;
import com.alibaba.alink.operator.batch.huge.line.ApsSerializeModelLine;
import com.alibaba.alink.operator.common.aps.ApsContext;
import com.alibaba.alink.operator.common.aps.ApsEnv;
import com.alibaba.alink.operator.common.nlp.WordCountUtil;
import com.alibaba.alink.params.graph.LineParams;
import com.alibaba.alink.params.nlp.HasBatchSize;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(PortType.MODEL)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "sourceCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES}), @ParamSelectColumnSpec(name = "targetCol", allowedTypeCollections = {TypeCollections.INT_LONG_STRING_TYPES}), @ParamSelectColumnSpec(name = "weightCol", allowedTypeCollections = {TypeCollections.NUMERIC_TYPES})})
@NameCn("Line")
@NameEn("Line")
/* loaded from: input_file:com/alibaba/alink/operator/batch/graph/LineBatchOp.class */
public class LineBatchOp extends BatchOperator<LineBatchOp> implements LineParams<LineBatchOp> {
    private static final long serialVersionUID = -5857950388102221227L;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/LineBatchOp$IniModel.class */
    public static class IniModel extends RichMapPartitionFunction<Row, Tuple2<Long, float[][]>> {
        private static final long serialVersionUID = 4601462262211542871L;
        private final int vectorDim;
        private final int order;

        IniModel(int i, int i2) {
            this.vectorDim = i;
            this.order = i2;
        }

        public void mapPartition(Iterable<Row> iterable, Collector<Tuple2<Long, float[][]>> collector) throws Exception {
            Random random = new Random();
            Iterator<Row> it = iterable.iterator();
            while (it.hasNext()) {
                Long l = (Long) it.next().getField(2);
                float[][] fArr = new float[this.order][this.vectorDim];
                for (int i = 0; i < this.vectorDim; i++) {
                    fArr[0][i] = (random.nextFloat() - 0.5f) / this.vectorDim;
                }
                collector.collect(new Tuple2(l, fArr));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/LineBatchOp$MapMiniBatchNum.class */
    public static class MapMiniBatchNum implements MapFunction<Long, Integer> {
        private static final long serialVersionUID = -5385263705043389249L;
        int batchSize;

        MapMiniBatchNum(int i) {
            this.batchSize = i;
        }

        public Integer map(Long l) throws Exception {
            return Integer.valueOf(Double.valueOf(Math.ceil((1.0d * l.longValue()) / this.batchSize)).intValue());
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/LineBatchOp$RandomPartitioner.class */
    private static class RandomPartitioner implements Partitioner<Long> {
        private static final long serialVersionUID = -2350703157277923339L;

        private RandomPartitioner() {
        }

        public int partition(Long l, int i) {
            return (int) (l.longValue() % i);
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/batch/graph/LineBatchOp$RowKeySelector.class */
    public static class RowKeySelector implements KeySelector<Row, Long> {
        private static final long serialVersionUID = 7514280642434354647L;
        int index;

        public RowKeySelector(int i) {
            this.index = i;
        }

        public Long getKey(Row row) {
            return (Long) row.getField(this.index);
        }
    }

    public LineBatchOp(Params params) {
        super(params);
    }

    public LineBatchOp() {
        super(new Params());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public LineBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> checkAndGetFirst = checkAndGetFirst(batchOperatorArr);
        String sourceCol = getSourceCol();
        String targetCol = getTargetCol();
        int intValue = getVectorSize().intValue();
        int intValue2 = getMaxIter().intValue();
        int value = getOrder().getValue();
        boolean booleanValue = getIsToUndigraph().booleanValue();
        getParams().set("threadNum", Integer.valueOf(getNumThreads().intValue()));
        Params params = new Params();
        params.set(ApsContext.alinkApsNumMiniBatch, (Object) 1);
        ApsContext put = new ApsContext(getMLEnvironmentId().longValue()).put(params);
        ApsEnv apsEnv = new ApsEnv(null, new ApsSerializeDataLine(), new ApsSerializeModelLine(), getMLEnvironmentId());
        String weightCol = getWeightCol();
        boolean z = weightCol != null;
        String[] strArr = z ? new String[]{sourceCol, targetCol, weightCol} : new String[]{sourceCol, targetCol};
        TypeInformation<?> typeInformation = checkAndGetFirst.getColTypes()[TableUtil.findColIndexWithAssertAndHint(checkAndGetFirst.getColNames(), sourceCol)];
        DataSet<Row> input2json = GraphUtilsWithString.input2json(checkAndGetFirst, strArr, 2, true);
        GraphUtilsWithString graphUtilsWithString = new GraphUtilsWithString(input2json, typeInformation);
        DataSet inputType2longTuple3 = graphUtilsWithString.inputType2longTuple3(input2json, Boolean.valueOf(z));
        if (booleanValue) {
            inputType2longTuple3 = inputType2longTuple3.flatMap(new FlatMapFunction<Tuple3<Long, Long, Double>, Tuple3<Long, Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.1
                private static final long serialVersionUID = 2164543041121716573L;

                public void flatMap(Tuple3<Long, Long, Double> tuple3, Collector<Tuple3<Long, Long, Double>> collector) {
                    collector.collect(tuple3);
                    long longValue = ((Long) tuple3.f0).longValue();
                    tuple3.f0 = tuple3.f1;
                    tuple3.f1 = Long.valueOf(longValue);
                    collector.collect(tuple3);
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((Tuple3<Long, Long, Double>) obj, (Collector<Tuple3<Long, Long, Double>>) collector);
                }
            });
        }
        Tuple3<DataSet<Row>, DataSet<Long[]>, DataSet<long[]>> sortedIndexVocab = WordCountUtil.sortedIndexVocab(inputType2longTuple3.flatMap(new FlatMapFunction<Tuple3<Long, Long, Double>, Tuple1<Long>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.3
            private static final long serialVersionUID = -3157356818548756725L;

            public void flatMap(Tuple3<Long, Long, Double> tuple3, Collector<Tuple1<Long>> collector) {
                collector.collect(Tuple1.of(tuple3.f0));
                collector.collect(Tuple1.of(tuple3.f1));
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple3<Long, Long, Double>) obj, (Collector<Tuple1<Long>>) collector);
            }
        }).groupBy(new int[]{0}).reduceGroup(new GroupReduceFunction<Tuple1<Long>, Row>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.2
            private static final long serialVersionUID = -5732373345671140682L;

            public void reduce(Iterable<Tuple1<Long>> iterable, Collector<Row> collector) throws Exception {
                AtomicInteger atomicInteger = new AtomicInteger();
                AtomicLong atomicLong = new AtomicLong();
                iterable.forEach(tuple1 -> {
                    atomicLong.set(((Long) tuple1.f0).longValue());
                    atomicInteger.addAndGet(1);
                });
                Row row = new Row(3);
                row.setField(0, Long.valueOf(atomicLong.longValue()));
                row.setField(1, Integer.valueOf(atomicInteger.intValue()));
                row.setField(2, Double.valueOf(Math.pow(atomicInteger.intValue(), 0.75d)));
                collector.collect(row);
            }
        }).returns(new RowTypeInfo(new TypeInformation[]{Types.LONG, Types.INT, Types.DOUBLE})), 1L, false);
        DataSet dataSet = (DataSet) sortedIndexVocab.f0;
        put.put("negBound", (DataSet) sortedIndexVocab.f1);
        if (getParams().contains(HasBatchSize.BATCH_SIZE)) {
            put.put(ApsContext.alinkApsNumMiniBatch, inputType2longTuple3.mapPartition(new MapPartitionFunction<Tuple3<Long, Long, Double>, Long>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.6
                private static final long serialVersionUID = -6208289751895867389L;

                public void mapPartition(Iterable<Tuple3<Long, Long, Double>> iterable, Collector<Long> collector) throws Exception {
                    long j = 0;
                    for (Tuple3<Long, Long, Double> tuple3 : iterable) {
                        j++;
                    }
                    collector.collect(Long.valueOf(j));
                }
            }).reduce(new ReduceFunction<Long>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.5
                private static final long serialVersionUID = 2077893051956735732L;

                public Long reduce(Long l, Long l2) throws Exception {
                    return Long.valueOf(l.longValue() + l2.longValue());
                }
            }).map(new MapMiniBatchNum(((Integer) get(HasBatchSize.BATCH_SIZE)).intValue())));
        } else {
            put.map(new MapFunction<Params, Params>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.4
                private static final long serialVersionUID = -746561127812958030L;

                public Params map(Params params2) throws Exception {
                    params2.set((ParamInfo<ParamInfo<Integer>>) ApsContext.ALINK_APS_NUM_MINI_BATCH, (ParamInfo<Integer>) 1);
                    return params2;
                }
            });
        }
        setOutput(graphUtilsWithString.mapLine(((DataSet) apsEnv.iterate(dataSet.mapPartition(new IniModel(intValue, value)), encode(inputType2longTuple3, dataSet), put, null, true, intValue2, 1, getParams(), new ApsIteratorLine(), new ApsEnv.PersistentHook<Tuple2<Long, float[][]>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.7
            @Override // com.alibaba.alink.operator.common.aps.ApsEnv.PersistentHook
            public DataSet<Tuple2<Long, float[][]>> hook(DataSet<Tuple2<Long, float[][]>> dataSet2) {
                return dataSet2.groupBy(new int[]{0}).withPartitioner(new RandomPartitioner()).reduceGroup(new GroupReduceFunction<Tuple2<Long, float[][]>, Tuple2<Long, float[][]>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.7.1
                    private static final long serialVersionUID = 4586697690697331816L;

                    public void reduce(Iterable<Tuple2<Long, float[][]>> iterable, Collector<Tuple2<Long, float[][]>> collector) throws Exception {
                        collector.collect(iterable.iterator().next());
                    }
                });
            }
        }).f0).join(dataSet).where(new int[]{0}).equalTo(new RowKeySelector(2)).with(new JoinFunction<Tuple2<Long, float[][]>, Row, Tuple2<Long, double[]>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.8
            private static final long serialVersionUID = -2935973948347718658L;

            public Tuple2<Long, double[]> join(Tuple2<Long, float[][]> tuple2, Row row) throws Exception {
                int length = ((float[][]) tuple2.f1)[0].length;
                double[] dArr = new double[length];
                for (int i = 0; i < length; i++) {
                    dArr[i] = ((float[][]) tuple2.f1)[0][i];
                }
                return Tuple2.of(Long.valueOf(((Long) row.getField(0)).longValue()), dArr);
            }
        })), new String[]{"vertexId", "vertexVector"}, new TypeInformation[]{typeInformation, AlinkTypes.DENSE_VECTOR});
        return this;
    }

    private static DataSet<Number[]> encode(DataSet<Tuple3<Long, Long, Double>> dataSet, DataSet<Row> dataSet2) {
        return dataSet.coGroup(dataSet2).where(new int[]{0}).equalTo(new RowKeySelector(0)).with(new CoGroupFunction<Tuple3<Long, Long, Double>, Row, Tuple3<Long, Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.11
            private static final long serialVersionUID = -6776475425155785444L;

            public void coGroup(Iterable<Tuple3<Long, Long, Double>> iterable, Iterable<Row> iterable2, Collector<Tuple3<Long, Long, Double>> collector) throws Exception {
                Row next = iterable2.iterator().next();
                for (Tuple3<Long, Long, Double> tuple3 : iterable) {
                    collector.collect(Tuple3.of(Long.valueOf(((Long) next.getField(2)).longValue()), tuple3.f1, tuple3.f2));
                }
            }
        }).coGroup(dataSet2).where(new int[]{1}).equalTo(new RowKeySelector(0)).with(new CoGroupFunction<Tuple3<Long, Long, Double>, Row, Tuple3<Long, Long, Double>>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.10
            private static final long serialVersionUID = 1331079849234438512L;

            public void coGroup(Iterable<Tuple3<Long, Long, Double>> iterable, Iterable<Row> iterable2, Collector<Tuple3<Long, Long, Double>> collector) throws Exception {
                Row next = iterable2.iterator().next();
                for (Tuple3<Long, Long, Double> tuple3 : iterable) {
                    collector.collect(Tuple3.of(tuple3.f0, Long.valueOf(((Long) next.getField(2)).longValue()), tuple3.f2));
                }
            }
        }).map(new MapFunction<Tuple3<Long, Long, Double>, Number[]>() { // from class: com.alibaba.alink.operator.batch.graph.LineBatchOp.9
            private static final long serialVersionUID = -223306152758743411L;

            public Number[] map(Tuple3<Long, Long, Double> tuple3) throws Exception {
                return new Number[]{(Number) tuple3.f0, (Number) tuple3.f1, Float.valueOf(((Double) tuple3.f2).floatValue())};
            }
        });
    }

    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ LineBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
