package com.alibaba.alink.operator.common.tree.parallelcart;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpec;
import com.alibaba.alink.common.annotation.ParamSelectColumnSpecs;
import com.alibaba.alink.common.annotation.PortDesc;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.annotation.TypeCollections;
import com.alibaba.alink.common.comqueue.CompareCriterionFunction;
import com.alibaba.alink.common.comqueue.IterativeComQueue;
import com.alibaba.alink.common.comqueue.communication.AllReduce;
import com.alibaba.alink.common.exceptions.AkIllegalArgumentException;
import com.alibaba.alink.common.model.ModelParamName;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.TableSourceBatchOp;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.common.io.types.FlinkTypeConverter;
import com.alibaba.alink.operator.common.tree.FeatureMeta;
import com.alibaba.alink.operator.common.tree.Node;
import com.alibaba.alink.operator.common.tree.Preprocessing;
import com.alibaba.alink.operator.common.tree.TreeModelDataConverter;
import com.alibaba.alink.operator.common.tree.parallelcart.BaseGbdtTrainBatchOp;
import com.alibaba.alink.operator.common.tree.parallelcart.BuildLocalSketch;
import com.alibaba.alink.operator.common.tree.parallelcart.EpsilonApproQuantile;
import com.alibaba.alink.operator.common.tree.parallelcart.booster.BoosterType;
import com.alibaba.alink.operator.common.tree.parallelcart.communication.AllReduceT;
import com.alibaba.alink.operator.common.tree.parallelcart.communication.ReduceScatter;
import com.alibaba.alink.operator.common.tree.parallelcart.criteria.CriteriaType;
import com.alibaba.alink.operator.common.tree.parallelcart.data.DataUtil;
import com.alibaba.alink.operator.common.tree.parallelcart.leafscoreupdater.LeafScoreUpdaterType;
import com.alibaba.alink.operator.common.tree.parallelcart.loss.LossType;
import com.alibaba.alink.operator.common.tree.parallelcart.loss.LossUtils;
import com.alibaba.alink.params.classification.GbdtTrainParams;
import com.alibaba.alink.params.regression.LambdaMartNdcgParams;
import com.alibaba.alink.params.shared.colname.HasCategoricalCols;
import com.alibaba.alink.params.shared.colname.HasFeatureCols;
import com.alibaba.alink.params.shared.colname.HasLabelCol;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InputPorts(values = {@PortSpec(PortType.DATA)})
@OutputPorts(values = {@PortSpec(PortType.MODEL), @PortSpec(value = PortType.DATA, desc = PortDesc.FEATURE_IMPORTANCE)})
@ParamSelectColumnSpecs({@ParamSelectColumnSpec(name = "featureCols", allowedTypeCollections = {TypeCollections.TREE_FEATURE_TYPES}), @ParamSelectColumnSpec(name = "vectorCol", allowedTypeCollections = {TypeCollections.VECTOR_TYPES}), @ParamSelectColumnSpec(name = "categoricalCols", allowedTypeCollections = {TypeCollections.TREE_FEATURE_TYPES}), @ParamSelectColumnSpec(name = "weightCol", allowedTypeCollections = {TypeCollections.NUMERIC_TYPES}), @ParamSelectColumnSpec(name = "labelCol")})
/* loaded from: input_file:com/alibaba/alink/operator/common/tree/parallelcart/BaseGbdtTrainBatchOp.class */
public abstract class BaseGbdtTrainBatchOp<T extends BaseGbdtTrainBatchOp<T>> extends BatchOperator<T> {
    private static final long serialVersionUID = 6942357843795354849L;
    private static final Logger LOG = LoggerFactory.getLogger(BaseGbdtTrainBatchOp.class);
    public static final ParamInfo<Integer> ALGO_TYPE = ParamInfoFactory.createParamInfo("algoType", Integer.class).build();
    public static final ParamInfo<Boolean> USE_MISSING = ParamInfoFactory.createParamInfo("useMissing", Boolean.class).setHasDefaultValue(true).build();
    public static final ParamInfo<Boolean> USE_ONEHOT = ParamInfoFactory.createParamInfo("useOneHot", Boolean.class).setHasDefaultValue(false).build();
    public static final ParamInfo<Boolean> USE_EPSILON_APPRO_QUANTILE = ParamInfoFactory.createParamInfo("useEpsilonApproQuantile", Boolean.class).setHasDefaultValue(false).build();
    public static final ParamInfo<Double> SKETCH_EPS = ParamInfoFactory.createParamInfo("sketchEps", Double.class).setHasDefaultValue(Double.valueOf(0.03d)).build();
    public static final ParamInfo<Double> SKETCH_RATIO = ParamInfoFactory.createParamInfo("sketchRatio", Double.class).setHasDefaultValue(Double.valueOf(2.0d)).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/common/tree/parallelcart/BaseGbdtTrainBatchOp$CheckNumLabels4BinaryClassifier.class */
    public static final class CheckNumLabels4BinaryClassifier implements MapFunction<Object[], Object[]> {
        private static final long serialVersionUID = -8337756848972278905L;

        public Object[] map(Object[] objArr) throws Exception {
            if (objArr == null || objArr.length != 2) {
                throw new AkIllegalArgumentException("The gbdt only support binary class right now.");
            }
            return objArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/common/tree/parallelcart/BaseGbdtTrainBatchOp$NodeReducer.class */
    public static final class NodeReducer implements AllReduceT.SerializableBiConsumer<Node[], Node[]> {
        private static final long serialVersionUID = 6875638618412288149L;

        private NodeReducer() {
        }

        @Override // java.util.function.BiConsumer
        public void accept(Node[] nodeArr, Node[] nodeArr2) {
            for (int i = 0; i < nodeArr.length; i++) {
                if (nodeArr[i] == null && nodeArr2[i] != null) {
                    nodeArr[i] = nodeArr2[i];
                } else if (nodeArr[i] != null && nodeArr2[i] != null) {
                    if (nodeArr[i].getGain() < nodeArr2[i].getGain()) {
                        nodeArr[i].copy(nodeArr2[i]);
                    } else if (nodeArr[i].getGain() == nodeArr2[i].getGain() && nodeArr[i].getFeatureIndex() < nodeArr2[i].getFeatureIndex()) {
                        nodeArr[i].copy(nodeArr2[i]);
                    }
                }
            }
        }
    }

    public BaseGbdtTrainBatchOp() {
        this(new Params());
    }

    public BaseGbdtTrainBatchOp(Params params) {
        super(params);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public T linkFrom(BatchOperator<?>... batchOperatorArr) {
        BatchOperator<?> generateStringIndexerModel;
        BatchOperator<?> generateQuantileDiscretizerModel;
        DataSet<Row> dataSet;
        BatchOperator<?> checkAndGetFirst = checkAndGetFirst(batchOperatorArr);
        LOG.info("gbdt train start");
        if (!Preprocessing.isSparse(getParams())) {
            getParams().set((ParamInfo<ParamInfo<String[]>>) HasCategoricalCols.CATEGORICAL_COLS, (ParamInfo<String[]>) TableUtil.getCategoricalCols(checkAndGetFirst.getSchema(), TableUtil.getOptionalFeatureCols(checkAndGetFirst.getSchema(), getParams()), getParams().contains(GbdtTrainParams.CATEGORICAL_COLS) ? (String[]) getParams().get(GbdtTrainParams.CATEGORICAL_COLS) : null));
        }
        LossType lossType = (LossType) getParams().get(LossUtils.LOSS_TYPE);
        getParams().set((ParamInfo<ParamInfo<Integer>>) ALGO_TYPE, (ParamInfo<Integer>) Integer.valueOf(LossUtils.lossTypeToInt(lossType)));
        rewriteLabelType(checkAndGetFirst.getSchema(), getParams());
        if (!Preprocessing.isSparse(getParams())) {
            getParams().set((ParamInfo<ParamInfo<String[]>>) ModelParamName.FEATURE_TYPES, (ParamInfo<String[]>) FlinkTypeConverter.getTypeString(TableUtil.findColTypes(checkAndGetFirst.getSchema(), TableUtil.getOptionalFeatureCols(checkAndGetFirst.getSchema(), getParams()))));
        }
        if (LossUtils.isRanking((LossType) getParams().get(LossUtils.LOSS_TYPE)) && !getParams().contains(LambdaMartNdcgParams.GROUP_COL)) {
            throw new AkIllegalArgumentException("Group column should be set in ranking loss function.");
        }
        String[] trainColsWithGroup = trainColsWithGroup(checkAndGetFirst.getSchema());
        final int findColIndex = TableUtil.findColIndex(checkAndGetFirst.getSchema(), (String) getParams().get(HasLabelCol.LABEL_COL));
        BatchOperator<?> select = Preprocessing.select((BatchOperator) new TableSourceBatchOp(DataSetConversionUtil.toTable(checkAndGetFirst.getMLEnvironmentId(), (DataSet<Row>) checkAndGetFirst.getDataSet().map(new MapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.common.tree.parallelcart.BaseGbdtTrainBatchOp.1
            public Row map(Row row) throws Exception {
                if (null == row.getField(findColIndex)) {
                    throw new AkIllegalArgumentException("label col has null values.");
                }
                return row;
            }
        }), checkAndGetFirst.getSchema())).setMLEnvironmentId(checkAndGetFirst.getMLEnvironmentId()), trainColsWithGroup);
        DataSet generateLabels = Preprocessing.generateLabels(select, getParams(), LossUtils.isRegression(lossType) || LossUtils.isRanking(lossType));
        if (LossUtils.isClassification(lossType)) {
            generateLabels = generateLabels.map(new CheckNumLabels4BinaryClassifier());
        }
        if (((Boolean) getParams().get(USE_ONEHOT)).booleanValue()) {
            generateStringIndexerModel = Preprocessing.generateStringIndexerModel(select, new Params());
            generateQuantileDiscretizerModel = Preprocessing.generateQuantileDiscretizerModel(select, new Params().set((ParamInfo<ParamInfo<String[]>>) HasFeatureCols.FEATURE_COLS, (ParamInfo<String[]>) new String[0]).set((ParamInfo<ParamInfo<String[]>>) HasCategoricalCols.CATEGORICAL_COLS, (ParamInfo<String[]>) new String[0]));
            dataSet = Preprocessing.castLabel(select, getParams(), generateLabels, LossUtils.isRegression(lossType) || LossUtils.isRanking(lossType)).getDataSet();
        } else if (((Boolean) getParams().get(USE_EPSILON_APPRO_QUANTILE)).booleanValue()) {
            generateStringIndexerModel = Preprocessing.generateStringIndexerModel(select, getParams());
            generateQuantileDiscretizerModel = Preprocessing.generateQuantileDiscretizerModel(select, new Params().set((ParamInfo<ParamInfo<String[]>>) HasFeatureCols.FEATURE_COLS, (ParamInfo<String[]>) new String[0]).set((ParamInfo<ParamInfo<String[]>>) HasCategoricalCols.CATEGORICAL_COLS, (ParamInfo<String[]>) new String[0]));
            dataSet = Preprocessing.castLabel(Preprocessing.isSparse(getParams()) ? select : Preprocessing.castContinuousCols(Preprocessing.castCategoricalCols(select, generateStringIndexerModel, getParams()), getParams()), getParams(), generateLabels, LossUtils.isRegression(lossType) || LossUtils.isRanking(lossType)).getDataSet();
        } else {
            generateStringIndexerModel = Preprocessing.generateStringIndexerModel(select, getParams());
            generateQuantileDiscretizerModel = Preprocessing.generateQuantileDiscretizerModel(select, getParams());
            dataSet = Preprocessing.castLabel(Preprocessing.castToQuantile(Preprocessing.isSparse(getParams()) ? select : Preprocessing.castContinuousCols(Preprocessing.castCategoricalCols(select, generateStringIndexerModel, getParams()), getParams()), generateQuantileDiscretizerModel, getParams()), getParams(), generateLabels, LossUtils.isRegression(lossType) || LossUtils.isRanking(lossType)).getDataSet();
        }
        if (LossUtils.isRanking((LossType) getParams().get(LossUtils.LOSS_TYPE))) {
            dataSet = dataSet.partitionCustom(new Partitioner<Number>() { // from class: com.alibaba.alink.operator.common.tree.parallelcart.BaseGbdtTrainBatchOp.2
                private static final long serialVersionUID = -7790649477852624964L;

                public int partition(Number number, int i) {
                    return (int) (number.longValue() % i);
                }
            }, 0);
        }
        ReduceOperator reduce = dataSet.mapPartition(new MapPartitionFunction<Row, Tuple2<Double, Long>>() { // from class: com.alibaba.alink.operator.common.tree.parallelcart.BaseGbdtTrainBatchOp.4
            private static final long serialVersionUID = -8333738060239409640L;

            public void mapPartition(Iterable<Row> iterable, Collector<Tuple2<Double, Long>> collector) throws Exception {
                double d = 0.0d;
                long j = 0;
                for (Row row : iterable) {
                    d += ((Number) row.getField(row.getArity() - 1)).doubleValue();
                    j++;
                }
                collector.collect(Tuple2.of(Double.valueOf(d), Long.valueOf(j)));
            }
        }).reduce(new ReduceFunction<Tuple2<Double, Long>>() { // from class: com.alibaba.alink.operator.common.tree.parallelcart.BaseGbdtTrainBatchOp.3
            private static final long serialVersionUID = -6464200385237876961L;

            public Tuple2<Double, Long> reduce(Tuple2<Double, Long> tuple2, Tuple2<Double, Long> tuple22) throws Exception {
                return Tuple2.of(Double.valueOf(((Double) tuple2.f0).doubleValue() + ((Double) tuple22.f0).doubleValue()), Long.valueOf(((Long) tuple2.f1).longValue() + ((Long) tuple22.f1).longValue()));
            }
        });
        DataSet<FeatureMeta> createOneHotFeatureMeta = ((Boolean) getParams().get(USE_ONEHOT)).booleanValue() ? DataUtil.createOneHotFeatureMeta(dataSet, getParams(), trainColsWithGroup) : ((Boolean) getParams().get(USE_EPSILON_APPRO_QUANTILE)).booleanValue() ? DataUtil.createEpsilonApproQuantileFeatureMeta(dataSet, generateStringIndexerModel.getDataSet(), getParams(), trainColsWithGroup, getMLEnvironmentId().longValue()) : DataUtil.createFeatureMetas(generateQuantileDiscretizerModel.getDataSet(), generateStringIndexerModel.getDataSet(), getParams());
        getParams().set((ParamInfo<ParamInfo<BoosterType>>) BoosterType.BOOSTER_TYPE, (ParamInfo<BoosterType>) BoosterType.HESSION_BASE);
        getParams().set((ParamInfo<ParamInfo<CriteriaType>>) CriteriaType.CRITERIA_TYPE, (ParamInfo<CriteriaType>) CriteriaType.valueOf(((GbdtTrainParams.CriteriaType) getParams().get(GbdtTrainParams.CRITERIA)).toString()));
        if (((Boolean) getParams().get(GbdtTrainParams.NEWTON_STEP)).booleanValue()) {
            getParams().set((ParamInfo<ParamInfo<LeafScoreUpdaterType>>) LeafScoreUpdaterType.LEAF_SCORE_UPDATER_TYPE, (ParamInfo<LeafScoreUpdaterType>) LeafScoreUpdaterType.NEWTON_SINGLE_STEP_UPDATER);
        } else {
            getParams().set((ParamInfo<ParamInfo<LeafScoreUpdaterType>>) LeafScoreUpdaterType.LEAF_SCORE_UPDATER_TYPE, (ParamInfo<LeafScoreUpdaterType>) LeafScoreUpdaterType.WEIGHT_AVG_UPDATER);
        }
        IterativeComQueue add = new IterativeComQueue().initWithPartitionedData("trainData", dataSet).initWithBroadcastData("gbdt.y.sum", reduce).initWithBroadcastData(InitTreeObjs.QUANTILE_MODEL, generateQuantileDiscretizerModel.getDataSet()).initWithBroadcastData("stringIndexerModel", generateStringIndexerModel.getDataSet()).initWithBroadcastData("labels", generateLabels).initWithBroadcastData(InitBoostingObjs.FEATURE_METAS, createOneHotFeatureMeta).add(new InitBoostingObjs(getParams())).add(new Boosting()).add(new Bagging()).add(new InitTreeObjs());
        if (((Boolean) getParams().get(USE_EPSILON_APPRO_QUANTILE)).booleanValue()) {
            add.add(new BuildLocalSketch()).add(new AllReduceT(BuildLocalSketch.SKETCH, BuildLocalSketch.FEATURE_SKETCH_LENGTH, new BuildLocalSketch.SketchReducer(getParams()), EpsilonApproQuantile.WQSummary.class)).add(new FinalizeBuildSketch());
        }
        add.add(new ConstructLocalHistogram()).add(new ReduceScatter("histogram", "histogram", "recvcnts", AllReduce.SUM)).add(new CalcFeatureGain()).add(new AllReduceT("best", "bestLength", new NodeReducer(), Node.class)).add(new SplitInstances()).add(new UpdateLeafScore()).add(new UpdatePredictionScore()).setCompareCriterionOfNode0((CompareCriterionFunction) new TerminateCriterion()).closeWith(new SaveModel(getParams()));
        DataSet<Row> exec = add.exec();
        setOutput(exec, new TreeModelDataConverter(FlinkTypeConverter.getFlinkType((String) getParams().get(ModelParamName.LABEL_TYPE_NAME))).getModelSchema());
        setSideOutputTables(new Table[]{DataSetConversionUtil.toTable(getMLEnvironmentId(), (DataSet<Row>) exec.reduceGroup(new TreeModelDataConverter.FeatureImportanceReducer()), new String[]{(String) getParams().get(TreeModelDataConverter.IMPORTANCE_FIRST_COL), (String) getParams().get(TreeModelDataConverter.IMPORTANCE_SECOND_COL)}, (TypeInformation<?>[]) new TypeInformation[]{Types.STRING, Types.DOUBLE})});
        return this;
    }

    public static void rewriteLabelType(TableSchema tableSchema, Params params) {
        if (LossUtils.isClassification((LossType) params.get(LossUtils.LOSS_TYPE))) {
            params.set((ParamInfo<ParamInfo<String>>) ModelParamName.LABEL_TYPE_NAME, (ParamInfo<String>) FlinkTypeConverter.getTypeString(TableUtil.findColType(tableSchema, (String) params.get(GbdtTrainParams.LABEL_COL))));
        } else {
            params.set((ParamInfo<ParamInfo<String>>) ModelParamName.LABEL_TYPE_NAME, (ParamInfo<String>) FlinkTypeConverter.getTypeString((TypeInformation<?>) Types.DOUBLE));
        }
    }

    private String[] trainColsWithGroup(TableSchema tableSchema) {
        ArrayList arrayList = new ArrayList();
        if (LossUtils.isRanking((LossType) getParams().get(LossUtils.LOSS_TYPE))) {
            arrayList.add(getParams().get(LambdaMartNdcgParams.GROUP_COL));
        }
        if (Preprocessing.isSparse(getParams())) {
            arrayList.add(Preprocessing.checkAndGetOptionalVectorCols(getParams(), this));
        } else {
            arrayList.addAll(Arrays.asList(TableUtil.getOptionalFeatureCols(tableSchema, getParams())));
        }
        arrayList.add(getParams().get(GbdtTrainParams.LABEL_COL));
        return (String[]) arrayList.toArray(new String[0]);
    }

    @Override // com.alibaba.alink.operator.batch.BatchOperator
    public /* bridge */ /* synthetic */ BatchOperator linkFrom(BatchOperator[] batchOperatorArr) {
        return linkFrom((BatchOperator<?>[]) batchOperatorArr);
    }
}
