package com.alibaba.alink.operator.local.sink;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.exceptions.AkIllegalOperationException;
import com.alibaba.alink.common.exceptions.AkIllegalStateException;
import com.alibaba.alink.operator.local.AlinkLocalSession;
import com.alibaba.alink.operator.local.LocalOperator;
import com.alibaba.alink.operator.local.sink.BaseSinkLocalOp;
import com.alibaba.alink.operator.local.utils.MTableSerializeLocalOp;
import com.alibaba.alink.operator.local.utils.TensorSerializeLocalOp;
import com.alibaba.alink.operator.local.utils.VectorSerializeLocalOp;
import com.alibaba.alink.params.shared.HasNumThreads;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;

@OutputPorts
/* loaded from: input_file:com/alibaba/alink/operator/local/sink/BaseSinkLocalOp.class */
public abstract class BaseSinkLocalOp<T extends BaseSinkLocalOp<T>> extends LocalOperator<T> {
    /* JADX INFO: Access modifiers changed from: protected */
    public BaseSinkLocalOp(Params params) {
        super(params);
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public T linkFrom(LocalOperator<?>... localOperatorArr) {
        return sinkFrom(((MTableSerializeLocalOp) ((VectorSerializeLocalOp) checkAndGetFirst(localOperatorArr).link(new VectorSerializeLocalOp())).link(new MTableSerializeLocalOp())).link(new TensorSerializeLocalOp()));
    }

    protected abstract T sinkFrom(LocalOperator<?> localOperator);

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public final MTable getOutputTable() {
        throw new AkIllegalOperationException("Sink Operator has no output data.");
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    protected final MTable[] getSideOutputTables() {
        throw new AkIllegalOperationException("Sink Operator has no side-output data.");
    }

    public static <T> void output(List<T> list, OutputFormat<T> outputFormat, Params params) {
        int defaultNumThreads = LocalOperator.getDefaultNumThreads();
        if (params.contains(HasNumThreads.NUM_THREADS)) {
            defaultNumThreads = ((Integer) params.get(HasNumThreads.NUM_THREADS)).intValue();
        }
        output(list, outputFormat, defaultNumThreads);
    }

    public static <T> void output(List<T> list, OutputFormat<T> outputFormat, int i) {
        if (outputFormat == null) {
            return;
        }
        int size = list.size();
        AlinkLocalSession.IOTaskRunner iOTaskRunner = new AlinkLocalSession.IOTaskRunner();
        try {
            try {
                outputFormat.configure(new Configuration());
                if (outputFormat instanceof InitializeOnMaster) {
                    ((InitializeOnMaster) outputFormat).initializeGlobal(i);
                }
                byte[] serialize = SerializationUtils.serialize(outputFormat);
                for (int i2 = 0; i2 < i; i2++) {
                    int startPos = (int) AlinkLocalSession.DISTRIBUTOR.startPos(i2, i, size);
                    int localRowCnt = (int) AlinkLocalSession.DISTRIBUTOR.localRowCnt(i2, i, size);
                    int i3 = i2;
                    if (localRowCnt > 0) {
                        iOTaskRunner.submit(() -> {
                            CleanupWhenUnsuccessful cleanupWhenUnsuccessful = (OutputFormat) SerializationUtils.deserialize(serialize);
                            cleanupWhenUnsuccessful.configure(new Configuration());
                            if (cleanupWhenUnsuccessful instanceof RichOutputFormat) {
                            }
                            try {
                                try {
                                    cleanupWhenUnsuccessful.open(i3, i);
                                    for (int i4 = startPos; i4 < startPos + localRowCnt; i4++) {
                                        cleanupWhenUnsuccessful.writeRecord(list.get(i4));
                                    }
                                } catch (IOException e) {
                                    if (cleanupWhenUnsuccessful instanceof CleanupWhenUnsuccessful) {
                                        try {
                                            cleanupWhenUnsuccessful.tryCleanupOnError();
                                        } catch (Exception e2) {
                                            e2.printStackTrace();
                                        }
                                    }
                                    e.printStackTrace();
                                    try {
                                        cleanupWhenUnsuccessful.close();
                                    } catch (Exception e3) {
                                        e3.printStackTrace();
                                    }
                                }
                            } finally {
                                try {
                                    cleanupWhenUnsuccessful.close();
                                } catch (Exception e4) {
                                    e4.printStackTrace();
                                }
                            }
                        });
                    }
                }
                iOTaskRunner.join();
            } catch (IOException e) {
                throw new AkIllegalStateException("Output format error.", e);
            }
        } finally {
            if (outputFormat instanceof FinalizeOnMaster) {
                try {
                    ((FinalizeOnMaster) outputFormat).finalizeGlobal(i);
                } catch (IOException e2) {
                }
            }
        }
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public /* bridge */ /* synthetic */ LocalOperator linkFrom(LocalOperator[] localOperatorArr) {
        return linkFrom((LocalOperator<?>[]) localOperatorArr);
    }
}
