package com.alibaba.alink.operator.stream.evaluation;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.exceptions.ExceptionWithErrorCode;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.evaluation.BaseMetrics;
import com.alibaba.alink.operator.common.evaluation.BaseMetricsSummary;
import com.alibaba.alink.operator.common.evaluation.ClassificationEvaluationUtil;
import com.alibaba.alink.operator.common.evaluation.ConfusionMatrix;
import com.alibaba.alink.operator.common.evaluation.EvalOutlierUtils;
import com.alibaba.alink.operator.common.evaluation.OutlierMetricsSummary;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.utils.TimeUtil;
import com.alibaba.alink.params.evaluation.EvalOutlierStreamParams;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * Stream operator that evaluates outlier detection results per time window.
 *
 * <p>{@link #linkFrom(StreamOperator[])} selects the label column and the prediction detail column,
 * groups the rows into time windows of {@code timeInterval}, and builds one
 * {@link OutlierMetricsSummary} per window that contains at least one row with a non-empty
 * prediction detail. Each summary is emitted twice: once tagged with
 * {@code ClassificationEvaluationUtil.WINDOW.f0}, and once, after being merged with all summaries
 * seen so far, tagged with {@code ClassificationEvaluationUtil.ALL.f0}.
 */
@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(PortType.EVAL_METRICS)})
@NameCn("异常检测评估")
@NameEn("Evaluation for outlier detection")
public class EvalOutlierStreamOp extends StreamOperator<EvalOutlierStreamOp> implements EvalOutlierStreamParams<EvalOutlierStreamOp> {
    private static final long serialVersionUID = -5391026894056384871L;

    /** Name of the second output column, which carries the serialized metrics. */
    private static final String DATA_OUTPUT = "Data";

    /**
     * Map function that keeps a running merge of every summary it has seen and emits the merged
     * summary for each incoming element.
     *
     * <p>The accumulator is plain instance state, so the result is only a global aggregate when the
     * function runs with parallelism 1 (as {@code linkFrom} configures it).
     *
     * @param <M> concrete summary type
     */
    public static class AccSummaryMapFunction<M extends BaseMetricsSummary<?, M>> implements MapFunction<M, M> {
        private static final long serialVersionUID = 7732966899822527214L;

        /** Running summary; {@code null} until the first element arrives. */
        private M acc;

        /**
         * Merges {@code summary} into the running accumulator.
         *
         * @param summary summary of the latest window
         * @return the accumulated summary after the merge; the first call returns {@code summary} itself
         */
        @Override
        public M map(M summary) {
            if (acc == null) {
                acc = summary;
            } else {
                acc = (M) acc.merge(summary);
            }
            return acc;
        }
    }

    /**
     * Window function that turns all rows of one time window into a single
     * {@link OutlierMetricsSummary}.
     *
     * <p>Rows are expected to have at least three fields: field 0 and field 1 are consumed by
     * {@link EvalOutlierStreamOp#calcOutlierStats(Iterable, String)}, and field 2 is collected as a
     * string value (presumably the original, un-replaced label; confirm against
     * {@code EvalOutlierUtils.ReplaceLabelMapFunction}).
     */
    public static class CalcOutlierMetricsSummaryWindowFunction implements AllWindowFunction<Row, OutlierMetricsSummary, TimeWindow> {
        private static final long serialVersionUID = 8057305974098408321L;
        private final String[] outlierValueStrings;

        /**
         * @param outlierValueStrings the configured outlier values, passed through unchanged to every
         *                            emitted summary
         */
        public CalcOutlierMetricsSummaryWindowFunction(String[] outlierValueStrings) {
            this.outlierValueStrings = outlierValueStrings;
        }

        /**
         * Builds the summary for one window and emits it, unless the window has no usable sample.
         *
         * @param window the time window being evaluated (not inspected)
         * @param values all rows that fell into the window
         * @param out    receives at most one summary
         * @throws Exception declared by the Flink interface
         */
        @Override
        public void apply(TimeWindow window, Iterable<Row> values, Collector<OutlierMetricsSummary> out) throws Exception {
            // Distinct string forms of field 2, in reverse natural order.
            TreeSet<String> distinctValues = new TreeSet<>(Collections.reverseOrder());
            for (Row row : values) {
                distinctValues.add(String.valueOf(row.getField(2)));
            }
            String[] allValueStrings = distinctValues.toArray(new String[0]);
            Object[] labels = {EvalOutlierUtils.OUTLIER_LABEL, EvalOutlierUtils.INLIER_LABEL};

            Tuple4<Integer, List<Tuple2<Double, ConfusionMatrix>>, Double, Double> stats =
                EvalOutlierStreamOp.calcOutlierStats(values, EvalOutlierUtils.OUTLIER_LABEL);
            int sampleCount = stats.f0;
            if (sampleCount == 0) {
                // No row carried a prediction detail, so there is nothing to report for this window.
                return;
            }
            double minOutlierScore = stats.f2;
            double maxInlierScore = stats.f3;
            out.collect(new OutlierMetricsSummary(sampleCount, labels, allValueStrings, this.outlierValueStrings,
                minOutlierScore, maxInlierScore, stats.f1));
        }
    }

    /**
     * Map function that converts a summary into a two-field row: a fixed tag followed by field 0 of
     * the serialized metrics.
     *
     * @param <T> metrics type produced by the summary
     * @param <M> summary type
     */
    public static class PrependTagMapFunction<T extends BaseMetrics<T>, M extends BaseMetricsSummary<T, M>> implements MapFunction<M, Row> {
        private static final long serialVersionUID = 7054471982821965299L;
        private final String tag;

        /**
         * @param tag value written into the first field of every output row
         */
        public PrependTagMapFunction(String tag) {
            this.tag = tag;
        }

        /**
         * @param summary summary to convert
         * @return a row of {@code (tag, summary.toMetrics().serialize().getField(0))}
         * @throws Exception declared by the Flink interface
         */
        @Override
        public Row map(M summary) throws Exception {
            return Row.of(this.tag, summary.toMetrics().serialize().getField(0));
        }
    }

    /** Creates the operator with empty parameters. */
    public EvalOutlierStreamOp() {
        this(new Params());
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params operator parameters
     */
    public EvalOutlierStreamOp(Params params) {
        super(params);
    }

    /**
     * Extracts the per-sample information needed for evaluation.
     *
     * <p>Field 0 of each row is read as the label string and field 1 as the prediction detail string.
     * Rows whose prediction detail is {@code null} or empty are skipped. The returned arrays are
     * sized for all rows, but only their first {@code f0} entries are populated.
     *
     * @param iterable rows to inspect; iterated twice (once for sizing, once for extraction)
     * @param str      label string that marks a sample as an actual positive
     * @return {@code (usedCount, isActualPositive[], isPredictedOutlier[], score[])}
     */
    static Tuple4<Integer, boolean[], boolean[], double[]> extractSampleInfo(Iterable<Row> iterable, String str) {
        int capacity = Iterables.size(iterable);
        boolean[] actualPositive = new boolean[capacity];
        boolean[] predictedOutlier = new boolean[capacity];
        double[] scores = new double[capacity];
        int used = 0;
        for (Row row : iterable) {
            String label = (String) row.getField(0);
            String detail = (String) row.getField(1);
            if (StringUtils.isEmpty(detail)) {
                continue;
            }
            Tuple2<Boolean, Double> prediction = EvalOutlierUtils.extractPredictionScore(detail);
            actualPositive[used] = str.equals(label);
            predictedOutlier[used] = prediction.f0;
            scores[used] = prediction.f1;
            used++;
        }
        return Tuple4.of(used, actualPositive, predictedOutlier, scores);
    }

    /**
     * Computes the statistics from which an {@link OutlierMetricsSummary} is built.
     *
     * <p>Samples are ranked by descending score (ties keep their input order). For every rank
     * {@code k} the result contains the score at that rank together with a 2x2 matrix
     * {@code {{posTop, negTop}, {posRest, negRest}}}, where "top" covers ranks {@code 0..k},
     * "rest" covers the remaining ranks, and "pos"/"neg" refer to whether the actual label equals
     * {@code str}.
     *
     * @param iterable rows as accepted by {@link #extractSampleInfo(Iterable, String)}
     * @param str      label string that marks a sample as an actual positive
     * @return {@code (sampleCount, perRankMatrices, minScoreOfPredictedOutliers,
     *         maxScoreOfPredictedInliers)}; the minimum stays at {@code Double.MAX_VALUE} and the
     *         maximum at {@code -Double.MAX_VALUE} when the respective group is empty
     */
    public static Tuple4<Integer, List<Tuple2<Double, ConfusionMatrix>>, Double, Double> calcOutlierStats(Iterable<Row> iterable, String str) {
        Tuple4<Integer, boolean[], boolean[], double[]> sampleInfo = extractSampleInfo(iterable, str);
        int sampleCount = sampleInfo.f0;
        boolean[] actualPositive = sampleInfo.f1;
        boolean[] predictedOutlier = sampleInfo.f2;
        double[] scores = sampleInfo.f3;

        int positives = 0;
        int negatives = 0;
        double minOutlierScore = Double.MAX_VALUE;
        double maxInlierScore = -Double.MAX_VALUE;
        for (int k = 0; k < sampleCount; k++) {
            if (actualPositive[k]) {
                positives++;
            } else {
                negatives++;
            }
            if (predictedOutlier[k]) {
                minOutlierScore = Math.min(minOutlierScore, scores[k]);
            } else {
                maxInlierScore = Math.max(maxInlierScore, scores[k]);
            }
        }

        // Sample indices ordered by descending score; the sort is stable, so ties keep input order.
        int[] ranking = IntStream.range(0, sampleCount)
            .boxed()
            .sorted(Comparator.comparingDouble((Integer idx) -> scores[idx]).reversed())
            .mapToInt(Integer::intValue)
            .toArray();

        List<Tuple2<Double, ConfusionMatrix>> matrices = new ArrayList<>();
        long positivesInTop = 0;
        long negativesInTop = 0;
        for (int rank = 0; rank < sampleCount; rank++) {
            int sample = ranking[rank];
            if (actualPositive[sample]) {
                positivesInTop++;
            } else {
                negativesInTop++;
            }
            long[][] cells = new long[][]{
                {positivesInTop, negativesInTop},
                {positives - positivesInTop, negatives - negativesInTop}
            };
            matrices.add(Tuple2.of(scores[sample], new ConfusionMatrix(cells)));
        }
        return Tuple4.of(sampleCount, matrices, minOutlierScore, maxInlierScore);
    }

    /**
     * Wires the evaluation pipeline onto the first input operator.
     *
     * <p>The output has two string columns: {@code ClassificationEvaluationUtil.STATISTICS_OUTPUT}
     * (the window/all tag) and {@code "Data"} (the serialized metrics).
     *
     * @param inputs upstream operators; only the first one is used
     * @return this operator
     * @throws AkIllegalOperatorParameterException presumably raised by the precondition check when
     *         {@code predictionDetailCol} is not set (depends on {@code AkPreconditions})
     */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public EvalOutlierStreamOp linkFrom(StreamOperator<?>... inputs) {
        StreamOperator<?> in = checkAndGetFirst(inputs);
        String labelCol = getLabelCol();
        String[] outlierValueStrings = getOutlierValueStrings();
        HashSet<String> outlierValues = new HashSet<>(Arrays.asList(outlierValueStrings));
        double timeInterval = getTimeInterval().doubleValue();
        AkPreconditions.checkArgument(getParams().contains(EvalOutlierStreamParams.PREDICTION_DETAIL_COL), (ExceptionWithErrorCode) new AkIllegalOperatorParameterException("Outlier detection evaluation must give predictionDetailCol!"));
        String predictionDetailCol = getPredictionDetailCol();
        TableUtil.assertSelectedColExist(in.getColNames(), labelCol, predictionDetailCol);

        // One summary per window; parallelism 1 because the all-window function is non-parallel.
        SingleOutputStreamOperator<OutlierMetricsSummary> windowSummaries = in
            .select(new String[]{labelCol, predictionDetailCol})
            .getDataStream()
            .map(new EvalOutlierUtils.ReplaceLabelMapFunction(outlierValues, 1, 0))
            .timeWindowAll(TimeUtil.convertTime(timeInterval))
            .apply(new CalcOutlierMetricsSummaryWindowFunction(outlierValueStrings))
            .setParallelism(1);

        // Running merge over all windows; must stay at parallelism 1 to share one accumulator.
        SingleOutputStreamOperator<OutlierMetricsSummary> overallSummaries = windowSummaries
            .map(new AccSummaryMapFunction<>())
            .setParallelism(1);

        DataStream<Row> windowRows = windowSummaries
            .map(new PrependTagMapFunction<>((String) ClassificationEvaluationUtil.WINDOW.f0));
        DataStream<Row> overallRows = overallSummaries
            .map(new PrependTagMapFunction<>((String) ClassificationEvaluationUtil.ALL.f0));

        setOutput(windowRows.union(overallRows),
            new String[]{ClassificationEvaluationUtil.STATISTICS_OUTPUT, DATA_OUTPUT},
            new TypeInformation<?>[]{Types.STRING, Types.STRING});
        return this;
    }
}
