package com.alibaba.alink.operator.stream.utils;

import com.alibaba.alink.common.io.directreader.DataBridge;
import com.alibaba.alink.common.io.directreader.DirectReader;
import com.alibaba.alink.common.mapper.ModelMapper;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import com.alibaba.alink.operator.common.recommendation.FourFunction;
import com.alibaba.alink.operator.common.recommendation.RecommKernel;
import com.alibaba.alink.operator.common.recommendation.RecommMapper;
import com.alibaba.alink.operator.common.recommendation.RecommType;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.TriFunction;

/* loaded from: input_file:com/alibaba/alink/operator/stream/utils/PredictProcess.class */
/**
 * Co-flat-map function that applies a {@link ModelMapper} to rows arriving on the first input
 * and replaces that mapper with one rebuilt from rows arriving on the second input.
 *
 * <p>Rows on the second input carry a timestamp field and a count field (at the column indices
 * given to the constructor). Rows sharing a timestamp are buffered until the number of buffered
 * rows equals the count carried by the row that was just received; the buffered rows are then
 * handed to {@link ModelMapper#createNew} and the resulting mapper replaces the current one.
 */
public class PredictProcess extends RichCoFlatMapFunction<Row, Row, Row> {
    /** Source of the initial model rows; when {@code null}, {@link #open} loads nothing. */
    private final DataBridge dataBridge;
    /** Mapper currently used by {@link #flatMap1}; swapped by {@link #flatMap2}. */
    private ModelMapper mapper;
    /** Rows received on the second input so far, grouped by their timestamp field. */
    private final Map<Timestamp, List<Row>> buffers = new HashMap<>();
    private final int timestampColIndex;
    private final int countColIndex;

    /**
     * Creates the function with a mapper produced by {@code triFunction}.
     *
     * @param tableSchema first schema passed to {@code triFunction}
     * @param tableSchema2 second schema passed to {@code triFunction}
     * @param params parameters passed to {@code triFunction}
     * @param triFunction factory that builds the initial mapper
     * @param dataBridge source of the initial model rows, or {@code null} to skip loading in
     *     {@link #open}
     * @param timestampColIndex index of the timestamp field in rows of the second input
     * @param countColIndex index of the count field in rows of the second input
     */
    public PredictProcess(TableSchema tableSchema, TableSchema tableSchema2, Params params, TriFunction<TableSchema, TableSchema, Params, ModelMapper> triFunction, DataBridge dataBridge, int timestampColIndex, int countColIndex) {
        this.dataBridge = dataBridge;
        this.mapper = triFunction.apply(tableSchema, tableSchema2, params);
        this.timestampColIndex = timestampColIndex;
        this.countColIndex = countColIndex;
    }

    /**
     * Creates the function with a {@link RecommMapper} as the initial mapper.
     *
     * @param tableSchema first schema passed to the {@link RecommMapper} constructor
     * @param tableSchema2 second schema passed to the {@link RecommMapper} constructor
     * @param params parameters passed to the {@link RecommMapper} constructor
     * @param fourFunction kernel factory passed to the {@link RecommMapper} constructor
     * @param recommType recommendation type passed to the {@link RecommMapper} constructor
     * @param dataBridge source of the initial model rows, or {@code null} to skip loading in
     *     {@link #open}
     * @param timestampColIndex index of the timestamp field in rows of the second input
     * @param countColIndex index of the count field in rows of the second input
     */
    public PredictProcess(TableSchema tableSchema, TableSchema tableSchema2, Params params, FourFunction<TableSchema, TableSchema, Params, RecommType, RecommKernel> fourFunction, RecommType recommType, DataBridge dataBridge, int timestampColIndex, int countColIndex) {
        this.dataBridge = dataBridge;
        this.mapper = new RecommMapper(fourFunction, recommType, tableSchema, tableSchema2, params);
        this.timestampColIndex = timestampColIndex;
        this.countColIndex = countColIndex;
    }

    /**
     * Loads the initial model through the data bridge and opens the mapper. Does nothing when no
     * data bridge was supplied.
     *
     * @param configuration unused
     * @throws Exception if reading the model, loading it, or opening the mapper fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        if (this.dataBridge != null) {
            this.mapper.loadModel(DirectReader.directRead(this.dataBridge));
            this.mapper.open();
        }
    }

    /**
     * Closes the superclass and then the mapper that is current at the time of the call.
     *
     * @throws Exception if either close operation fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        this.mapper.close();
    }

    /**
     * Maps one row of the first input with the current mapper and emits the result.
     *
     * @param row input row
     * @param collector receives the mapped row
     * @throws Exception if the mapper fails
     */
    @Override
    public void flatMap1(Row row, Collector<Row> collector) throws Exception {
        collector.collect(this.mapper.map(row));
    }

    /**
     * Buffers one row of the second input and, once the buffer for the row's timestamp holds as
     * many rows as the row's count field states, rebuilds the mapper from the buffered rows.
     *
     * <p>The buffer is removed from {@link #buffers} before the rebuild is attempted, so neither a
     * successful nor a failed rebuild leaves an entry behind. A failed rebuild is reported on
     * {@code System.err} and the previous mapper stays in use.
     *
     * @param row row whose timestamp and count fields sit at the configured column indices
     * @param collector unused; this input emits nothing
     */
    @Override
    public void flatMap2(Row row, Collector<Row> collector) {
        Timestamp timestamp = (Timestamp) row.getField(this.timestampColIndex);
        long expectedCount = (Long) row.getField(this.countColIndex);
        Row modelRow = ModelStreamUtils.genRowWithoutIdentifier(row, this.timestampColIndex, this.countColIndex);

        // Add first, then compare: this way a group whose count is 1 completes on its first
        // (and only) row instead of waiting for a row that never arrives.
        List<Row> buffer = this.buffers.computeIfAbsent(timestamp, key -> new ArrayList<>());
        buffer.add(modelRow);
        if (buffer.size() != expectedCount) {
            return;
        }

        // Drop the map entry rather than clearing the list so the map does not accumulate one
        // empty list per timestamp ever seen.
        this.buffers.remove(timestamp);
        try {
            ModelMapper updatedMapper = this.mapper.createNew(buffer);
            updatedMapper.open();
            // Swap only after the new mapper opened successfully.
            this.mapper = updatedMapper;
        } catch (Exception e) {
            System.err.println("Model stream updating failed. Please check your model stream." + e);
        }
    }
}
