package com.alibaba.alink.operator.common.aps;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/operator/common/aps/ApsFuncTrain.class */
public abstract class ApsFuncTrain<DT, MT> extends RichCoGroupFunction<Tuple3<Integer, Long, MT>, Tuple3<Integer, Integer, DT>, Tuple2<Long, MT>> {
    private static final Logger LOG = LoggerFactory.getLogger(ApsFuncTrain.class);
    private static final long serialVersionUID = 432623425253133299L;
    protected Params contextParams = null;
    private int pid = -1;

    public void open(Configuration configuration) throws Exception {
        LOG.info("{}:{}", Thread.currentThread().getName(), "open");
        if (getRuntimeContext().hasBroadcastVariable("TrainSubset")) {
            this.contextParams = (Params) getRuntimeContext().getBroadcastVariable("TrainSubset").get(0);
        }
    }

    public void close() throws Exception {
        LOG.info("{}:{}", Thread.currentThread().getName(), "close");
    }

    public void coGroup(Iterable<Tuple3<Integer, Long, MT>> iterable, Iterable<Tuple3<Integer, Integer, DT>> iterable2, Collector<Tuple2<Long, MT>> collector) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Tuple3<Integer, Long, MT> tuple3 : iterable) {
            arrayList.add(new Tuple2(tuple3.f1, tuple3.f2));
        }
        ArrayList arrayList2 = new ArrayList();
        for (Tuple3<Integer, Integer, DT> tuple32 : iterable2) {
            this.pid = ((Integer) tuple32.f0).intValue();
            arrayList2.add(tuple32.f2);
        }
        HashMap hashMap = new HashMap(arrayList.size());
        for (int i = 0; i < arrayList.size(); i++) {
            hashMap.put(((Tuple2) arrayList.get(i)).f0, Integer.valueOf(i));
        }
        Iterator<Tuple2<Long, MT>> it = train(arrayList, hashMap, arrayList2).iterator();
        while (it.hasNext()) {
            collector.collect(it.next());
        }
    }

    public int getPatitionId() {
        return this.pid;
    }

    protected abstract List<Tuple2<Long, MT>> train(List<Tuple2<Long, MT>> list, Map<Long, Integer> map, List<DT> list2) throws Exception;
}
