package com.alibaba.alink.operator.local.utils;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithFirstInputSpec;
import com.alibaba.alink.common.exceptions.AkIllegalDataException;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.ExceptionWithErrorCode;
import com.alibaba.alink.common.mapper.Mapper;
import com.alibaba.alink.operator.local.AlinkLocalSession;
import com.alibaba.alink.operator.local.LocalOperator;
import com.alibaba.alink.operator.local.utils.MapLocalOp;
import com.alibaba.alink.params.shared.HasNumThreads;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@Internal
@ReservedColsWithFirstInputSpec
/* loaded from: input_file:com/alibaba/alink/operator/local/utils/MapLocalOp.class */
public class MapLocalOp<T extends MapLocalOp<T>> extends LocalOperator<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MapLocalOp.class);
    protected final BiFunction<TableSchema, Params, Mapper> mapperBuilder;

    public MapLocalOp(BiFunction<TableSchema, Params, Mapper> biFunction, Params params) {
        super(params);
        this.mapperBuilder = biFunction;
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public T linkFrom(LocalOperator<?>... localOperatorArr) {
        LocalOperator<?> checkAndGetFirst = checkAndGetFirst(localOperatorArr);
        try {
            Mapper apply = this.mapperBuilder.apply(checkAndGetFirst.getSchema(), getParams());
            apply.open();
            setOutputTable(new MTable(execMapper(checkAndGetFirst, apply, getParams()), apply.getOutputSchema()));
            apply.close();
            return this;
        } catch (ExceptionWithErrorCode e) {
            throw e;
        } catch (Exception e2) {
            throw new AkUnclassifiedErrorException("Error. ", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static List<Row> execMapper(LocalOperator<?> localOperator, Mapper mapper, Params params) {
        int defaultNumThreads = LocalOperator.getDefaultNumThreads();
        if (params.contains(HasNumThreads.NUM_THREADS)) {
            defaultNumThreads = ((Integer) params.get(HasNumThreads.NUM_THREADS)).intValue();
        }
        AlinkLocalSession.TaskRunner taskRunner = new AlinkLocalSession.TaskRunner();
        List<Row> rows = localOperator.getOutputTable().getRows();
        int size = rows.size();
        List<Row> asList = Arrays.asList(new Row[size]);
        for (int i = 0; i < defaultNumThreads; i++) {
            int startPos = (int) AlinkLocalSession.DISTRIBUTOR.startPos(i, defaultNumThreads, size);
            int localRowCnt = (int) AlinkLocalSession.DISTRIBUTOR.localRowCnt(i, defaultNumThreads, size);
            if (localRowCnt > 0) {
                taskRunner.submit(() -> {
                    for (int i2 = startPos; i2 < startPos + localRowCnt; i2++) {
                        try {
                            asList.set(i2, mapper.map((Row) rows.get(i2)));
                        } catch (Exception e) {
                            LOG.error("Execute mapper error.", e);
                            throw new AkIllegalDataException("Map error on the data : " + ((Row) asList.get(i2)).toString());
                        }
                    }
                });
            }
        }
        taskRunner.join();
        return asList;
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public /* bridge */ /* synthetic */ LocalOperator linkFrom(LocalOperator[] localOperatorArr) {
        return linkFrom((LocalOperator<?>[]) localOperatorArr);
    }
}
