package com.alibaba.alink.common.dl.plugin;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.dl.exchange.BytesDataExchange;
import com.alibaba.alink.common.dl.plugin.DLPredictServiceMapper;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.AkUnsupportedOperationException;
import com.alibaba.alink.common.io.plugin.ResourcePluginFactory;
import com.alibaba.alink.common.io.plugin.TemporaryClassLoaderContext;
import com.alibaba.alink.operator.common.pytorch.ListSerializer;
import com.alibaba.flink.ml.util.ShellExec;
import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/dl/plugin/BaseDLProcessPredictorService.class */
public abstract class BaseDLProcessPredictorService<T> implements DLPredictorService {
    private static final Logger LOG = LoggerFactory.getLogger(BaseDLProcessPredictorService.class);
    private FutureTask<Void> futureTask;
    private BytesDataExchange bytesDataExchange;
    private File inQueueFile;
    private File outQueueFile;
    private File procReadyFile;
    private String libraryPath;
    private Integer intraOpParallelism;
    private ListSerializer listSerializer;
    private DLPredictServiceMapper.PredictorConfig config;

    public abstract Class<T> getPredictorClass();

    public static FutureTask<Void> startInferenceProcessWatcher(Process process) {
        Thread thread = new Thread((Runnable) new ShellExec.ProcessLogger(process.getInputStream(), str -> {
            LOG.info("Inference process stdout: {}", str);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.out.println("Inference process stdout: " + str);
            }
        }));
        Thread thread2 = new Thread((Runnable) new ShellExec.ProcessLogger(process.getErrorStream(), str2 -> {
            LOG.error("Inference process stderr: {}", str2);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.err.println("Inference process stderr: " + str2);
            }
        }));
        thread.setName("JavaInferenceProcess-in-logger");
        thread.setDaemon(true);
        thread2.setName("JavaInferenceProcess-err-logger");
        thread2.setDaemon(true);
        thread.start();
        thread2.start();
        FutureTask<Void> futureTask = new FutureTask<>(() -> {
            try {
                try {
                    int waitFor = process.waitFor();
                    thread.join();
                    thread2.join();
                    if (waitFor != 0) {
                        throw new AkUnclassifiedErrorException("Java inference process exited with " + waitFor);
                    }
                    LOG.info("Java inference process finished successfully");
                    process.destroyForcibly();
                } catch (InterruptedException e) {
                    LOG.info("Java inference process watcher interrupted, killing the process");
                    process.destroyForcibly();
                }
            } catch (Throwable th) {
                process.destroyForcibly();
                throw th;
            }
        }, null);
        Thread thread3 = new Thread(futureTask);
        thread3.setName("JavaInferenceWatcher");
        thread3.setDaemon(true);
        thread3.start();
        return futureTask;
    }

    public static Process launchInferenceProcess(Class<?> cls, String str, String str2, String str3, DLPredictServiceMapper.PredictorConfig predictorConfig, String str4, Integer num) throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(String.join(File.separator, System.getProperty("java.home"), "bin", "java"));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(System.getProperty("java.class.path"));
        LOG.info("java.class.path = {}", System.getProperty("java.class.path"));
        ClassLoader classLoader = ResourcePluginFactory.class.getClassLoader();
        if (classLoader instanceof URLClassLoader) {
            for (URL url : ((URLClassLoader) classLoader).getURLs()) {
                arrayList2.add(url.toString());
                LOG.info("classloader url: {}", url);
            }
        }
        arrayList.add("-cp");
        arrayList.add(String.join(File.pathSeparator, arrayList2));
        arrayList.add("-Djava.library.path=" + str4);
        arrayList.add(ProcessPredictorRunner.class.getCanonicalName());
        arrayList.add(cls.getCanonicalName());
        arrayList.add(str);
        arrayList.add(str2);
        arrayList.add(predictorConfig.serialize());
        arrayList.add(str3);
        arrayList.add(String.valueOf(predictorConfig.threadMode));
        LOG.info("Java Inference Cmd: " + String.join(" ", arrayList));
        if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
            System.out.println("Java Inference Cmd: " + String.join(" ", arrayList));
        }
        ProcessBuilder processBuilder = new ProcessBuilder(arrayList);
        processBuilder.environment().put("OMP_NUM_THREADS", String.valueOf(num));
        processBuilder.redirectOutput(ProcessBuilder.Redirect.PIPE);
        processBuilder.redirectError(ProcessBuilder.Redirect.PIPE);
        return processBuilder.start();
    }

    public static void addLibraryPath(String str) throws Exception {
        Field declaredField = ClassLoader.class.getDeclaredField("usr_paths");
        declaredField.setAccessible(true);
        String[] strArr = (String[]) declaredField.get(null);
        System.out.println(Arrays.toString(strArr));
        for (String str2 : strArr) {
            if (str2.equals(str)) {
                return;
            }
        }
        String[] strArr2 = (String[]) Arrays.copyOf(strArr, strArr.length + 1);
        strArr2[strArr2.length - 1] = str;
        System.out.println(Arrays.toString(strArr2));
        declaredField.set(null, strArr2);
    }

    private FutureTask<Void> createInferFutureTask() {
        if (null != this.futureTask) {
            return this.futureTask;
        }
        try {
            return startInferenceProcessWatcher(launchInferenceProcess(getPredictorClass(), this.outQueueFile.getAbsolutePath(), this.inQueueFile.getAbsolutePath(), this.procReadyFile.getAbsolutePath(), this.config, this.libraryPath, this.intraOpParallelism));
        } catch (IOException e) {
            throw new AkUnclassifiedErrorException("Launch inference process failed.", e);
        }
    }

    private FutureTask<Void> createInferFutureTaskDebug() {
        if (null != this.config.libraryPath) {
            try {
                addLibraryPath(this.config.libraryPath);
            } catch (Exception e) {
                LOG.warn("add library path {} failed", this.config.libraryPath, e);
            }
        }
        FutureTask<Void> futureTask = new FutureTask<>(() -> {
            ProcessPredictorRunner.main(new String[]{getPredictorClass().getCanonicalName(), this.outQueueFile.getAbsolutePath(), this.inQueueFile.getAbsolutePath(), this.config.serialize(), this.procReadyFile.getAbsolutePath(), Boolean.toString(true)});
        }, null);
        Thread thread = new Thread(futureTask);
        thread.setContextClassLoader(getClass().getClassLoader());
        thread.setName("JavaInference");
        thread.setDaemon(true);
        thread.start();
        return futureTask;
    }

    private void destroyInferFutureTask(FutureTask<Void> futureTask) {
        try {
            futureTask.get();
        } catch (InterruptedException e) {
            LOG.error("Interrupted waiting for server join {}.", e.getMessage());
            futureTask.cancel(true);
        } catch (ExecutionException e2) {
            throw new AkUnclassifiedErrorException("Inference process exited with exception.", e2);
        }
    }

    @Override // com.alibaba.alink.common.dl.plugin.DLPredictorService
    public void open(DLPredictServiceMapper.PredictorConfig predictorConfig) {
        this.config = predictorConfig;
        this.libraryPath = predictorConfig.libraryPath;
        this.intraOpParallelism = predictorConfig.intraOpNumThreads;
        this.listSerializer = new ListSerializer();
        try {
            this.inQueueFile = File.createTempFile("queue-", ".input");
            this.outQueueFile = File.createTempFile("queue-", ".output");
            this.procReadyFile = File.createTempFile("queue-", ".ready");
            try {
                this.bytesDataExchange = new BytesDataExchange(this.inQueueFile.getAbsolutePath(), this.outQueueFile.getAbsolutePath());
                TemporaryClassLoaderContext of = TemporaryClassLoaderContext.of(getClass().getClassLoader());
                Throwable th = null;
                try {
                    try {
                        this.futureTask = predictorConfig.threadMode ? createInferFutureTaskDebug() : createInferFutureTask();
                        if (of != null) {
                            if (0 != 0) {
                                try {
                                    of.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                of.close();
                            }
                        }
                        while (this.procReadyFile.exists()) {
                            LOG.info("Waiting procReadyFile to be deleted.");
                            try {
                                if (this.futureTask.isDone()) {
                                    this.futureTask.get();
                                }
                                Thread.sleep(50L);
                            } catch (InterruptedException e) {
                            } catch (ExecutionException e2) {
                                throw new AkUnclassifiedErrorException("Exception thrown in inference process.", e2);
                            }
                        }
                        if (this.procReadyFile.exists()) {
                            throw new AkUnclassifiedErrorException("ProcReadyFile for inference process still exists.Inference process launched error.");
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (of != null) {
                        if (th != null) {
                            try {
                                of.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            of.close();
                        }
                    }
                    throw th3;
                }
            } catch (Exception e3) {
                throw new AkUnclassifiedErrorException("Failed to create BytesDataBridge", e3);
            }
        } catch (IOException e4) {
            throw new AkUnclassifiedErrorException("Failed to create in/out queue files", e4);
        }
    }

    @Override // com.alibaba.alink.common.dl.plugin.DLPredictorService
    public void close() {
        if (null != this.bytesDataExchange) {
            this.bytesDataExchange.markWriteFinished();
            try {
                this.bytesDataExchange.close();
            } catch (IOException e) {
                LOG.info("Close BytesDataBridge failed, ignore.", e);
            }
        }
        if (null != this.inQueueFile) {
            this.inQueueFile.delete();
        }
        if (null != this.outQueueFile) {
            this.outQueueFile.delete();
        }
        destroyInferFutureTask(this.futureTask);
    }

    @Override // com.alibaba.alink.common.dl.plugin.DLPredictorService
    public List<?> predict(List<?> list) {
        try {
            this.bytesDataExchange.write(this.listSerializer.serialize(list));
            byte[] bArr = null;
            while (null == bArr) {
                try {
                    if (this.futureTask.isDone()) {
                        this.futureTask.get();
                    }
                    bArr = this.bytesDataExchange.read(false);
                } catch (IOException e) {
                    throw new AkUnclassifiedErrorException("Failed to read from data exchange.", e);
                } catch (InterruptedException | ExecutionException e2) {
                    throw new AkUnclassifiedErrorException("Exception thrown in inference process.", e2);
                }
            }
            return this.listSerializer.deserialize(bArr);
        } catch (IOException e3) {
            throw new AkUnclassifiedErrorException("Failed to write to data exchange.", e3);
        }
    }

    @Override // com.alibaba.alink.common.dl.plugin.DLPredictorService
    public List<List<?>> predictRows(List<List<?>> list, int i) {
        throw new AkUnsupportedOperationException("Not supported batch inference yet.");
    }
}
