package com.alibaba.alink.operator.common.modelstream;

import com.alibaba.alink.common.io.filesystem.AkStream;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.utils.JsonConverter;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/alibaba/alink/operator/common/modelstream/FileModelStreamSink.class */
public class FileModelStreamSink implements Serializable {
    private final FilePath filePath;
    private final String schemaStr;
    public static final String MODEL_CONF = "conf";
    private transient AkStream.AkWriter.AkCollector collector;

    public FileModelStreamSink(FilePath filePath, String str) {
        this.filePath = filePath;
        this.schemaStr = str;
    }

    public void initializeGlobal() throws IOException {
        BaseFileSystem<?> fileSystem = this.filePath.getFileSystem();
        Path path = new Path(this.filePath.getPath(), MODEL_CONF);
        if (!fileSystem.exists(path)) {
            fileSystem.mkdirs(path);
        } else if (!fileSystem.getFileStatus(path).isDir()) {
            throw new IllegalStateException("Conf dir of file model is exists and it it not a directory.");
        }
    }

    public void open(Timestamp timestamp, int i) throws IOException {
        this.collector = new AkStream(new FilePath(new Path(new Path(this.filePath.getPath(), MODEL_CONF), String.format("%s_%d", ModelStreamUtils.toStringPresentation(timestamp), Integer.valueOf(i))), this.filePath.getFileSystem()), new AkUtils.AkMeta(this.schemaStr)).getWriter().getCollector();
    }

    public void collect(Row row) {
        this.collector.collect(row);
    }

    public void close() {
        if (this.collector != null) {
            this.collector.close();
        }
    }

    public void finalizeGlobal(Timestamp timestamp, long j, int i, int i2) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < i; i3++) {
            arrayList.add(Integer.valueOf(i3));
        }
        finalizeGlobal(timestamp, j, arrayList, i2);
    }

    public void finalizeGlobal(Timestamp timestamp, long j, List<Integer> list, int i) throws IOException {
        BaseFileSystem<?> fileSystem = this.filePath.getFileSystem();
        Path path = new Path(this.filePath.getPath(), MODEL_CONF);
        Path path2 = new Path(path, ModelStreamUtils.toStringPresentation(timestamp));
        if (fileSystem.exists(path2)) {
            throw new IOException(String.format("ModelPath: %s has existed.", path2));
        }
        fileSystem.mkdirs(path2);
        list.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        for (int i2 = 0; i2 < list.size(); i2++) {
            Path path3 = new Path(path, String.format("%s_%d", ModelStreamUtils.toStringPresentation(timestamp), list.get(i2)));
            Path path4 = new Path(path2, String.valueOf(i2));
            if (!fileSystem.rename(path3, path4)) {
                throw new IOException(String.format("Submit sub-model %s to %s failed. Maybe folder %s exists.", path3, path4, path4));
            }
        }
        Path path5 = new Path(path, String.format("%s.log", ModelStreamUtils.toStringPresentation(timestamp)));
        try {
            FSDataOutputStream create = fileSystem.create(path5, FileSystem.WriteMode.OVERWRITE);
            Throwable th = null;
            try {
                try {
                    create.write(JsonConverter.toJson(new ModelStreamMeta(j, list.size())).getBytes());
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    Path path6 = new Path(this.filePath.getPath(), ModelStreamUtils.toStringPresentation(timestamp));
                    if (!fileSystem.rename(path2, path6)) {
                        throw new IOException(String.format("Submit model %s to %s failed. Maybe folder %s exists.", path2, path6, path6));
                    }
                    cleanUp(this.filePath, i);
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            fileSystem.delete(path5, false);
            throw e;
        }
    }

    private static void cleanUp(FilePath filePath, int i) throws IOException {
        if (i < 0) {
            return;
        }
        List<Timestamp> listModels = ModelStreamUtils.listModels(filePath);
        listModels.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        BaseFileSystem<?> fileSystem = filePath.getFileSystem();
        Path path = new Path(filePath.getPath(), MODEL_CONF);
        for (int i2 = 0; i2 < listModels.size() - i; i2++) {
            fileSystem.delete(new Path(filePath.getPath(), ModelStreamUtils.toStringPresentation(listModels.get(i2))), true);
            fileSystem.delete(new Path(path, String.format("%s.log", ModelStreamUtils.toStringPresentation(listModels.get(i2)))), false);
        }
    }
}
