package com.alibaba.alink.operator.stream.recommendation;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.nlp.WordCountUtil;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.feature.HopTimeWindowStreamOp;
import com.alibaba.alink.operator.stream.feature.OverTimeWindowStreamOp;
import com.alibaba.alink.operator.stream.feature.TumbleTimeWindowStreamOp;
import com.alibaba.alink.operator.stream.source.TableSourceStreamOp;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.recommendation.HotProductParams;
import java.sql.Timestamp;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@NameCn("热点推荐")
@NameEn("Hot product")
/* loaded from: input_file:com/alibaba/alink/operator/stream/recommendation/HotProductStreamOp.class */
public class HotProductStreamOp extends StreamOperator<HotProductStreamOp> implements HotProductParams<HotProductStreamOp> {

    /* loaded from: input_file:com/alibaba/alink/operator/stream/recommendation/HotProductStreamOp$AdjustTimeStamp.class */
    public static class AdjustTimeStamp extends ScalarFunction {
        final long offset_time;

        public AdjustTimeStamp(double d) {
            this.offset_time = (long) (1000.0d * d);
        }

        public Timestamp eval(Timestamp timestamp) {
            return new Timestamp(timestamp.getTime() - this.offset_time);
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/stream/recommendation/HotProductStreamOp$FromUnixTimestamp.class */
    public static class FromUnixTimestamp extends ScalarFunction {
        public Timestamp eval(Long l) {
            return new Timestamp(l.longValue() * 1000);
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/stream/recommendation/HotProductStreamOp$TopNHotList.class */
    public static class TopNHotList extends ScalarFunction {
        final int n;
        final String countCol;

        public TopNHotList(int i, String str) {
            this.n = i;
            this.countCol = str;
        }

        public MTable eval(MTable mTable) {
            mTable.orderBy(new String[]{this.countCol}, new boolean[]{false});
            if (mTable.getNumRow() <= this.n) {
                return mTable;
            }
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.n; i++) {
                arrayList.add(mTable.getRow(i));
            }
            return new MTable(arrayList, mTable.getSchema());
        }
    }

    public HotProductStreamOp() {
        this(null);
    }

    public HotProductStreamOp(Params params) {
        super(params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public HotProductStreamOp linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> checkAndGetFirst = checkAndGetFirst(streamOperatorArr);
        String timeCol = getTimeCol();
        String productCol = getProductCol();
        String windowTime = getWindowTime();
        String hopTime = getHopTime();
        setOutputTable(addTriggerData(((HopTimeWindowStreamOp) addTriggerData(checkAndGetFirst.select(productCol + ", " + timeCol + ", 1 AS cnt").link(new TumbleTimeWindowStreamOp().setWindowTime(hopTime).setTimeCol(timeCol).setGroupCols(productCol).setClause(String.format("TUMBLE_START() AS %s,TUMBLE_END() AS %s, SUM(cnt) AS cnt", timeCol, "timeEnd"))), timeCol, WordCountUtil.COUNT_COL_NAME, OverTimeWindowStreamOp.getIntervalBySecond(getHopTime())).link(new HopTimeWindowStreamOp().setWindowTime(windowTime).setHopTime(hopTime).setTimeCol(timeCol).setGroupCols(productCol).setClause(String.format("HOP_START() AS %s, HOP_END() AS %s, SUM(cnt) AS cnt", "startTime", timeCol)))).udf(timeCol, timeCol, new AdjustTimeStamp(OverTimeWindowStreamOp.getIntervalBySecond(hopTime))), timeCol, WordCountUtil.COUNT_COL_NAME, OverTimeWindowStreamOp.getIntervalBySecond(getHopTime())).link(new TumbleTimeWindowStreamOp().setWindowTime(hopTime).setTimeCol(timeCol).setClause(String.format("TUMBLE_END() AS %s, MTABLE_AGG(%s, cnt) AS mt", timeCol, productCol))).udf("mt", "mt", new TopNHotList(getTopN().intValue(), WordCountUtil.COUNT_COL_NAME)).getOutputTable());
        return this;
    }

    private static StreamOperator<?> addTriggerData(StreamOperator<?> streamOperator, String str, String str2, final double d) {
        final int findColIndex = TableUtil.findColIndex(streamOperator.getColNames(), str);
        final int findColIndex2 = TableUtil.findColIndex(streamOperator.getColNames(), str2);
        return new TableSourceStreamOp(DataStreamConversionUtil.toTable(streamOperator.getMLEnvironmentId(), (DataStream<Row>) streamOperator.getDataStream().flatMap(new FlatMapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.stream.recommendation.HotProductStreamOp.1
            public void flatMap(Row row, Collector<Row> collector) throws Exception {
                Timestamp timestamp = (Timestamp) row.getField(findColIndex);
                collector.collect(row);
                Row copy = Row.copy(row);
                copy.setField(findColIndex, new Timestamp(timestamp.getTime() + (1000 * ((long) d))));
                copy.setField(findColIndex2, 0);
                collector.collect(copy);
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Row) obj, (Collector<Row>) collector);
            }
        }), streamOperator.getSchema()));
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ HotProductStreamOp linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
