package com.alibaba.alink.operator.batch.recommendation;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithSecondInputSpec;
import com.alibaba.alink.common.model.BroadcastVariableModelSource;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.recommendation.BaseRecommBatchOp;
import com.alibaba.alink.operator.common.recommendation.FourFunction;
import com.alibaba.alink.operator.common.recommendation.RecommAdapter;
import com.alibaba.alink.operator.common.recommendation.RecommAdapterMT;
import com.alibaba.alink.operator.common.recommendation.RecommKernel;
import com.alibaba.alink.operator.common.recommendation.RecommMapper;
import com.alibaba.alink.operator.common.recommendation.RecommType;
import com.alibaba.alink.params.mapper.ModelMapperParams;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

/**
 * Base batch operator for recommendation prediction.
 *
 * <p>The operator takes two inputs, in this order: the model (first input) and the data to
 * run the recommendation on (second input). The model rows are shipped to the mapping
 * function as a Flink broadcast set named {@value #BROADCAST_MODEL_TABLE_NAME}.
 *
 * @param <T> the concrete operator type, returned from {@link #linkFrom(BatchOperator[])}
 *            so that calls can be chained on the subclass type
 */
@InputPorts(values = {@PortSpec(PortType.MODEL), @PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@Internal
@ReservedColsWithSecondInputSpec
public class BaseRecommBatchOp<T extends BaseRecommBatchOp<T>> extends BatchOperator<T> {
    /** Name under which the model data set is registered as a broadcast set. */
    private static final String BROADCAST_MODEL_TABLE_NAME = "broadcastModelTable";
    private static final long serialVersionUID = -5664225520926927756L;

    /** Factory handed to {@link RecommMapper} to build the recommendation kernel. */
    private final FourFunction<TableSchema, TableSchema, Params, RecommType, RecommKernel> recommKernelBuilder;
    /** Recommendation type handed to {@link RecommMapper} together with the kernel builder. */
    private final RecommType recommType;

    /**
     * Creates the operator.
     *
     * @param fourFunction factory for the recommendation kernel; stored and passed to the
     *                     {@link RecommMapper} created in {@link #linkFrom(BatchOperator[])}
     * @param recommType   recommendation type passed to the {@link RecommMapper}
     * @param params       operator parameters, forwarded to the super constructor
     */
    public BaseRecommBatchOp(FourFunction<TableSchema, TableSchema, Params, RecommType, RecommKernel> fourFunction, RecommType recommType, Params params) {
        super(params);
        this.recommKernelBuilder = fourFunction;
        this.recommType = recommType;
    }

    /**
     * Wires the model input and the data input into a recommendation job and sets the
     * result as this operator's output.
     *
     * <p>When {@code ModelMapperParams.NUM_THREADS} is at most 1 the data is processed with
     * a {@link RecommAdapter} via {@code map}; otherwise a {@link RecommAdapterMT} is used
     * via {@code flatMap} and receives the configured thread count.
     *
     * @param batchOperatorArr the inputs, checked with {@code checkOpSize(2, ...)}: index 0
     *                         is the model operator, index 1 is the data operator
     * @return this operator
     */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    @SuppressWarnings("unchecked") // (T) this: T is the self type of the concrete operator.
    public T linkFrom(BatchOperator<?>... batchOperatorArr) {
        checkOpSize(2, batchOperatorArr);

        BatchOperator<?> modelOp = batchOperatorArr[0];
        BatchOperator<?> dataOp = batchOperatorArr[1];

        BroadcastVariableModelSource modelSource = new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);
        RecommMapper recommMapper = new RecommMapper(
            this.recommKernelBuilder, this.recommType, modelOp.getSchema(), dataOp.getSchema(), getParams());

        // The model rows are rebalanced before being attached as the broadcast set.
        DataSet<Row> modelRows = modelOp.getDataSet().rebalance();

        // Read the thread count once; it selects the adapter and configures the MT variant.
        int numThreads = getParams().get(ModelMapperParams.NUM_THREADS);

        DataSet<Row> resultRows;
        if (numThreads <= 1) {
            resultRows = dataOp.getDataSet()
                .map(new RecommAdapter(recommMapper, modelSource))
                .withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);
        } else {
            resultRows = dataOp.getDataSet()
                .flatMap(new RecommAdapterMT(recommMapper, modelSource, numThreads))
                .withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);
        }

        setOutput(resultRows, recommMapper.getOutputSchema());
        return (T) this;
    }
}
