package com.alibaba.alink.operator.stream.dataproc;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.params.dataproc.AppendIdStreamParams;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;

/**
 * Stream operator that appends a {@code long} ID column to every row of its input.
 *
 * <p>IDs are assigned independently by each parallel subtask: subtask {@code k} out of
 * {@code n} emits {@code k, k + n, k + 2n, ...}, so the sequences produced by different
 * subtasks never overlap. The name of the appended column is taken from {@code getIdCol()}.
 *
 * <p>The counter is a transient field that is (re)initialised in {@code open}; nothing in this
 * class persists it, so numbering starts again from the subtask index every time the
 * function is opened.
 */
@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@NameCn("流式增加ID列")
@NameEn("Append ID")
public class AppendIdStreamOp extends StreamOperator<AppendIdStreamOp> implements AppendIdStreamParams<AppendIdStreamOp> {
    private static final long serialVersionUID = -6309808493226982591L;

    /** Creates the operator with an empty parameter set. */
    public AppendIdStreamOp() {
        this(new Params());
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params operator parameters, forwarded unchanged to the superclass constructor
     */
    public AppendIdStreamOp(Params params) {
        super(params);
    }

    /**
     * Links this operator to its upstream operator and declares the output schema.
     *
     * <p>The output has all input columns in their original order, followed by one extra
     * {@code LONG} column named by {@code getIdCol()}.
     *
     * @param streamOperatorArr upstream operators; the one selected by {@code checkAndGetFirst}
     *                          is used as the input
     * @return this operator, to allow call chaining
     */
    @Override
    public AppendIdStreamOp linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> in = checkAndGetFirst(streamOperatorArr);

        // The compiler generates the covariant bridge for linkFrom on its own; declaring a
        // second linkFrom(StreamOperator[]) by hand would clash with this method's erasure.
        setOutput(
            in.getDataStream().map(new AppendIdMapper()).name("append_id"),
            ArrayUtils.add(in.getColNames(), getIdCol()),
            ArrayUtils.add(in.getColTypes(), Types.LONG));
        return this;
    }

    /**
     * Copies each incoming row into a row that is one field wider and stores the next ID of
     * this subtask in the last field.
     *
     * <p>Declared as a static nested class so that serialising the function does not drag the
     * enclosing operator instance along.
     */
    private static final class AppendIdMapper extends RichMapFunction<Row, Row> {
        private static final long serialVersionUID = 8195432332635374903L;

        /** Next ID to hand out on this subtask. */
        private transient long currIdx;
        /** Parallelism of the map operator; used as the stride between consecutive IDs. */
        private transient int numTasks;
        /** Output row, re-allocated only when the input arity changes. */
        private transient Row reused;

        /**
         * Reads the parallelism and this subtask's index from the runtime context and seeds
         * the counter with the subtask index.
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            this.numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
            this.currIdx = getRuntimeContext().getIndexOfThisSubtask();
        }

        /**
         * Appends the current ID to {@code row} and advances the counter by the parallelism.
         *
         * @param row input row; its fields are copied by reference, the row itself is not modified
         * @return a row holding the input fields followed by the ID
         */
        @Override
        public Row map(Row row) throws Exception {
            final int arity = row.getArity();
            if (this.reused == null || this.reused.getArity() != arity + 1) {
                this.reused = new Row(arity + 1);
            }
            for (int i = 0; i < arity; i++) {
                this.reused.setField(i, row.getField(i));
            }
            this.reused.setField(arity, this.currIdx);
            this.currIdx += this.numTasks;
            // NOTE(review): the same Row instance is returned from every call and overwritten
            // on the next one; confirm downstream operators do not keep references to it
            // when Flink object reuse is enabled.
            return this.reused;
        }
    }
}
