package com.alibaba.alink.operator.common.outlier;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkColumnNotFoundException;
import com.alibaba.alink.common.mapper.FlatMapperAdapter;
import com.alibaba.alink.common.mapper.Mapper;
import com.alibaba.alink.common.type.AlinkTypes;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.TableSourceBatchOp;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.batch.utils.MapBatchOp;
import com.alibaba.alink.operator.common.dataproc.FlattenMTableMapper;
import com.alibaba.alink.operator.common.outlier.BaseOutlierBatchOp;
import com.alibaba.alink.params.dataproc.FlattenMTableParams;
import com.alibaba.alink.params.outlier.HasDetectLast;
import com.alibaba.alink.params.outlier.HasInputMTableCol;
import com.alibaba.alink.params.outlier.HasMaxOutlierNumPerGroup;
import com.alibaba.alink.params.outlier.HasMaxOutlierRatio;
import com.alibaba.alink.params.outlier.HasMaxSampleNumPerGroup;
import com.alibaba.alink.params.outlier.HasOutputMTableCol;
import com.alibaba.alink.params.outlier.OutlierDetectorParams;
import com.alibaba.alink.params.outlier.OutlierParams;
import com.google.common.base.Joiner;
import java.util.Arrays;
import java.util.function.BiFunction;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(value = PortType.DATA, desc = PortDesc.OUTLIER_DETECTION_RESULT)})
@Internal
@ParamSelectColumnSpec(name = "groupCols")
/* loaded from: input_file:com/alibaba/alink/operator/common/outlier/BaseOutlierBatchOp.class */
public class BaseOutlierBatchOp<T extends BaseOutlierBatchOp<T>> extends MapBatchOp<T> implements OutlierParams<T>, HasMaxOutlierRatio<T>, HasMaxOutlierNumPerGroup<T>, HasMaxSampleNumPerGroup<T> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/common/outlier/BaseOutlierBatchOp$GroupKeySelector.class */
    public static class GroupKeySelector implements KeySelector<Row, String> {
        private final int[] groupColIndices;

        GroupKeySelector(int[] iArr) {
            this.groupColIndices = iArr;
        }

        public String getKey(Row row) throws Exception {
            if (0 == this.groupColIndices.length) {
                return "1";
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < this.groupColIndices.length; i++) {
                if (null == row.getField(i)) {
                    throw new AkColumnNotFoundException("There is NULL value in group col!");
                }
                sb.append(row.getField(this.groupColIndices[i]));
                sb.append("\u0001");
            }
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/alink/operator/common/outlier/BaseOutlierBatchOp$ReGroupReduce.class */
    /**
     * Group-reduce function that packs the rows of one group into single-field rows, each holding
     * an {@link MTable} built from a slice of the group's rows and the configured schema string.
     *
     * <p>Rows are buffered in an array of {@code 2 * maxSampleNumPerGroup} slots. Whenever a row
     * arrives while the buffer is full, the upper half of the buffer is emitted as one MTable and
     * that half is refilled starting with the new row. After the last row, the buffered rows are
     * emitted as a single MTable if there are fewer than {@code maxSampleNumPerGroup} of them,
     * otherwise as two MTables split at {@code round(count / 2)}.
     */
    public static class ReGroupReduce implements GroupReduceFunction<Row, Row> {
        private final int maxSampleNumPerGroup;
        private final String tableSchemaStr;

        private ReGroupReduce(int i, String str) {
            this.maxSampleNumPerGroup = i;
            this.tableSchemaStr = str;
        }

        /**
         * Splits the rows of one group into MTables and emits each as a one-field row.
         *
         * @param iterable  rows belonging to the group
         * @param collector receives one row per produced MTable
         */
        public void reduce(Iterable<Row> iterable, Collector<Row> collector) {
            final int half = this.maxSampleNumPerGroup;
            final int capacity = 2 * half;
            final Row[] buffer = new Row[capacity];
            int filled = 0;
            for (Row current : iterable) {
                if (filled >= capacity) {
                    // Buffer is full: flush the upper half and restart filling it with this row.
                    emitSlice(collector, buffer, half, capacity);
                    buffer[half] = current;
                    filled = half + 1;
                    continue;
                }
                buffer[filled] = current;
                filled++;
            }
            if (filled < half) {
                emitSlice(collector, buffer, 0, filled);
                return;
            }
            // Enough rows remain for two tables: split them at the rounded midpoint.
            final int middle = (int) Math.round(filled / 2.0d);
            emitSlice(collector, buffer, 0, middle);
            emitSlice(collector, buffer, middle, filled);
        }

        /** Emits the rows in {@code buffer[from, to)} as one MTable wrapped in a single-field row. */
        private void emitSlice(Collector<Row> collector, Row[] buffer, int from, int to) {
            Row[] slice = (Row[]) Arrays.copyOfRange(buffer, from, to);
            collector.collect(Row.of(new Object[]{new MTable(slice, this.tableSchemaStr)}));
        }
    }

    /**
     * Creates the operator by forwarding both arguments unchanged to the {@link MapBatchOp}
     * constructor.
     *
     * @param biFunction factory that builds a {@link Mapper} from a table schema and parameters
     * @param params     parameters of this operator
     */
    public BaseOutlierBatchOp(BiFunction<TableSchema, Params, Mapper> biFunction, Params params) {
        super(biFunction, params);
    }

    /**
     * Links this operator to its input and sets the output table.
     *
     * <p>If no group columns are set, the max-sample-number parameter is absent and
     * {@link #supportDealWholeData()} returns {@code true}, the output comes from
     * {@link #dealWholeData(BatchOperator)}. Otherwise the input is grouped into MTables, the
     * mapper is applied to them (reading and writing the temporary MTable column, with
     * detect-last switched off), and the result is flattened back into rows.
     *
     * @param batchOperatorArr input operators; only the first one is used
     * @return this operator
     */
    @Override // com.alibaba.alink.operator.batch.utils.MapBatchOp, com.alibaba.alink.operator.batch.BatchOperator
    public T linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> input = checkAndGetFirst(batchOperatorArr);
        boolean wholeDataMode = null == getParams().get(OutlierParams.GROUP_COLS)
            && !getParams().contains(HasMaxSampleNumPerGroup.MAX_SAMPLE_NUM_PER_GROUP)
            && supportDealWholeData();
        if (wholeDataMode) {
            setOutputTable(dealWholeData(input));
        } else {
            BatchOperator<?> grouped = group2MTables(input, getParams());
            TableSchema groupedSchema = grouped.getSchema();
            // Work on a copy of the parameters so the temporary MTable column settings stay local.
            Params mapperParams = getParams().m1495clone().set((ParamInfo<ParamInfo<String>>) HasInputMTableCol.INPUT_MTABLE_COL, (ParamInfo<String>) OutlierDetector.TEMP_MTABLE_COL).set((ParamInfo<ParamInfo<String>>) HasOutputMTableCol.OUTPUT_MTABLE_COL, (ParamInfo<String>) OutlierDetector.TEMP_MTABLE_COL).set((ParamInfo<ParamInfo<Boolean>>) HasDetectLast.DETECT_LAST, (ParamInfo<Boolean>) false);
            Mapper mapper = this.mapperBuilder.apply(groupedSchema, mapperParams);
            DataSet<Row> detectedRows = MapBatchOp.calcResultRows(grouped, mapper, getParams());
            Table flattened = flattenMTable(detectedRows, input.getSchema(), mapper.getOutputSchema(), getParams(), getMLEnvironmentId());
            setOutputTable(flattened);
        }
        return this;
    }

    /**
     * Tells {@code linkFrom} whether it may use {@link #dealWholeData(BatchOperator)} on the
     * ungrouped input instead of grouping rows into MTables.
     *
     * @return {@code false} in this base implementation
     */
    protected boolean supportDealWholeData() {
        return false;
    }

    /**
     * Hook that produces the output table directly from the whole input. {@code linkFrom} calls it
     * only when no group columns are set, the max-sample-number parameter is absent and
     * {@link #supportDealWholeData()} returns {@code true}.
     *
     * @param batchOperator the input operator
     * @return {@code null} in this base implementation
     */
    protected Table dealWholeData(BatchOperator<?> batchOperator) {
        return null;
    }

    /**
     * Groups the input rows into MTables, dispatching on whether the max-sample-number parameter
     * is present in {@code params}.
     *
     * @param batchOperator the input operator
     * @param params        parameters holding the optional group columns and the optional maximum
     *                      sample number per group
     * @return an operator whose rows carry the grouped MTables
     */
    public static BatchOperator<?> group2MTables(BatchOperator<?> batchOperator, Params params) {
        String[] configuredGroupCols = (String[]) params.get(OutlierParams.GROUP_COLS);
        // A missing group-column setting is treated as "no group columns".
        String[] groupCols = null != configuredGroupCols ? configuredGroupCols : new String[0];
        if (params.contains(HasMaxSampleNumPerGroup.MAX_SAMPLE_NUM_PER_GROUP)) {
            int maxSampleNum = ((Integer) params.get(HasMaxSampleNumPerGroup.MAX_SAMPLE_NUM_PER_GROUP)).intValue();
            return group2MTablesWithMaxSampleNum(batchOperator, groupCols, maxSampleNum);
        }
        return group2MTablesWithoutMaxSampleNum(batchOperator, groupCols);
    }

    /**
     * Groups the input by the given columns and aggregates all input columns of each group with
     * {@code MTABLE_AGG} into the temporary MTable column.
     *
     * @param batchOperator the input operator
     * @param strArr        group column names; when empty, the group-by clause is the constant
     *                      {@code "1"}
     * @return the grouped operator
     */
    public static BatchOperator<?> group2MTablesWithoutMaxSampleNum(BatchOperator<?> batchOperator, String[] strArr) {
        String groupByClause;
        if (strArr.length == 0) {
            groupByClause = "1";
        } else {
            groupByClause = Joiner.on(", ").join(strArr);
        }
        String allColumns = Joiner.on(", ").join(batchOperator.getColNames());
        String selectClause = "MTABLE_AGG(" + allColumns + ") AS " + OutlierDetector.TEMP_MTABLE_COL;
        return batchOperator.groupBy(groupByClause, selectClause);
    }

    /**
     * Shuffles the input, groups it by the key derived from the group columns, and packs each
     * group into MTables with {@link ReGroupReduce}. The result is rebalanced and exposed as a
     * table with a single MTable-typed column named by the temporary MTable column.
     *
     * @param batchOperator the input operator
     * @param strArr        group column names
     * @param i             maximum sample number per group handed to {@link ReGroupReduce}
     * @return a source operator over the table of MTables
     */
    public static BatchOperator<?> group2MTablesWithMaxSampleNum(BatchOperator<?> batchOperator, String[] strArr, int i) {
        Long environmentId = batchOperator.getMLEnvironmentId();
        DataSet<Row> shuffledRows = batchOperator.shuffle().getDataSet();
        GroupKeySelector keySelector = new GroupKeySelector(TableUtil.findColIndices(batchOperator.getColNames(), strArr));
        DataSet<Row> mTableRows = (DataSet<Row>) shuffledRows
            .groupBy(keySelector)
            .reduceGroup(new ReGroupReduce(i, TableUtil.schema2SchemaStr(batchOperator.getSchema())))
            .rebalance();
        String[] outputNames = new String[]{OutlierDetector.TEMP_MTABLE_COL};
        TypeInformation<?>[] outputTypes = (TypeInformation<?>[]) new TypeInformation[]{AlinkTypes.M_TABLE};
        return new TableSourceBatchOp(DataSetConversionUtil.toTable(environmentId, mTableRows, outputNames, outputTypes));
    }

    /**
     * Flattens the temporary MTable column of every row back into individual rows and returns
     * them as a table.
     *
     * <p>The flattened schema is {@code tableSchema} plus the prediction column (BOOLEAN) and,
     * when the prediction-detail column parameter is present, that column as well (STRING). No
     * other columns are reserved during flattening.
     *
     * @param dataSet      rows containing the temporary MTable column
     * @param tableSchema  schema the prediction columns are appended to
     * @param tableSchema2 schema of {@code dataSet}, given to the flatten mapper as its input schema
     * @param params       parameters providing the prediction column names
     * @param l            ML environment id used to create the table
     * @return the table of flattened rows
     */
    public static Table flattenMTable(DataSet<Row> dataSet, TableSchema tableSchema, TableSchema tableSchema2, Params params, Long l) {
        String[] outputNames = (String[]) ArrayUtils.add(tableSchema.getFieldNames(), params.get(OutlierDetectorParams.PREDICTION_COL));
        TypeInformation[] outputTypes = (TypeInformation[]) ArrayUtils.add(tableSchema.getFieldTypes(), AlinkTypes.BOOLEAN);
        boolean hasDetailCol = params.contains(OutlierDetectorParams.PREDICTION_DETAIL_COL);
        if (hasDetailCol) {
            outputNames = (String[]) ArrayUtils.add(outputNames, params.get(OutlierDetectorParams.PREDICTION_DETAIL_COL));
            outputTypes = (TypeInformation[]) ArrayUtils.add(outputTypes, AlinkTypes.STRING);
        }
        String flattenedSchemaStr = TableUtil.schema2SchemaStr(new TableSchema(outputNames, outputTypes));
        Params flattenParams = new Params().set((ParamInfo<ParamInfo<String>>) FlattenMTableParams.SELECTED_COL, (ParamInfo<String>) OutlierDetector.TEMP_MTABLE_COL).set((ParamInfo<ParamInfo<String>>) FlattenMTableParams.SCHEMA_STR, (ParamInfo<String>) flattenedSchemaStr).set((ParamInfo<ParamInfo<String[]>>) FlattenMTableParams.RESERVED_COLS, (ParamInfo<String[]>) new String[0]);
        FlattenMTableMapper flattenMapper = new FlattenMTableMapper(tableSchema2, flattenParams);
        DataSet<Row> flattenedRows = (DataSet<Row>) dataSet.flatMap(new FlatMapperAdapter(flattenMapper));
        return DataSetConversionUtil.toTable(l, flattenedRows, flattenMapper.getOutputSchema());
    }

    /**
     * Bridge overload (marked bridge/synthetic) with return type {@link MapBatchOp}; it only
     * delegates to {@code linkFrom(BatchOperator<?>...)}.
     *
     * NOTE(review): this looks like a compiler-generated bridge surfaced by the decompiler. Its
     * parameter list matches the other {@code linkFrom} declarations in this class, which
     * hand-written Java source does not allow - confirm whether it should be kept.
     */
    @Override // com.alibaba.alink.operator.batch.utils.MapBatchOp, com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ MapBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }

    /**
     * Bridge overload (marked bridge/synthetic) with return type {@link BatchOperator}; it only
     * delegates to {@code linkFrom(BatchOperator<?>...)}.
     *
     * NOTE(review): this looks like a compiler-generated bridge surfaced by the decompiler. Its
     * parameter list matches the other {@code linkFrom} declarations in this class, which
     * hand-written Java source does not allow - confirm whether it should be kept.
     */
    @Override // com.alibaba.alink.operator.batch.utils.MapBatchOp, com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ BatchOperator linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
