package com.alibaba.alink.operator.batch.sink;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.common.modelstream.FileModelStreamSink;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import com.alibaba.alink.params.io.AppendModelStreamFileSinkParams;
import java.io.IOException;
import java.sql.Timestamp;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;

@InputPorts(values = {@PortSpec(PortType.MODEL)})
@IoOpAnnotation(name = "append_model_stream", ioType = IOType.SinkBatch)
@NameCn("模型流导出")
@NameEn("Append Model Stream File Sink")
/* loaded from: input_file:com/alibaba/alink/operator/batch/sink/AppendModelStreamFileSinkBatchOp.class */
public class AppendModelStreamFileSinkBatchOp extends BaseSinkBatchOp<AppendModelStreamFileSinkBatchOp> implements AppendModelStreamFileSinkParams<AppendModelStreamFileSinkBatchOp> {
    public AppendModelStreamFileSinkBatchOp() {
        this(new Params());
    }

    public AppendModelStreamFileSinkBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(AppendModelStreamFileSinkBatchOp.class), params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.batch.sink.BaseSinkBatchOp
    protected AppendModelStreamFileSinkBatchOp sinkFrom(BatchOperator<?> batchOperator) {
        FilePath filePath = getFilePath();
        final Timestamp createStartTime = ModelStreamUtils.createStartTime(getModelTime());
        final int intValue = getNumFiles().intValue();
        final int numKeepModel = getNumKeepModel();
        final FileModelStreamSink fileModelStreamSink = new FileModelStreamSink(filePath, TableUtil.schema2SchemaStr(batchOperator.getSchema()));
        try {
            fileModelStreamSink.initializeGlobal();
            DataSetUtils.countElementsPerPartition(batchOperator.getDataSet().map(new RichMapFunction<Row, Row>() { // from class: com.alibaba.alink.operator.batch.sink.AppendModelStreamFileSinkBatchOp.1
                public void open(Configuration configuration) throws Exception {
                    fileModelStreamSink.open(createStartTime, getRuntimeContext().getIndexOfThisSubtask());
                }

                public void close() throws Exception {
                    fileModelStreamSink.close();
                }

                public Row map(Row row) throws Exception {
                    fileModelStreamSink.collect(row);
                    return row;
                }
            }).setParallelism(intValue)).sum(1).output(new OutputFormat<Tuple2<Integer, Long>>() { // from class: com.alibaba.alink.operator.batch.sink.AppendModelStreamFileSinkBatchOp.2
                public void configure(Configuration configuration) {
                }

                public void open(int i, int i2) throws IOException {
                }

                public void writeRecord(Tuple2<Integer, Long> tuple2) throws IOException {
                    fileModelStreamSink.finalizeGlobal(createStartTime, ((Long) tuple2.f1).longValue(), intValue, numKeepModel);
                }

                public void close() throws IOException {
                }
            }).setParallelism(1);
            return this;
        } catch (IOException e) {
            throw new AkUnclassifiedErrorException("Error. ", e);
        }
    }

    @Override // com.alibaba.alink.operator.batch.sink.BaseSinkBatchOp
    protected /* bridge */ /* synthetic */ AppendModelStreamFileSinkBatchOp sinkFrom(BatchOperator batchOperator) {
        return sinkFrom((BatchOperator<?>) batchOperator);
    }
}
