package com.alibaba.alink.operator.batch.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.params.io.NumSeqSourceParams;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@IoOpAnnotation(name = "num_seq", ioType = IOType.SourceBatch)
@NameCn("数值队列数据源")
@NameEn("Number Sequence Source")
/* loaded from: input_file:com/alibaba/alink/operator/batch/source/NumSeqSourceBatchOp.class */
public final class NumSeqSourceBatchOp extends BaseSourceBatchOp<NumSeqSourceBatchOp> implements NumSeqSourceParams<NumSeqSourceBatchOp> {
    private static final String DEFAULT_OUTPUT_COL_NAME = "num";
    private static final long serialVersionUID = 6536810460146700008L;

    public NumSeqSourceBatchOp() {
        this((Params) null);
    }

    public NumSeqSourceBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(NumSeqSourceBatchOp.class), params);
        if (getParams().contains(NumSeqSourceParams.OUTPUT_COL)) {
            return;
        }
        setOutputCol(DEFAULT_OUTPUT_COL_NAME);
    }

    @Deprecated
    public NumSeqSourceBatchOp(long j) {
        this(1L, j);
    }

    @Deprecated
    public NumSeqSourceBatchOp(long j, long j2) {
        this(j, j2, DEFAULT_OUTPUT_COL_NAME);
    }

    @Deprecated
    public NumSeqSourceBatchOp(long j, long j2, Params params) {
        this(j, j2, DEFAULT_OUTPUT_COL_NAME, params);
    }

    @Deprecated
    public NumSeqSourceBatchOp(long j, long j2, String str) {
        this(j, j2, str, new Params());
    }

    @Deprecated
    public NumSeqSourceBatchOp(long j, long j2, String str, Params params) {
        this(params.m1495clone().set((ParamInfo<ParamInfo<Long>>) NumSeqSourceParams.FROM, (ParamInfo<Long>) Long.valueOf(j)).set((ParamInfo<ParamInfo<Long>>) NumSeqSourceParams.TO, (ParamInfo<Long>) Long.valueOf(j2)).set((ParamInfo<ParamInfo<String>>) NumSeqSourceParams.OUTPUT_COL, (ParamInfo<String>) str));
    }

    @Override // com.alibaba.alink.operator.batch.source.BaseSourceBatchOp
    protected Table initializeDataSource() {
        final long longValue = getFrom().longValue();
        final long longValue2 = getTo().longValue();
        String outputCol = getOutputCol();
        final int parallelism = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment().getParallelism();
        return DataSetConversionUtil.toTable(getMLEnvironmentId(), (DataSet<Row>) MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment().fromElements(new Integer[]{0}).flatMap(new FlatMapFunction<Integer, Tuple1<Integer>>() { // from class: com.alibaba.alink.operator.batch.source.NumSeqSourceBatchOp.3
            private static final long serialVersionUID = 4025060738763494851L;

            public void flatMap(Integer num, Collector<Tuple1<Integer>> collector) {
                for (int i = 0; i < parallelism; i++) {
                    collector.collect(Tuple1.of(Integer.valueOf(i)));
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Integer) obj, (Collector<Tuple1<Integer>>) collector);
            }
        }).partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.operator.batch.source.NumSeqSourceBatchOp.2
            private static final long serialVersionUID = 2199481887391477466L;

            public int partition(Integer num, int i) {
                return num.intValue() % i;
            }
        }, 0).mapPartition(new RichMapPartitionFunction<Tuple1<Integer>, Row>() { // from class: com.alibaba.alink.operator.batch.source.NumSeqSourceBatchOp.1
            private static final long serialVersionUID = 4854657588340027916L;

            public void mapPartition(Iterable<Tuple1<Integer>> iterable, Collector<Row> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                long j = longValue;
                while (true) {
                    long j2 = j;
                    if (j2 > longValue2) {
                        return;
                    }
                    if (j2 % parallelism == indexOfThisSubtask) {
                        collector.collect(Row.of(new Object[]{Long.valueOf(j2)}));
                    }
                    j = j2 + 1;
                }
            }
        }), new String[]{outputCol}, (TypeInformation<?>[]) new TypeInformation[]{Types.LONG});
    }
}
