package com.alibaba.alink.common.dl;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.dl.utils.PythonFileUtils;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.io.plugin.OsType;
import com.alibaba.alink.common.io.plugin.OsUtils;
import com.alibaba.flink.ml.cluster.node.MLContext;
import com.alibaba.flink.ml.cluster.node.runner.python.ProcessPythonRunner;
import com.alibaba.flink.ml.util.MLException;
import com.alibaba.flink.ml.util.ShellExec;
import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/dl/ProcessPythonRunnerV2.class */
public class ProcessPythonRunnerV2 extends ProcessPythonRunner implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessPythonRunnerV2.class);
    private volatile Process child;
    private static final String CALL_CONDA_UNPACK_SCRIPT = "/call_conda_pack.sh";
    private static final String WIN_CALL_CONDA_UNPACK_SCRIPT = "/call_conda_pack.bat";

    public ProcessPythonRunnerV2(MLContext mLContext) {
        super(mLContext);
        this.child = null;
    }

    public void runScript() throws IOException {
        String str = (String) this.mlContext.getProperties().get("startup_file_name");
        ArrayList arrayList = new ArrayList();
        String str2 = "python" + ((String) this.mlContext.getProperties().getOrDefault("python.version", ""));
        String str3 = System.getenv("PATH");
        String str4 = (String) this.mlContext.getProperties().getOrDefault("virtual_env_dir", "");
        if (!str4.isEmpty()) {
            if (OsType.WINDOWS.equals(OsUtils.getSystemType())) {
                str2 = str4 + File.separator + "python";
            } else {
                str2 = str4 + "/bin/python";
                callCondaUnpack(str4);
                str3 = str4 + "/bin:" + str3;
            }
        }
        arrayList.add(str2);
        if (this.mlContext.startWithStartup()) {
            arrayList.add(str);
            LOG.info("Running {} via {}", this.mlContext.getScript().getName(), str);
        } else {
            arrayList.add(this.mlContext.getScript().getAbsolutePath());
        }
        arrayList.add(String.format("%s:%d", this.mlContext.getNodeServerIP(), Integer.valueOf(this.mlContext.getNodeServerPort())));
        ProcessBuilder processBuilder = new ProcessBuilder(arrayList);
        processBuilder.environment().clear();
        String str5 = null;
        if (!OsType.WINDOWS.equals(OsUtils.getSystemType())) {
            str5 = getClassPath();
        }
        if (str5 == null) {
            LOG.warn("Cannot find proper classpath for the Python process.");
        } else {
            this.mlContext.putEnvProperty("CLASSPATH", str5);
        }
        this.mlContext.putEnvProperty("PATH", str3);
        this.mlContext.putEnvProperty("PYTHONIOENCODING", "utf8");
        if (System.getenv().containsKey("CUDA_VISIBLE_DEVICES")) {
            this.mlContext.putEnvProperty("CUDA_VISIBLE_DEVICES", System.getenv("CUDA_VISIBLE_DEVICES"));
        }
        buildProcessBuilder(processBuilder);
        LOG.info("{} Python cmd: {}", this.mlContext.getIdentity(), Joiner.on(" ").join(arrayList));
        runProcess(processBuilder);
    }

    protected synchronized void callCondaUnpack(String str) throws IOException {
        String str2;
        String str3;
        String str4;
        if ((!OsType.WINDOWS.equals(OsUtils.getSystemType()) || (Files.exists(Paths.get(str, "Scripts", "activate.bat"), new LinkOption[0]) && Files.exists(Paths.get(str, "Scripts", "conda-unpack.exe"), new LinkOption[0]))) && Files.exists(Paths.get(str, "bin", "activate"), new LinkOption[0]) && Files.exists(Paths.get(str, "bin", "conda-unpack"), new LinkOption[0])) {
            if (OsType.WINDOWS.equals(OsUtils.getSystemType())) {
                str2 = WIN_CALL_CONDA_UNPACK_SCRIPT;
                str3 = ".bat";
                str4 = "cmd.exe";
            } else {
                str2 = CALL_CONDA_UNPACK_SCRIPT;
                str3 = ".sh";
                str4 = "/bin/bash";
            }
            Path createTempFile = PythonFileUtils.createTempFile("call_conda_pack", str3);
            InputStream resourceAsStream = getClass().getResourceAsStream(str2);
            Throwable th = null;
            try {
                try {
                    AkPreconditions.checkNotNull(resourceAsStream, "Cannot get resource " + str2);
                    Files.copy(resourceAsStream, createTempFile, StandardCopyOption.REPLACE_EXISTING);
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try {
                            FileUtils.deleteFileOrDirectory(createTempFile.toFile());
                        } catch (IOException e) {
                            LOG.info("Failed to delete {}.", createTempFile.toFile().getAbsolutePath(), e);
                        }
                    }));
                    if (resourceAsStream != null) {
                        if (0 != 0) {
                            try {
                                resourceAsStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            resourceAsStream.close();
                        }
                    }
                    String[] strArr = {str4, createTempFile.toAbsolutePath().toString(), str};
                    LOG.info("{} Python cmd: {}", this.mlContext.getIdentity(), Joiner.on(" ").join(strArr));
                    if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                        System.out.println("Python cmd: " + Joiner.on(" ").join(strArr));
                    }
                    try {
                        runProcess(new ProcessBuilder(new String[0]).command(strArr).directory(new File(str)));
                    } catch (Exception e) {
                        LOG.info("Call conda-unpack failed, ignore it: {}", e.toString());
                        if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                            System.err.println("Call conda-unpack failed, ignore it: " + e);
                        }
                    }
                    LOG.info("Leave ProcessPythonRunnerV2.callCondaUnpack");
                    if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                        System.err.println("Leave ProcessPythonRunnerV2.callCondaUnpack");
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (resourceAsStream != null) {
                    if (th != null) {
                        try {
                            resourceAsStream.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        resourceAsStream.close();
                    }
                }
                throw th4;
            }
        }
    }

    protected void runProcess(ProcessBuilder processBuilder) throws IOException {
        this.child = processBuilder.start();
        Thread thread = new Thread((Runnable) new ShellExec.ProcessLogger(this.child.getInputStream(), str -> {
            LOG.info("Python stdout: {}", str);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.out.println(str);
            }
        }));
        Thread thread2 = new Thread((Runnable) new ShellExec.ProcessLogger(this.child.getErrorStream(), str2 -> {
            LOG.info("Python stderr: {}", str2);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.out.println(str2);
            }
        }));
        thread.setName(this.mlContext.getIdentity() + "-in-logger");
        thread.setDaemon(true);
        thread2.setName(this.mlContext.getIdentity() + "-err-logger");
        thread2.setDaemon(true);
        thread.start();
        thread2.start();
        int i = 0;
        while (true) {
            try {
                try {
                    if (this.child.waitFor(5L, TimeUnit.SECONDS)) {
                        i = this.child.exitValue();
                        break;
                    } else if (this.toKill.get()) {
                        break;
                    }
                } catch (InterruptedException e) {
                    LOG.warn("{} interrupted, killing the process", this.mlContext.getIdentity());
                    killProcess();
                    return;
                }
            } finally {
                killProcess();
            }
        }
        if (i != 0) {
            throw new MLException(String.format("%s python process exited with code %d", this.mlContext.getIdentity(), Integer.valueOf(i)));
        }
    }

    private synchronized void killProcess() {
        if (this.child == null || !this.child.isAlive()) {
            return;
        }
        LOG.info("Force kill {} process", this.mlContext.getIdentity());
        this.child.destroyForcibly();
        this.child = null;
    }
}
