package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;

@InputPorts
@OutputPorts(values = {@PortSpec(PortType.DATA)})
@NameCn("数值队列数据源")
@NameEn("Numeric Sequence Source")
/* loaded from: input_file:com/alibaba/alink/operator/stream/source/NumSeqSourceStreamOp.class */
public final class NumSeqSourceStreamOp extends BaseSourceStreamOp<NumSeqSourceStreamOp> {
    private static final long serialVersionUID = -1132356020317225421L;
    private final long from;
    private final long to;
    private final String colName;
    private double timePerSample;
    private Double[] timeZones;

    /* loaded from: input_file:com/alibaba/alink/operator/stream/source/NumSeqSourceStreamOp$SpeedController.class */
    public static class SpeedController extends AbstractRichFunction implements MapFunction<Long, Long> {
        private static final long serialVersionUID = 4030421619547107931L;
        RandomDataGenerator rd = new RandomDataGenerator();
        boolean updateSeed = false;
        int numWorker;
        private Double timePerSample;
        private Double[] timeZones;

        public SpeedController(Double[] dArr) {
            if (dArr.length == 1) {
                this.timePerSample = dArr[0];
            } else {
                this.timeZones = dArr;
            }
        }

        public void open(Configuration configuration) throws Exception {
            this.numWorker = getRuntimeContext().getNumberOfParallelSubtasks();
        }

        public Long map(Long l) throws Exception {
            long round;
            if (!this.updateSeed) {
                this.rd.reSeed(l.longValue());
                this.updateSeed = true;
            }
            if (this.timeZones == null) {
                round = Math.round(1000.0d * this.timePerSample.doubleValue() * this.numWorker);
            } else {
                if (this.timeZones.length != 2) {
                    throw new IllegalArgumentException("time parameter is wrong!");
                }
                round = Math.round(1000.0d * (this.timeZones[0].doubleValue() + (0.01d * this.rd.nextInt(0, 100) * (this.timeZones[1].doubleValue() - this.timeZones[0].doubleValue()))) * this.numWorker);
            }
            Thread.sleep(round);
            return l;
        }
    }

    public NumSeqSourceStreamOp(long j) {
        this(1L, j);
    }

    public NumSeqSourceStreamOp(long j, long j2) {
        this(j, j2, new Params());
    }

    public NumSeqSourceStreamOp(long j, long j2, Params params) {
        this(j, j2, "num", params);
    }

    public NumSeqSourceStreamOp(long j, long j2, String str) {
        this(j, j2, str, new Params());
    }

    public NumSeqSourceStreamOp(long j, long j2, String str, Params params) {
        super(AnnotationUtils.annotatedName(NumSeqSourceStreamOp.class), params);
        this.timePerSample = -1.0d;
        this.from = j;
        this.to = j2;
        this.colName = str;
    }

    public NumSeqSourceStreamOp(long j, long j2, double d) {
        this(j, j2, d, (Params) null);
    }

    public NumSeqSourceStreamOp(long j, long j2, double d, Params params) {
        this(j, j2, "num", d, params);
    }

    public NumSeqSourceStreamOp(long j, long j2, String str, double d) {
        this(j, j2, str, d, (Params) null);
    }

    public NumSeqSourceStreamOp(long j, long j2, String str, double d, Params params) {
        super(AnnotationUtils.annotatedName(NumSeqSourceStreamOp.class), params);
        this.timePerSample = -1.0d;
        this.from = j;
        this.to = j2;
        this.colName = str;
        this.timePerSample = d;
    }

    public NumSeqSourceStreamOp(long j, long j2, Double[] dArr) {
        this(j, j2, "num", dArr, (Params) null);
    }

    public NumSeqSourceStreamOp(long j, long j2, Double[] dArr, Params params) {
        this(j, j2, "num", dArr, params);
    }

    public NumSeqSourceStreamOp(long j, long j2, String str, Double[] dArr) {
        this(j, j2, str, dArr, (Params) null);
    }

    public NumSeqSourceStreamOp(long j, long j2, String str, Double[] dArr, Params params) {
        super(AnnotationUtils.annotatedName(NumSeqSourceStreamOp.class), params);
        this.timePerSample = -1.0d;
        this.from = j;
        this.to = j2;
        this.colName = str;
        this.timeZones = dArr;
    }

    @Override // com.alibaba.alink.operator.stream.source.BaseSourceStreamOp
    protected Table initializeDataSource() {
        DataStream generateSequence = MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamExecutionEnvironment().generateSequence(this.from, this.to);
        return MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamTableEnvironment().fromDataStream((this.timeZones == null && this.timePerSample == -1.0d) ? generateSequence : (this.timeZones == null || this.timePerSample != -1.0d) ? generateSequence.map(new SpeedController(new Double[]{Double.valueOf(this.timePerSample)})) : generateSequence.map(new SpeedController(this.timeZones)), this.colName);
    }
}
