package com.alibaba.alink.operator.stream.utils;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithFirstInputSpec;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.mapper.Mapper;
import com.alibaba.alink.common.mapper.MapperAdapter;
import com.alibaba.alink.common.mapper.MapperAdapterMT;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.utils.MapStreamOp;
import com.alibaba.alink.params.mapper.MapperParams;
import java.util.function.BiFunction;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

/**
 * Stream operator that applies a {@link Mapper} to the rows of its first input.
 *
 * <p>The mapper is not supplied directly; it is built lazily in {@link #linkFrom} from the
 * input's schema and this operator's parameters using the {@code mapperBuilder} function.
 *
 * @param <T> the concrete operator type, returned by {@link #linkFrom} for call chaining
 */
@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@Internal
@ReservedColsWithFirstInputSpec
public class MapStreamOp<T extends MapStreamOp<T>> extends StreamOperator<T> {
    private static final long serialVersionUID = -1335939787657447127L;

    /** Factory that creates the mapper from the input schema and the operator parameters. */
    protected final BiFunction<TableSchema, Params, Mapper> mapperBuilder;

    /**
     * Creates the operator.
     *
     * @param biFunction factory invoked in {@link #linkFrom} with the input schema and the
     *                   operator parameters to build the mapper
     * @param params     operator parameters, forwarded to the super class
     */
    public MapStreamOp(BiFunction<TableSchema, Params, Mapper> biFunction, Params params) {
        super(params);
        this.mapperBuilder = biFunction;
    }

    /**
     * Builds the mapper for the first input operator, maps its data stream and stores the
     * resulting stream together with the mapper's output schema as this operator's output.
     *
     * @param streamOperatorArr input operators; the one returned by {@code checkAndGetFirst}
     *                          is used
     * @return this operator
     * @throws AkUnclassifiedErrorException wrapping any exception raised while building the
     *                                      mapper or wiring the output; the original exception
     *                                      is kept as the cause
     */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    @SuppressWarnings("unchecked") // self-type cast: T is bound to the concrete subclass of this operator
    public T linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> input = checkAndGetFirst(streamOperatorArr);
        try {
            Mapper mapper = this.mapperBuilder.apply(input.getSchema(), getParams());
            DataStream<Row> resultRows = calcResultRows(input, mapper, getParams());
            setOutput(resultRows, mapper.getOutputSchema());
            return (T) this;
        } catch (Exception e) {
            throw new AkUnclassifiedErrorException(e.getMessage(), e);
        }
    }

    /**
     * Applies {@code mapper} to the data stream of {@code streamOperator}.
     *
     * <p>When {@link MapperParams#NUM_THREADS} is at most 1 the stream is transformed with
     * {@code map} and a {@link MapperAdapter}; otherwise it is transformed with {@code flatMap}
     * and a {@link MapperAdapterMT} that receives the configured thread count.
     *
     * @param streamOperator operator whose data stream is transformed
     * @param mapper         mapper to wrap in the adapter
     * @param params         parameters from which {@link MapperParams#NUM_THREADS} is read
     * @return the transformed row stream
     */
    public static DataStream<Row> calcResultRows(StreamOperator<?> streamOperator, Mapper mapper, Params params) {
        // Read the parameter once; it selects the adapter and is also passed to MapperAdapterMT.
        final int numThreads = params.get(MapperParams.NUM_THREADS);
        if (numThreads <= 1) {
            return streamOperator.getDataStream().map(new MapperAdapter(mapper));
        }
        return streamOperator.getDataStream().flatMap(new MapperAdapterMT(mapper, numThreads));
    }

    // No explicit bridge method for linkFrom(StreamOperator[]): its signature has the same
    // erasure as linkFrom(StreamOperator<?>...) and the compiler generates the bridge itself.
}
