package com.alibaba.alink.operator.common.feature.featurebuilder;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.ReservedColsWithFirstInputSpec;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.feature.featurebuilder.BaseWindowStreamOp;
import com.alibaba.alink.operator.common.feature.featurebuilder.FeatureClauseUtil;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.feature.featuregenerator.BaseWindowParams;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTPUT_RESULT)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "partitionCols"), @ParamSelectColumnSpec(name = "timeCol", allowedTypeCollections = {TypeCollections.TIMESTAMP_TYPES})})
@NameCn("")
@ReservedColsWithFirstInputSpec
/* loaded from: input_file:com/alibaba/alink/operator/common/feature/featurebuilder/BaseWindowStreamOp.class */
public abstract class BaseWindowStreamOp<T extends BaseWindowStreamOp<T>> extends StreamOperator<T> implements BaseWindowParams<T> {
    /** Name of the synthetic time column; linkFrom strips it from the result of over-window operators. */
    static String ROW_TIME_COL_NAME = "_row_time_col_name_";
    /** The synthetic column name suffixed with ".rowtime"; used as the name of the extra field linkFrom appends to every row. */
    static String WATERMARK_COL_NAME = ROW_TIME_COL_NAME + ".rowtime";
    /** Column names of the first input operator; assigned in linkFrom. */
    String[] inputColNames;

    /* loaded from: input_file:com/alibaba/alink/operator/common/feature/featurebuilder/BaseWindowStreamOp$PeriodExtendBound.class */
    /**
     * Bounded-out-of-orderness timestamp extractor that takes the timestamp of each {@link Row}
     * from one fixed field.
     */
    public static class PeriodExtendBound extends BoundedOutOfOrdernessTimestampExtractor<Row> {
        private static final long serialVersionUID = 4066823127559292096L;

        /** Position of the row field that stores the timestamp as a {@code Long}. */
        int timeIndex;

        /**
         * @param maxOutOfOrderness bound handed unchanged to the parent extractor
         * @param timeFieldIndex    index of the row field holding the timestamp
         */
        public PeriodExtendBound(Time maxOutOfOrderness, int timeFieldIndex) {
            super(maxOutOfOrderness);
            timeIndex = timeFieldIndex;
        }

        /**
         * Reads the timestamp from the configured field.
         *
         * @param element row whose field at {@code timeIndex} must be a non-null {@code Long}
         * @return the unboxed field value
         */
        public long extractTimestamp(Row element) {
            Long eventTime = (Long) element.getField(timeIndex);
            return eventTime;
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/common/feature/featurebuilder/BaseWindowStreamOp$PunctuatedAssigner.class */
    /**
     * Punctuated watermark assigner: remembers the largest timestamp extracted so far and, for
     * every element, emits a watermark that trails that maximum by a fixed lateness.
     */
    public static class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Row> {
        private static final long serialVersionUID = 2992934406761649522L;

        /** Lateness in milliseconds subtracted from the running maximum timestamp. */
        long lateness;

        /** Position of the row field that stores the timestamp as a {@code Long}. */
        int timeIndex;

        /** Largest timestamp extracted so far; starts one above {@code Long.MIN_VALUE}. */
        private long currentMaxTimestamp = Long.MIN_VALUE + 1;

        /**
         * @param allowedLateness lateness, converted to milliseconds on construction
         * @param timeFieldIndex  index of the row field holding the timestamp
         */
        PunctuatedAssigner(Time allowedLateness, int timeFieldIndex) {
            lateness = allowedLateness.toMilliseconds();
            timeIndex = timeFieldIndex;
        }

        /**
         * Reads the timestamp from the configured field and updates the running maximum.
         *
         * @param element                  row whose field at {@code timeIndex} must be a non-null {@code Long}
         * @param previousElementTimestamp unused
         * @return the timestamp read from the row
         */
        public long extractTimestamp(Row element, long previousElementTimestamp) {
            long eventTime = (Long) element.getField(timeIndex);
            if (eventTime > currentMaxTimestamp) {
                currentMaxTimestamp = eventTime;
            }
            return eventTime;
        }

        /**
         * Builds the watermark for the current element.
         *
         * @param lastElement        unused
         * @param extractedTimestamp unused
         * @return a watermark at the running maximum timestamp minus the lateness
         */
        public Watermark checkAndGetNextWatermark(Row lastElement, long extractedTimestamp) {
            long watermarkTime = currentMaxTimestamp - lateness;
            return new Watermark(watermarkTime);
        }
    }

    /**
     * Creates the operator and forwards the given parameters to the superclass constructor.
     *
     * @param params operator parameters, passed through unchanged
     */
    public BaseWindowStreamOp(Params params) {
        super(params);
    }

    /**
     * Links this operator to its first input and publishes the window-SQL result as the output table.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>calls {@link #generateWindowClause()} and switches the stream environment to event time;</li>
     *   <li>appends one {@code Long} field to every input row, derived from the configured time
     *       column, and assigns timestamps/watermarks on that field using either
     *       {@link PeriodExtendBound} or {@link PunctuatedAssigner} depending on the watermark type;</li>
     *   <li>registers the resulting table under a unique name and runs the SQL returned by
     *       {@code generateSqlInfo};</li>
     *   <li>for over-window operators, drops the {@code ROW_TIME_COL_NAME} column from the result;</li>
     *   <li>adjusts the result column types via {@code modifyResType} and sets the output table.</li>
     * </ol>
     *
     * @param streamOperatorArr input operators; only the one returned by {@code checkAndGetFirst} is used
     * @return this operator
     */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    @SuppressWarnings("unchecked")
    public T linkFrom(StreamOperator<?>... streamOperatorArr) {
        generateWindowClause();
        final long envId = getMLEnvironmentId().longValue();
        StreamTableEnvironment tableEnv = MLEnvironmentFactory.get(Long.valueOf(envId)).getStreamTableEnvironment();
        MLEnvironmentFactory.get(Long.valueOf(envId)).getStreamExecutionEnvironment()
            .setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String tableName = StreamOperator.createUniqueTableName();
        StreamOperator<?> input = checkAndGetFirst(streamOperatorArr);
        this.inputColNames = input.getColNames();
        String timeCol = (String) getParams().get(BaseWindowParams.TIME_COL);
        // getLatency() is multiplied by 1000 and the product is used as milliseconds below.
        long latencyMillis = Math.round(getLatency().doubleValue() * 1000.0d);
        final int timeColIndex = TableUtil.findColIndexWithAssert(this.inputColNames, timeCol);

        // Extended schema: every input column followed by the synthetic row-time column.
        final int inputWidth = this.inputColNames.length;
        String[] extendedNames = new String[inputWidth + 1];
        System.arraycopy(this.inputColNames, 0, extendedNames, 0, inputWidth);
        extendedNames[inputWidth] = WATERMARK_COL_NAME;
        TypeInformation<?>[] extendedTypes = new TypeInformation<?>[extendedNames.length];
        System.arraycopy(input.getColTypes(), 0, extendedTypes, 0, inputWidth);
        extendedTypes[inputWidth] = TypeInformation.of(Long.class);

        SingleOutputStreamOperator<Row> extendedRows = input.getDataStream().map(new MapFunction<Row, Row>() {
            public Row map(Row row) throws Exception {
                int arity = row.getArity();
                Row extended = new Row(arity + 1);
                for (int i = 0; i < arity; i++) {
                    extended.setField(i, row.getField(i));
                }
                // 28800000 ms equals 8 hours.
                // NOTE(review): presumably a fixed UTC+8 shift of the event time — confirm.
                extended.setField(arity, Long.valueOf(((Timestamp) row.getField(timeColIndex)).getTime() + 28800000));
                return extended;
            }
        });

        BaseWindowParams.WatermarkType watermarkType = getWatermarkType();
        Time maxLateness = Time.of(latencyMillis, TimeUnit.MILLISECONDS);
        RowTypeInfo extendedRowType = new RowTypeInfo(extendedTypes, extendedNames);
        // The appended field sits right after the input columns, i.e. at index inputWidth.
        SingleOutputStreamOperator<Row> timedRows;
        if (BaseWindowParams.WatermarkType.PERIOD.equals(watermarkType)) {
            timedRows = extendedRows
                .assignTimestampsAndWatermarks(new PeriodExtendBound(maxLateness, inputWidth))
                .returns(extendedRowType);
        } else {
            timedRows = extendedRows
                .assignTimestampsAndWatermarks(new PunctuatedAssigner(maxLateness, inputWidth))
                .returns(extendedRowType);
        }

        Table timedTable = tableEnv.fromDataStream(timedRows, concatColNames(extendedNames));
        tableEnv.registerTable(tableName, timedTable);
        Tuple2<FeatureClauseUtil.ClauseInfo, String> sqlInfo = generateSqlInfo(tableName, timedTable.getSchema());
        FeatureClauseUtil.ClauseInfo clauseInfo = sqlInfo.f0;
        String sql = sqlInfo.f1;
        if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
            System.out.println(sql);
        }
        StreamOperator<?> result = (StreamOperator<?>) MLEnvironmentFactory.get(Long.valueOf(envId))
            .streamSQL(sql)
            .setMLEnvironmentId(getMLEnvironmentId());

        if (this instanceof BaseOverWindowStreamOp) {
            // Re-select every result column except the synthetic row-time column.
            StringBuilder projection = new StringBuilder();
            for (String colName : result.getColNames()) {
                if (!colName.equals(ROW_TIME_COL_NAME)) {
                    projection.append(",");
                    projection.append(colName);
                }
            }
            // substring(1) drops the leading comma.
            String selectClause = projection.substring(1);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.out.println(selectClause);
            }
            result = result.select(selectClause);
        }

        String[] resultNames = result.getColNames();
        TypeInformation<?>[] resultTypes = result.getColTypes();
        modifyResType(input.getSchema(), resultNames, resultTypes, clauseInfo, buildTimeCols(clauseInfo, timeCol));
        setOutputTable(DataStreamConversionUtil.toTable(Long.valueOf(envId), result.getDataStream(), resultNames, resultTypes));
        // The self-type bound on T is the only basis for this cast; the compiler cannot verify it.
        return (T) this;
    }

    /** Hook invoked as the very first step of linkFrom, before any environment or input is touched. */
    public abstract void generateWindowClause();

    /**
     * Produces the SQL that linkFrom executes.
     *
     * @param str         name under which linkFrom registered the timestamped input table
     * @param tableSchema schema of that registered table
     * @return a tuple whose {@code f0} is the clause info later passed to modifyResType and
     *         buildTimeCols, and whose {@code f1} is the SQL string that linkFrom runs
     */
    abstract Tuple2<FeatureClauseUtil.ClauseInfo, String> generateSqlInfo(String str, TableSchema tableSchema);

    /**
     * Names the result columns whose type linkFrom forces to {@code Timestamp} (via modifyResType).
     *
     * @param clauseInfo clause info returned by generateSqlInfo
     * @param str        name of the configured time column
     * @return column names to be typed as {@code Timestamp}
     */
    abstract String[] buildTimeCols(FeatureClauseUtil.ClauseInfo clauseInfo, String str);

    /**
     * Overwrites entries of {@code typeInformationArr} in place so the result column types match
     * the clause operators and the time columns.
     *
     * <p>First pass, for each of the first {@code clauseInfo.operatorIndex} clauses: the result
     * column named {@code asCols[i]} receives the operator's result type; when that type equals
     * {@code WindowResColType.RES_TYPE}, or when there is no operator, the type of the input
     * column {@code inputCols[i]} is looked up in {@code tableSchema} instead (in the no-operator
     * case a {@code null} lookup leaves the entry unchanged).
     *
     * <p>Second pass: every result column whose name appears in {@code strArr2} is typed as
     * {@code Timestamp}.
     *
     * @param tableSchema        schema used to look up input column types
     * @param strArr             result column names, parallel to {@code typeInformationArr}
     * @param typeInformationArr result column types, modified in place
     * @param clauseInfo         parsed clause description
     * @param strArr2            names of result columns that must be typed as {@code Timestamp}
     */
    private static void modifyResType(TableSchema tableSchema, String[] strArr, TypeInformation<?>[] typeInformationArr, FeatureClauseUtil.ClauseInfo clauseInfo, String[] strArr2) {
        for (int i = 0; i < clauseInfo.operatorIndex; i++) {
            int resultIndex = TableUtil.findColIndex(strArr, clauseInfo.asCols[i]);
            if (clauseInfo.operators[i] != null) {
                TypeInformation<?> resType = clauseInfo.operators[i].getResType();
                if (WindowResColType.RES_TYPE.equals(resType)) {
                    typeInformationArr[resultIndex] = TableUtil.findColType(tableSchema, clauseInfo.inputCols[i]);
                } else {
                    typeInformationArr[resultIndex] = resType;
                }
            } else {
                TypeInformation<?> inputType = TableUtil.findColType(tableSchema, clauseInfo.inputCols[i]);
                if (inputType != null) {
                    typeInformationArr[resultIndex] = inputType;
                }
            }
        }
        for (int col = 0; col < strArr.length; col++) {
            // Bounded scan: the previous while(true) form never exited when no time column matched.
            for (String timeCol : strArr2) {
                if (strArr[col].equals(timeCol)) {
                    typeInformationArr[col] = TypeInformation.of(Timestamp.class);
                    break;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Joins column names into one string separated by {@code ", "}.
     *
     * @param strArr column names; a {@code null} element is rendered as the text {@code "null"}
     * @return the joined names, or {@code null} when {@code strArr} is {@code null} or empty
     */
    public static String concatColNames(String[] strArr) {
        if (strArr == null || strArr.length == 0) {
            return null;
        }
        return String.join(", ", strArr);
    }

    // Bridge method as emitted by the decompiler (see the "bridge" / "synthetic" markers below):
    // it only forwards to linkFrom(StreamOperator<?>...).
    // NOTE(review): its parameter list has the same erasure as the method it forwards to, so it
    // looks like a compiler-generated artifact rather than hand-written source — confirm whether
    // it should be removed when this file is rebuilt from source.
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ StreamOperator linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
