package com.alibaba.alink.common.dl.utils;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.dl.DLConstants;
import com.alibaba.alink.common.dl.DLRunner;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.utils.JsonConverter;
import com.alibaba.flink.ml.cluster.ExecutionMode;
import com.alibaba.flink.ml.cluster.MLConfig;
import com.alibaba.flink.ml.cluster.node.MLContext;
import com.alibaba.flink.ml.cluster.role.BaseRole;
import com.alibaba.flink.ml.cluster.rpc.NodeServer;
import com.alibaba.flink.ml.data.DataExchange;
import com.alibaba.flink.ml.util.IpHostUtil;
import com.alibaba.flink.ml.util.MLException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/dl/utils/DLClusterUtils.class */
public class DLClusterUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DLClusterUtils.class);

    /** Role name of parameter-server nodes; {@link #stopCluster} returns early for them. */
    private static final String PS_ROLE_NAME = "ps";

    /** Error message raised when the context reports one or more failed nodes. */
    private static final String SCRIPT_FAILED_MSG = "Python script run failed, please check TaskManager logs.";

    /**
     * Creates the {@link MLContext} for the task with global index {@code i}.
     *
     * <p>The task index is mapped to a (role, index-within-role) pair via {@link DLRunner#getRoleAndIndex}.
     * As a side effect, a fresh temporary directory with prefix {@code temp_<i>_} is created and its path is
     * stored in {@code mLConfig}'s properties under {@code current_work_dir}.
     *
     * @param i             global task index
     * @param mLConfig      cluster configuration; its properties must hold integer values for
     *                      {@link DLConstants#NUM_WORKERS} and {@link DLConstants#NUM_PSS}
     * @param executionMode execution mode passed through to the new context
     * @return the newly created context
     * @throws NumberFormatException        if the worker or PS count property is missing or not an integer
     * @throws AkUnclassifiedErrorException if the temporary work dir or the context cannot be created
     */
    public static MLContext makeMLContext(int i, MLConfig mLConfig, ExecutionMode executionMode) {
        int numWorkers = Integer.parseInt((String) mLConfig.getProperties().get(DLConstants.NUM_WORKERS));
        // The PS count is not needed here; parsing it only fails fast on a missing or non-numeric value.
        Integer.parseInt((String) mLConfig.getProperties().get(DLConstants.NUM_PSS));
        Tuple2<BaseRole, Integer> roleAndIndex = DLRunner.getRoleAndIndex(i, numWorkers);
        int roleIndex = roleAndIndex.f1;

        // Each failure is reported by its own try block. Previously the MLContext failure was
        // re-wrapped by the outer catch and misreported as a temp-dir failure.
        String workDir;
        try {
            workDir = PythonFileUtils.createTempDir(String.format("temp_%d_", i)).toString();
        } catch (Exception e) {
            throw new AkUnclassifiedErrorException("Failed to create temporary work dir: ", e);
        }
        mLConfig.getProperties().put("current_work_dir", workDir);

        try {
            // The explicit (raw) null cast keeps constructor overload resolution identical to before.
            return new MLContext(executionMode, mLConfig, roleAndIndex.f0.name(), roleIndex,
                mLConfig.getEnvPath(), (Map) null);
        } catch (MLException e) {
            throw new AkUnclassifiedErrorException("Failed to create MLContext: ", e);
        }
    }

    /**
     * Publishes the IP address and port of every task into {@code mLContext}'s properties.
     *
     * <p>Each entry of {@code list} is read as {@code (taskIndex, ip, port)}. The values are placed into arrays
     * indexed by task index and stored as JSON under {@link DLRunner#IPS} and {@link DLRunner#PORTS}. For the
     * entry whose index equals {@code i}, the recorded IP must match the local IP address.
     *
     * <p>NOTE(review): this assumes the task indices are exactly {@code 0..list.size()-1}. An out-of-range index
     * raises {@link ArrayIndexOutOfBoundsException}, and a missing index leaves a {@code null} IP / {@code 0} port.
     *
     * @param i         index of the current task
     * @param mLContext context whose properties are updated
     * @param list      {@code (taskIndex, ip, port)} triples for all tasks
     * @throws Exception if the local IP cannot be determined, the allocation check fails, or a property
     *                   cannot be stored
     */
    public static void setMLContextIpPorts(int i, MLContext mLContext, List<Tuple3<Integer, String, Integer>> list)
        throws Exception {
        int numTasks = list.size();
        String[] ips = new String[numTasks];
        int[] ports = new int[numTasks];
        for (Tuple3<Integer, String, Integer> entry : list) {
            int taskIndex = entry.f0;
            ips[taskIndex] = entry.f1;
            if (taskIndex == i) {
                // The current task must still be running on the host recorded for it.
                AkPreconditions.checkState(ips[taskIndex].equals(IpHostUtil.getIpAddress()), "task allocation changed");
            }
            ports[taskIndex] = entry.f2;
        }
        DLUtils.safePutProperties(mLContext, DLRunner.IPS, JsonConverter.toJson(ips));
        DLUtils.safePutProperties(mLContext, DLRunner.PORTS, JsonConverter.toJson(ports));
    }

    /**
     * Starts the node server for {@code mLContext} on a background daemon thread.
     *
     * <p>A {@link NodeServer} for the context's role is wrapped in a {@link FutureTask} and run on a daemon
     * thread named {@code NodeServer_<identity>}.
     *
     * @param mLContext context of the current node
     * @return the data exchange created for the context, the future tracking the node server, and the thread
     *         running it
     */
    public static Tuple3<DataExchange<Row, Row>, FutureTask<Void>, Thread> startDLCluster(MLContext mLContext) {
        DataExchange<Row, Row> dataExchange = new DataExchange<>(mLContext);
        FutureTask<Void> futureTask = new FutureTask<>(new NodeServer(mLContext, mLContext.getRoleName()), null);
        Thread thread = new Thread(futureTask);
        thread.setDaemon(true);
        thread.setName("NodeServer_" + mLContext.getIdentity());
        thread.start();
        LOG.info("start: {}, index: {}", mLContext.getRoleName(), mLContext.getIndex());
        if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
            System.out.println("start:" + mLContext.getRoleName() + " index:" + mLContext.getIndex());
        }
        return Tuple3.of(dataExchange, futureTask, thread);
    }

    /**
     * Shuts down the node of {@code mLContext}.
     *
     * <p>Steps, in order:
     * <ul>
     *   <li>Returns immediately if {@code mLContext} is {@code null}.</li>
     *   <li>Marks the output queue finished.</li>
     *   <li>Returns at once for parameter-server nodes.</li>
     *   <li>Otherwise, invokes {@code consumer} with {@code null} and waits for {@code futureTask}.</li>
     *   <li>Closes the context.</li>
     * </ul>
     *
     * @param mLContext  context to stop; may be {@code null}
     * @param futureTask future tracking the node server (as returned by {@link #startDLCluster}); may be
     *                   {@code null}
     * @param consumer   callback invoked with {@code null} before waiting; ignored when {@code null}
     * @throws AkUnclassifiedErrorException if the node server failed, or if the context reports failed
     *                                      nodes (in which case any other pending failure becomes the cause)
     */
    public static void stopCluster(MLContext mLContext, FutureTask<Void> futureTask, Consumer<?> consumer) {
        if (null == mLContext) {
            return;
        }
        if (mLContext.getOutputQueue() != null) {
            mLContext.getOutputQueue().markFinished();
        }
        // Compare by value: `==` on strings only matched when both sides happened to be interned.
        if (PS_ROLE_NAME.equals(mLContext.getRoleName())) {
            // NOTE(review): as before, the PS path returns without closing mLContext; confirm this is intended.
            LOG.info("PS job return");
            return;
        }

        boolean interrupted = false;
        Throwable pending = null;
        try {
            if (consumer != null) {
                consumer.accept(null);
            }
            if (futureTask != null && !futureTask.isCancelled()) {
                futureTask.get();
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted waiting for server join {}.", e.getMessage());
            // Only futureTask.get() throws InterruptedException above, so futureTask is non-null here.
            futureTask.cancel(true);
            interrupted = true;
        } catch (ExecutionException e) {
            LOG.error("{} node server failed", mLContext.getIdentity());
            pending = new AkUnclassifiedErrorException(mLContext.getIdentity() + " node server failed", e);
        } catch (RuntimeException | Error e) {
            pending = e;
        }

        try {
            closeAndCheckFailures(mLContext, pending);
        } finally {
            if (interrupted) {
                // Restore the interrupt status for callers; done after close() so cleanup runs as before.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes {@code mLContext} exactly once, then reports failures.
     *
     * <p>If the context recorded failed nodes (as read before closing), the method throws
     * {@link AkUnclassifiedErrorException}, chaining {@code pending} as the cause when present. Otherwise it
     * rethrows {@code pending} if non-null. An {@link IOException} from {@code close()} is only logged.
     *
     * @param mLContext context to close
     * @param pending   a {@link RuntimeException} or {@link Error} raised while stopping, or {@code null}
     */
    private static void closeAndCheckFailures(MLContext mLContext, Throwable pending) {
        int failNum = mLContext.getFailNum();
        try {
            mLContext.close();
        } catch (IOException e) {
            LOG.error("Fail to close mlContext.", e);
        }
        if (failNum > 0) {
            throw pending == null
                ? new AkUnclassifiedErrorException(SCRIPT_FAILED_MSG)
                : new AkUnclassifiedErrorException(SCRIPT_FAILED_MSG, pending);
        }
        if (pending instanceof RuntimeException) {
            throw (RuntimeException) pending;
        }
        if (pending instanceof Error) {
            throw (Error) pending;
        }
    }
}
