package com.alibaba.alink.operator.stream.dataproc;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.params.onlinelearning.SpeedControlParams;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

@InputPorts(values = {@PortSpec(PortType.ANY)})
@OutputPorts
@NameCn("流速控制")
@NameEn("Speed Control")
/* loaded from: input_file:com/alibaba/alink/operator/stream/dataproc/SpeedControlStreamOp.class */
public class SpeedControlStreamOp extends StreamOperator<SpeedControlStreamOp> implements SpeedControlParams<SpeedControlStreamOp> {
    private static final long serialVersionUID = 7943599253882942917L;

    public SpeedControlStreamOp() {
        super(new Params());
    }

    public SpeedControlStreamOp(Params params) {
        super(params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public SpeedControlStreamOp linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> checkAndGetFirst = checkAndGetFirst(streamOperatorArr);
        final double doubleValue = getTimeInterval().doubleValue();
        setOutput((DataStream<Row>) checkAndGetFirst.getDataStream().map(new RichMapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.stream.dataproc.SpeedControlStreamOp.1
            private static final long serialVersionUID = 8928343941684014599L;
            private long sleepTime;
            private long numSamplesPass;
            long cnt = 0;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
                if (doubleValue > 0.001d) {
                    this.sleepTime = numberOfParallelSubtasks * Double.valueOf(Math.ceil(doubleValue * 1000.0d)).longValue();
                    this.numSamplesPass = 1L;
                } else {
                    this.sleepTime = numberOfParallelSubtasks;
                    this.numSamplesPass = Double.valueOf(Math.ceil(1.0d / (doubleValue * 1000.0d))).longValue();
                }
            }

            public Row map(Row row) throws Exception {
                if (this.cnt % this.numSamplesPass == 0) {
                    Thread.sleep(this.sleepTime);
                }
                this.cnt++;
                return row;
            }
        }), checkAndGetFirst.getSchema());
        return this;
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ SpeedControlStreamOp linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
