package com.alibaba.alink.operator.stream.recommendation;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithSecondInputSpec;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.directreader.DataBridge;
import com.alibaba.alink.common.io.directreader.DirectReader;
import com.alibaba.alink.common.model.DataBridgeModelSource;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import com.alibaba.alink.operator.common.recommendation.FourFunction;
import com.alibaba.alink.operator.common.recommendation.RecommAdapter;
import com.alibaba.alink.operator.common.recommendation.RecommAdapterMT;
import com.alibaba.alink.operator.common.recommendation.RecommKernel;
import com.alibaba.alink.operator.common.recommendation.RecommMapper;
import com.alibaba.alink.operator.common.recommendation.RecommType;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.recommendation.BaseRecommStreamOp;
import com.alibaba.alink.operator.stream.source.ModelStreamFileSourceStreamOp;
import com.alibaba.alink.operator.stream.utils.ModelMapStreamOp;
import com.alibaba.alink.operator.stream.utils.PredictProcess;
import com.alibaba.alink.params.ModelStreamScanParams;
import com.alibaba.alink.params.mapper.ModelMapperParams;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

/**
 * Base stream operator for recommendation prediction.
 *
 * <p>The operator is built around a batch model and a kernel builder. When linked, it applies a
 * {@link RecommMapper} to the first input stream. A model stream can additionally be supplied,
 * either through the model-stream file parameters or as the second linked input; in that case the
 * data stream is connected with the broadcast model stream and handled by {@link PredictProcess}.
 *
 * @param <T> the concrete operator type, returned by {@link #linkFrom(StreamOperator[])}
 */
@InputPorts(values = {@PortSpec(value = PortType.MODEL, opType = PortSpec.OpType.BATCH), @PortSpec(PortType.DATA), @PortSpec(value = PortType.MODEL_STREAM, isOptional = true)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@Internal
@ReservedColsWithSecondInputSpec
@NameCn("推荐基类")
public class BaseRecommStreamOp<T extends BaseRecommStreamOp<T>> extends StreamOperator<T> implements ModelStreamScanParams<T> {
    private static final long serialVersionUID = 5293170481337594373L;

    /** Batch operator holding the initial model; collected through {@link DirectReader} when linking. */
    private final BatchOperator<?> model;

    /** Builds the {@link RecommKernel} from (model schema, data schema, params, recommendation type). */
    private final FourFunction<TableSchema, TableSchema, Params, RecommType, RecommKernel> recommKernelBuilder;

    /** Recommendation type handed to the mapper and to the kernel builder. */
    private final RecommType recommType;

    /**
     * Creates the operator.
     *
     * @param batchOperator batch operator providing the model
     * @param fourFunction  builder used to create the recommendation kernel
     * @param recommType    recommendation type passed on to the mapper
     * @param params        operator parameters
     */
    public BaseRecommStreamOp(BatchOperator<?> batchOperator, FourFunction<TableSchema, TableSchema, Params, RecommType, RecommKernel> fourFunction, RecommType recommType, Params params) {
        super(params);
        this.model = batchOperator;
        this.recommKernelBuilder = fourFunction;
        this.recommType = recommType;
    }

    /**
     * Links the operator to its inputs and sets the prediction output.
     *
     * <p>The first input is the data stream to predict on. If the parameters enable a model-stream
     * file, it is read as a model stream; if a second input is present it is also treated as a
     * model stream (and unioned with the file-based one when both exist). With a model stream the
     * data is connected to the broadcast model stream via {@link PredictProcess}; without one the
     * data is mapped by {@link RecommAdapter}, or by {@link RecommAdapterMT} when
     * {@link ModelMapperParams#NUM_THREADS} is greater than one.
     *
     * @param streamOperatorArr the data stream, optionally followed by a model stream
     * @return this operator
     * @throws AkUnclassifiedErrorException if anything fails while wiring the job; the original
     *                                      exception is attached as the cause
     */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public T linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> input = checkAndGetFirst(streamOperatorArr);
        TableSchema modelSchema = this.model.getSchema();
        try {
            DataBridge modelBridge = DirectReader.collect(this.model);
            DataBridgeModelSource modelSource = new DataBridgeModelSource(modelBridge);
            RecommMapper mapper = new RecommMapper(this.recommKernelBuilder, this.recommType, modelSchema, input.getSchema(), getParams());

            DataStream<Row> modelStream = null;
            TableSchema modelStreamSchema = null;

            // Optional model stream read from the configured model-stream file path.
            if (ModelStreamUtils.useModelStreamFile(getParams())) {
                StreamOperator<?> fileModelStream = (StreamOperator<?>) new ModelStreamFileSourceStreamOp()
                    .setFilePath(getModelStreamFilePath())
                    .setScanInterval(getModelStreamScanInterval())
                    .setStartTime(getModelStreamStartTime())
                    .setSchemaStr(TableUtil.schema2SchemaStr(modelSchema))
                    .setMLEnvironmentId(getMLEnvironmentId());
                modelStreamSchema = fileModelStream.getSchema();
                modelStream = fileModelStream.getDataStream();
            }

            // Optional model stream passed in as the second linked operator.
            if (streamOperatorArr.length > 1) {
                StreamOperator<?> linkedModelStream = streamOperatorArr[1];
                if (modelStream == null) {
                    modelStreamSchema = linkedModelStream.getSchema();
                    modelStream = linkedModelStream.getDataStream();
                } else {
                    // Project the linked stream onto the file stream's columns so both can be unioned.
                    modelStream = modelStream.union(
                        linkedModelStream.select(modelStreamSchema.getFieldNames()).getDataStream());
                }
            }

            DataStream<Row> result;
            if (modelStream != null) {
                result = input.getDataStream()
                    .connect(ModelMapStreamOp.broadcastStream(modelStream))
                    .flatMap(new PredictProcess(
                        modelSchema, input.getSchema(), getParams(),
                        this.recommKernelBuilder, this.recommType, modelBridge,
                        ModelStreamUtils.findTimestampColIndexWithAssertAndHint(modelStreamSchema),
                        ModelStreamUtils.findCountColIndexWithAssertAndHint(modelStreamSchema)));
            } else {
                int numThreads = getParams().get(ModelMapperParams.NUM_THREADS);
                if (numThreads <= 1) {
                    result = input.getDataStream().map(new RecommAdapter(mapper, modelSource));
                } else {
                    result = input.getDataStream().flatMap(new RecommAdapterMT(mapper, modelSource, numThreads));
                }
            }

            setOutput(result, mapper.getOutputSchema());
            return (T) this;
        } catch (Exception e) {
            // Keep the original message text but chain the cause so the stack trace is not lost.
            throw new AkUnclassifiedErrorException(e.toString(), e);
        }
    }

    // NOTE: the synthetic bridge method linkFrom(StreamOperator[]) that the decompiler emitted is
    // intentionally not declared here: it has the same erasure as linkFrom(StreamOperator<?>...)
    // and the Java compiler generates it automatically.
}
