package com.alibaba.alink.operator.stream.sql;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.params.sql.WindowGroupByParams;
import java.sql.Timestamp;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(PortType.DATA)})
@NameCn("SQL操作：WindowGroupBy")
@NameEn("SQL：WindowGroupBy")
/* loaded from: input_file:com/alibaba/alink/operator/stream/sql/WindowGroupByStreamOp.class */
public final class WindowGroupByStreamOp extends StreamOperator<WindowGroupByStreamOp> implements WindowGroupByParams<WindowGroupByStreamOp> {
    private static final long serialVersionUID = -7231017642191624430L;

    public WindowGroupByStreamOp() {
        this(new Params());
    }

    public WindowGroupByStreamOp(Params params) {
        super(params);
    }

    private static DataStream<Row> getDataStreamWithExplicitTypeDefine(DataStream<Row> dataStream, String[] strArr, TypeInformation<?>[] typeInformationArr) {
        return dataStream.map(new MapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.stream.sql.WindowGroupByStreamOp.1
            private static final long serialVersionUID = 1424209144153966514L;

            public Row map(Row row) throws Exception {
                return row;
            }
        }).returns(new RowTypeInfo(typeInformationArr, strArr));
    }

    String createSelectClause(String[] strArr) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < strArr.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(strArr[i]);
        }
        return sb.toString();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public WindowGroupByStreamOp linkFrom(StreamOperator<?>... streamOperatorArr) {
        String format;
        StreamOperator<?> checkAndGetFirst = checkAndGetFirst(streamOperatorArr);
        String createUniqueTableName = StreamOperator.createUniqueTableName();
        MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamTableEnvironment().registerDataStream(createUniqueTableName, getDataStreamWithExplicitTypeDefine(checkAndGetFirst.getDataStream(), checkAndGetFirst.getColNames(), checkAndGetFirst.getColTypes()), createSelectClause(checkAndGetFirst.getColNames()) + ",proctime.proctime");
        WindowGroupByParams.WindowType windowType = getWindowType();
        String intervalUnit = getIntervalUnit().toString();
        switch (windowType) {
            case TUMBLE:
                format = String.format("proctime, INTERVAL '%d' %s", Integer.valueOf(getWindowLength().intValue()), intervalUnit);
                break;
            case HOP:
                format = String.format("proctime, INTERVAL '%d' %s, INTERVAL '%d' %s", Integer.valueOf(getSlidingLength().intValue()), intervalUnit, Integer.valueOf(getWindowLength().intValue()), intervalUnit);
                break;
            case SESSION:
                format = String.format("proctime, INTERVAL '%d' %s", Integer.valueOf(getSessionGap().intValue()), intervalUnit);
                break;
            default:
                throw new IllegalArgumentException("invalid window type: " + windowType);
        }
        String selectClause = getSelectClause();
        String groupByClause = getGroupByClause();
        boolean z = !StringUtils.isNullOrWhitespaceOnly(groupByClause);
        String format2 = String.format("SELECT %s, CAST((%s_start(%s)) as TIMESTAMP(3)) as window_start, CAST((%s_end(%s)) as TIMESTAMP(3)) as window_end FROM %s GROUP BY %s(%s)", selectClause, windowType, format, windowType, format, createUniqueTableName, windowType, format);
        if (z) {
            format2 = format2 + String.format(", %s", groupByClause);
        }
        try {
            setOutputTable(((StreamOperator) MLEnvironmentFactory.get(checkAndGetFirst.getMLEnvironmentId()).streamSQL(format2).setMLEnvironmentId(getMLEnvironmentId())).getOutputTable());
            final int findColIndexWithAssertAndHint = TableUtil.findColIndexWithAssertAndHint(getOutputTable().getSchema().getFieldNames(), "window_start");
            final int findColIndexWithAssertAndHint2 = TableUtil.findColIndexWithAssertAndHint(getOutputTable().getSchema().getFieldNames(), "window_end");
            setOutput((DataStream<Row>) getDataStream().map(new RichMapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.stream.sql.WindowGroupByStreamOp.2
                private static final long serialVersionUID = -7654428798338632450L;

                public Row map(Row row) throws Exception {
                    Timestamp timestamp = (Timestamp) row.getField(findColIndexWithAssertAndHint);
                    Timestamp timestamp2 = (Timestamp) row.getField(findColIndexWithAssertAndHint2);
                    long currentTimeMillis = System.currentTimeMillis();
                    long time = timestamp.getTime();
                    long time2 = timestamp2.getTime();
                    long round = Math.round((currentTimeMillis - time2) / 3600000.0d) * 1000 * 3600;
                    row.setField(findColIndexWithAssertAndHint, new Timestamp(time + round));
                    row.setField(findColIndexWithAssertAndHint2, new Timestamp(time2 + round));
                    return row;
                }
            }).name("correct_window_timezone"), getOutputTable().getSchema());
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid input: " + format2 + ", because: " + e);
        }
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ WindowGroupByStreamOp linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
