package com.alibaba.alink.operator.batch.utils;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithFirstInputSpec;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.ExceptionWithErrorCode;
import com.alibaba.alink.common.mapper.FlatMapper;
import com.alibaba.alink.common.mapper.FlatMapperAdapter;
import com.alibaba.alink.common.mapper.FlatMapperAdapterMT;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.utils.FlatMapBatchOp;
import com.alibaba.alink.params.mapper.MapperParams;
import java.util.function.BiFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

/**
 * Batch operator that applies a {@link FlatMapper} to the rows of its first input.
 *
 * <p>The mapper is not supplied directly; it is created at link time by a builder
 * function that receives the input's schema and this operator's parameters.
 *
 * @param <T> the concrete operator type, returned from {@link #linkFrom} for chaining
 */
@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@Internal
@ReservedColsWithFirstInputSpec
public class FlatMapBatchOp<T extends FlatMapBatchOp<T>> extends BatchOperator<T> {
    private static final long serialVersionUID = 7763733109595053461L;

    /** Creates the mapper from the input schema and the operator parameters. */
    private final BiFunction<TableSchema, Params, FlatMapper> mapperBuilder;

    /**
     * Creates the operator.
     *
     * @param biFunction builder invoked in {@link #linkFrom} with the first input's schema
     *                   and this operator's parameters to create the mapper
     * @param params     operator parameters, passed on to the superclass constructor
     */
    public FlatMapBatchOp(BiFunction<TableSchema, Params, FlatMapper> biFunction, Params params) {
        super(params);
        this.mapperBuilder = biFunction;
    }

    /**
     * Builds the mapper for the first input and sets this operator's output to the
     * flat-mapped rows together with the mapper's output schema.
     *
     * @param batchOperatorArr upstream operators; the one selected by
     *                         {@code checkAndGetFirst} is used as the input
     * @return this operator
     * @throws ExceptionWithErrorCode       rethrown unchanged when raised while linking
     * @throws AkUnclassifiedErrorException wrapping any other exception raised while linking
     */
    @Override
    public T linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> input = checkAndGetFirst(batchOperatorArr);
        try {
            FlatMapper mapper = this.mapperBuilder.apply(input.getSchema(), getParams());
            DataSet<Row> resultRows = calcResultRows(input, mapper, getParams());
            TableSchema resultSchema = mapper.getOutputSchema();
            setOutput(resultRows, resultSchema);

            // The class is declared as FlatMapBatchOp<T extends FlatMapBatchOp<T>>, so the
            // cast relies on subclasses passing themselves as T; the compiler cannot check it.
            @SuppressWarnings("unchecked")
            T self = (T) this;
            return self;
        } catch (ExceptionWithErrorCode e) {
            // Already carries an error code: propagate as-is instead of re-wrapping.
            throw e;
        } catch (Exception e) {
            throw new AkUnclassifiedErrorException("Error. ", e);
        }
    }

    /**
     * Applies {@code flatMapper} to every row of {@code batchOperator}'s data set.
     *
     * <p>When {@link MapperParams#NUM_THREADS} is at most 1 the mapper is wrapped in a
     * {@link FlatMapperAdapter}; otherwise it is wrapped in a {@link FlatMapperAdapterMT}
     * that is given the configured thread count.
     *
     * @param batchOperator operator whose data set is transformed
     * @param flatMapper    mapper applied to the rows
     * @param params        parameters from which {@code NUM_THREADS} is read
     * @return the flat-mapped data set
     */
    public static DataSet<Row> calcResultRows(BatchOperator<?> batchOperator, FlatMapper flatMapper, Params params) {
        // Read the setting once so both the branch decision and the adapter see the same value.
        int numThreads = ((Integer) params.get(MapperParams.NUM_THREADS)).intValue();
        if (numThreads <= 1) {
            return batchOperator.getDataSet().flatMap(new FlatMapperAdapter(flatMapper));
        }
        return batchOperator.getDataSet().flatMap(new FlatMapperAdapterMT(flatMapper, numThreads));
    }
}
