package com.alibaba.alink.operator.batch.feature;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.common.evaluation.EvaluationUtil;
import com.alibaba.alink.operator.common.feature.WoeModelDataConverter;
import com.alibaba.alink.operator.common.feature.binning.FeatureBinsCalculator;
import com.alibaba.alink.operator.common.io.types.FlinkTypeConverter;
import com.alibaba.alink.params.dataproc.HasSelectedColTypes;
import com.alibaba.alink.params.finance.WoeTrainParams;
import com.alibaba.alink.params.shared.colname.HasSelectedCols;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(PortType.MODEL)})
@Internal
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "selectedCols", allowedTypeCollections = {TypeCollections.NUMERIC_TYPES}), @ParamSelectColumnSpec(name = "labelCol")})
/* loaded from: input_file:com/alibaba/alink/operator/batch/feature/WoeTrainBatchOp.class */
public final class WoeTrainBatchOp extends BatchOperator<WoeTrainBatchOp> implements WoeTrainParams<WoeTrainBatchOp> {
    private static final long serialVersionUID = 5413307707249156884L;
    public static String NULL_STR = "WOE_NULL_STRING";

    public WoeTrainBatchOp() {
    }

    public WoeTrainBatchOp(Params params) {
        super(params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public WoeTrainBatchOp linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> checkAndGetFirst = checkAndGetFirst(batchOperatorArr);
        final String[] selectedCols = getSelectedCols();
        final String[] strArr = new String[selectedCols.length];
        for (int i = 0; i < selectedCols.length; i++) {
            strArr[i] = FlinkTypeConverter.getTypeString(TableUtil.findColType(checkAndGetFirst.getSchema(), selectedCols[i]));
        }
        final int length = selectedCols.length;
        String labelCol = getLabelCol();
        final TypeInformation<?> findColTypeWithAssertAndHint = TableUtil.findColTypeWithAssertAndHint(checkAndGetFirst.getSchema(), labelCol);
        AggregateOperator sum = checkAndGetFirst.select(labelCol).getDataSet().map(new MapFunction<Row, Tuple2<String, Long>>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.1
            private static final long serialVersionUID = 7244041023667742068L;

            public Tuple2<String, Long> map(Row row) throws Exception {
                Preconditions.checkNotNull(row.getField(0), "LabelCol contains null value!");
                return Tuple2.of(new EvaluationUtil.ComparableLabel(row.getField(0).toString(), findColTypeWithAssertAndHint).label.toString(), 1L);
            }
        }).groupBy(new int[]{0}).sum(1);
        final EvaluationUtil.ComparableLabel comparableLabel = new EvaluationUtil.ComparableLabel(getPositiveLabelValueString(), findColTypeWithAssertAndHint);
        setOutput((DataSet<Row>) checkAndGetFirst.select((String[]) ArrayUtils.add(selectedCols, labelCol)).getDataSet().flatMap(new FlatMapFunction<Row, Tuple3<Integer, String, Long>>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.3
            private static final long serialVersionUID = 3246134198223500699L;

            public void flatMap(Row row, Collector<Tuple3<Integer, String, Long>> collector) {
                Long valueOf = Long.valueOf(new EvaluationUtil.ComparableLabel(row.getField(length), findColTypeWithAssertAndHint).equals(comparableLabel) ? 1L : 0L);
                for (int i2 = 0; i2 < length; i2++) {
                    Object field = row.getField(i2);
                    collector.collect(Tuple3.of(Integer.valueOf(i2), null == field ? WoeTrainBatchOp.NULL_STR : field.toString(), valueOf));
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Row) obj, (Collector<Tuple3<Integer, String, Long>>) collector);
            }
        }).groupBy(new int[]{0, 1}).reduceGroup(new GroupReduceFunction<Tuple3<Integer, String, Long>, Tuple4<Integer, String, Long, Long>>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.2
            private static final long serialVersionUID = 8132981693511963253L;

            public void reduce(Iterable<Tuple3<Integer, String, Long>> iterable, Collector<Tuple4<Integer, String, Long, Long>> collector) throws Exception {
                Long l = 0L;
                Long l2 = 0L;
                int i2 = -1;
                String str = null;
                for (Tuple3<Integer, String, Long> tuple3 : iterable) {
                    l2 = Long.valueOf(l2.longValue() + 1);
                    i2 = ((Integer) tuple3.f0).intValue();
                    str = (String) tuple3.f1;
                    l = Long.valueOf(l.longValue() + ((Long) tuple3.f2).longValue());
                }
                if (i2 >= 0) {
                    collector.collect(Tuple4.of(Integer.valueOf(i2), str, l2, l));
                }
            }
        }).mapPartition(new RichMapPartitionFunction<Tuple4<Integer, String, Long, Long>, Row>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.4
            private static final long serialVersionUID = 9015674191729072450L;
            private long positiveTotal;
            private long negativeTotal;

            public void open(Configuration configuration) {
                List broadcastVariable = getRuntimeContext().getBroadcastVariable("labelCount");
                Preconditions.checkArgument(broadcastVariable.size() == 2, "Only support binary classification!");
                if (comparableLabel.equals(new EvaluationUtil.ComparableLabel(((Tuple2) broadcastVariable.get(0)).f0, findColTypeWithAssertAndHint))) {
                    this.positiveTotal = ((Long) ((Tuple2) broadcastVariable.get(0)).f1).longValue();
                    this.negativeTotal = ((Long) ((Tuple2) broadcastVariable.get(1)).f1).longValue();
                } else {
                    if (!comparableLabel.equals(new EvaluationUtil.ComparableLabel(((Tuple2) broadcastVariable.get(1)).f0, findColTypeWithAssertAndHint))) {
                        throw new IllegalArgumentException("Not contain positiveValue " + comparableLabel);
                    }
                    this.positiveTotal = ((Long) ((Tuple2) broadcastVariable.get(1)).f1).longValue();
                    this.negativeTotal = ((Long) ((Tuple2) broadcastVariable.get(0)).f1).longValue();
                }
            }

            public void mapPartition(Iterable<Tuple4<Integer, String, Long, Long>> iterable, Collector<Row> collector) {
                Params params = null;
                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                    params = new Params().set((ParamInfo<ParamInfo<String[]>>) HasSelectedCols.SELECTED_COLS, (ParamInfo<String[]>) selectedCols).set((ParamInfo<ParamInfo<String[]>>) HasSelectedColTypes.SELECTED_COL_TYPES, (ParamInfo<String[]>) strArr).set((ParamInfo<ParamInfo<Long>>) WoeModelDataConverter.POSITIVE_TOTAL, (ParamInfo<Long>) Long.valueOf(this.positiveTotal)).set((ParamInfo<ParamInfo<Long>>) WoeModelDataConverter.NEGATIVE_TOTAL, (ParamInfo<Long>) Long.valueOf(this.negativeTotal));
                }
                new WoeModelDataConverter().save2(Tuple2.of(params, iterable), collector);
            }
        }).withBroadcastSet(sum, "labelCount").name("build_model"), new WoeModelDataConverter().getModelSchema());
        return this;
    }

    public static DataSet<FeatureBinsCalculator> setFeatureBinsWoe(DataSet<FeatureBinsCalculator> dataSet, DataSet<Row> dataSet2) {
        return dataSet.map(new MapFunction<FeatureBinsCalculator, Tuple2<String, FeatureBinsCalculator>>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.5
            private static final long serialVersionUID = 3810414585464772028L;

            public Tuple2<String, FeatureBinsCalculator> map(FeatureBinsCalculator featureBinsCalculator) {
                return Tuple2.of(featureBinsCalculator.getFeatureName(), featureBinsCalculator);
            }
        }).join(dataSet2.groupBy(new int[]{0}).reduceGroup(new RichGroupReduceFunction<Row, Tuple3<String, Map<Long, Long>, Map<Long, Long>>>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.7
            private static final long serialVersionUID = 2877684088626081532L;

            public void reduce(Iterable<Row> iterable, Collector<Tuple3<String, Map<Long, Long>, Map<Long, Long>>> collector) {
                String[] strArr = (String[]) Params.fromJson((String) ((Row) getRuntimeContext().getBroadcastVariable("selectedCols").get(0)).getField(1)).get(WoeTrainParams.SELECTED_COLS);
                HashMap hashMap = new HashMap();
                HashMap hashMap2 = new HashMap();
                long j = -1;
                for (Row row : iterable) {
                    j = ((Long) row.getField(0)).longValue();
                    if (j < 0) {
                        return;
                    }
                    Long valueOf = Long.valueOf((String) row.getField(1));
                    hashMap.put(valueOf, Long.valueOf(((Long) row.getField(2)).longValue()));
                    hashMap2.put(valueOf, Long.valueOf(((Long) row.getField(3)).longValue()));
                }
                collector.collect(Tuple3.of(strArr[(int) j], hashMap, hashMap2));
            }
        }).withBroadcastSet(dataSet2.filter(new FilterFunction<Row>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.6
            private static final long serialVersionUID = -2272981616877035934L;

            public boolean filter(Row row) {
                return ((Long) row.getField(0)).longValue() < 0;
            }
        }), "selectedCols").name("GetBinTotalFromWoeModel")).where(new int[]{0}).equalTo(new int[]{0}).with(new JoinFunction<Tuple2<String, FeatureBinsCalculator>, Tuple3<String, Map<Long, Long>, Map<Long, Long>>, FeatureBinsCalculator>() { // from class: com.alibaba.alink.operator.batch.feature.WoeTrainBatchOp.8
            private static final long serialVersionUID = -4468441310215491228L;

            public FeatureBinsCalculator join(Tuple2<String, FeatureBinsCalculator> tuple2, Tuple3<String, Map<Long, Long>, Map<Long, Long>> tuple3) {
                FeatureBinsCalculator featureBinsCalculator = (FeatureBinsCalculator) tuple2.f1;
                featureBinsCalculator.setTotal((Map) tuple3.f1);
                featureBinsCalculator.setPositiveTotal((Map) tuple3.f2);
                return featureBinsCalculator;
            }
        }).name("SetBinTotal");
    }

    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ WoeTrainBatchOp linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
