package com.alibaba.alink.common.dl.plugin;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.dl.utils.FileDownloadUtils;
import com.alibaba.alink.common.dl.utils.PythonFileUtils;
import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
import com.alibaba.alink.common.exceptions.AkPluginErrorException;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.ExceptionWithErrorCode;
import com.alibaba.alink.common.io.plugin.ClassLoaderFactory;
import com.alibaba.alink.common.io.plugin.PluginDistributeCache;
import com.alibaba.alink.common.io.plugin.TemporaryClassLoaderContext;
import com.alibaba.alink.common.mapper.Mapper;
import com.alibaba.alink.common.utils.CloseableThreadLocal;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.optim.subfunc.OptimVariable;
import com.alibaba.alink.params.dl.HasModelPath;
import com.alibaba.alink.params.shared.colname.HasReservedColsDefaultAsNull;
import com.alibaba.alink.params.shared.colname.HasSelectedColsDefaultAsNull;
import com.alibaba.alink.params.tensorflow.savedmodel.HasOutputSchemaStr;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.File;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.codec.binary.Base64;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.util.FileUtils;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/dl/plugin/DLPredictServiceMapper.class */
public abstract class DLPredictServiceMapper<FACTORY extends ClassLoaderFactory> extends Mapper implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DLPredictServiceMapper.class);
    protected final boolean isThreadSafe;
    protected final FACTORY factory;
    protected final String[] outputCols;
    protected final Class<?>[] outputColTypeClasses;
    protected String[] inputCols;
    protected String modelPath;
    protected String localModelPath;
    protected File workDir;
    protected DLPredictorService predictor;
    protected transient CloseableThreadLocal<DLPredictorService> threadLocalPredictor;
    private final LongAdder counter;

    /* loaded from: input_file:com/alibaba/alink/common/dl/plugin/DLPredictServiceMapper$PredictorConfig.class */
    public static class PredictorConfig {
        public ClassLoaderFactory factory;
        public String modelPath;
        public Class<?>[] outputTypeClasses;
        public String[] inputNames;
        public String[] outputNames;
        public Integer intraOpNumThreads;
        public Integer interOpNumThreads;
        public Integer cudaDeviceNum;
        public boolean threadMode = true;
        public String libraryPath;

        static Kryo newKryoInstance() {
            Kryo kryo = new Kryo();
            kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
            kryo.register(PluginDistributeCache.class, new Serializer<PluginDistributeCache>() { // from class: com.alibaba.alink.common.dl.plugin.DLPredictServiceMapper.PredictorConfig.1
                public void write(Kryo kryo2, Output output, PluginDistributeCache pluginDistributeCache) {
                    kryo2.writeClassAndObject(output, new HashMap(pluginDistributeCache.context()));
                }

                public PluginDistributeCache read(Kryo kryo2, Input input, Class<PluginDistributeCache> cls) {
                    return new PluginDistributeCache((Map) kryo2.readClassAndObject(input));
                }

                /* renamed from: read, reason: collision with other method in class */
                public /* bridge */ /* synthetic */ Object m49read(Kryo kryo2, Input input, Class cls) {
                    return read(kryo2, input, (Class<PluginDistributeCache>) cls);
                }
            });
            return kryo;
        }

        public String serialize() {
            Kryo newKryoInstance = newKryoInstance();
            Output output = new Output(1, Integer.MAX_VALUE);
            newKryoInstance.writeClassAndObject(output, this);
            return Base64.encodeBase64String(output.toBytes());
        }

        public static synchronized PredictorConfig deserialize(String str) {
            return (PredictorConfig) newKryoInstance().readClassAndObject(new Input(Base64.decodeBase64(str)));
        }
    }

    public DLPredictServiceMapper(TableSchema tableSchema, Params params, FACTORY factory, boolean z) {
        super(tableSchema, params);
        this.counter = new LongAdder();
        this.factory = factory;
        this.isThreadSafe = z;
        this.inputCols = (String[]) params.get(HasSelectedColsDefaultAsNull.SELECTED_COLS);
        if (null == this.inputCols) {
            this.inputCols = tableSchema.getFieldNames();
        }
        AkPreconditions.checkArgument(params.contains(HasOutputSchemaStr.OUTPUT_SCHEMA_STR), (ExceptionWithErrorCode) new AkIllegalOperatorParameterException("Must set outputSchemaStr."));
        TableSchema schemaStr2Schema = TableUtil.schemaStr2Schema((String) params.get(HasOutputSchemaStr.OUTPUT_SCHEMA_STR));
        this.outputCols = schemaStr2Schema.getFieldNames();
        this.outputColTypeClasses = (Class[]) Arrays.stream(schemaStr2Schema.getFieldTypes()).map((v0) -> {
            return v0.getTypeClass();
        }).toArray(i -> {
            return new Class[i];
        });
        if (params.contains(HasModelPath.MODEL_PATH)) {
            this.modelPath = (String) params.get(HasModelPath.MODEL_PATH);
        }
    }

    public DLPredictServiceMapper<FACTORY> setModelPath(String str) {
        this.modelPath = str;
        return this;
    }

    protected abstract PredictorConfig getPredictorConfig();

    protected DLPredictorService createPredictor() {
        ClassLoader create = this.factory.create();
        try {
            DLPredictorService dLPredictorService = (DLPredictorService) this.factory.getClass().getMethod("create", this.factory.getClass()).invoke(null, this.factory);
            TemporaryClassLoaderContext of = TemporaryClassLoaderContext.of(create);
            Throwable th = null;
            try {
                try {
                    dLPredictorService.open(getPredictorConfig());
                    if (of != null) {
                        if (0 != 0) {
                            try {
                                of.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            of.close();
                        }
                    }
                    return dLPredictorService;
                } finally {
                }
            } catch (Throwable th3) {
                if (of != null) {
                    if (th != null) {
                        try {
                            of.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        of.close();
                    }
                }
                throw th3;
            }
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new AkPluginErrorException(String.format("Failed to call %s#create(factory).", this.factory.getClass().getCanonicalName()), e);
        }
    }

    protected void destroyPredictor(DLPredictorService dLPredictorService) {
        try {
            dLPredictorService.close();
        } catch (Exception e) {
            throw new AkUnclassifiedErrorException("Failed to close predictor", e);
        }
    }

    @Override // com.alibaba.alink.common.mapper.Mapper
    public void open() {
        AkPreconditions.checkArgument(this.modelPath != null, "Model path is not set.");
        this.workDir = PythonFileUtils.createTempDir("temp_d_").toFile();
        File file = new File(this.workDir, OptimVariable.model);
        FileDownloadUtils.downloadFile(this.modelPath, file);
        this.localModelPath = file.getAbsolutePath();
        if (this.isThreadSafe) {
            this.predictor = createPredictor();
        } else {
            this.threadLocalPredictor = new CloseableThreadLocal<>(this::createPredictor, this::destroyPredictor);
        }
    }

    @Override // com.alibaba.alink.common.mapper.Mapper
    public void close() {
        if (this.isThreadSafe) {
            destroyPredictor(this.predictor);
        } else {
            this.threadLocalPredictor.close();
        }
        FileUtils.deleteDirectoryQuietly(this.workDir);
    }

    @Override // com.alibaba.alink.common.mapper.Mapper
    protected Tuple4<String[], String[], TypeInformation<?>[], String[]> prepareIoSchema(TableSchema tableSchema, Params params) {
        String[] strArr = (String[]) params.get(HasSelectedColsDefaultAsNull.SELECTED_COLS);
        if (null == strArr) {
            strArr = tableSchema.getFieldNames();
        }
        TableSchema schemaStr2Schema = TableUtil.schemaStr2Schema((String) params.get(HasOutputSchemaStr.OUTPUT_SCHEMA_STR));
        return Tuple4.of(strArr, schemaStr2Schema.getFieldNames(), schemaStr2Schema.getFieldTypes(), (String[]) params.get(HasReservedColsDefaultAsNull.RESERVED_COLS));
    }

    @Override // com.alibaba.alink.common.mapper.Mapper
    protected void map(Mapper.SlicedSelectedSample slicedSelectedSample, Mapper.SlicedResult slicedResult) throws Exception {
        DLPredictorService dLPredictorService = this.isThreadSafe ? this.predictor : this.threadLocalPredictor.get();
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < slicedSelectedSample.length(); i++) {
            arrayList.add(slicedSelectedSample.get(i));
        }
        List<?> predict = dLPredictorService.predict(arrayList);
        for (int i2 = 0; i2 < slicedResult.length(); i2++) {
            slicedResult.set(i2, predict.get(i2));
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        if (this.counter.sum() < 100) {
            String format = String.format("Time elapsed for %s inference: %d ms", dLPredictorService.getClass().getSimpleName(), Long.valueOf(currentTimeMillis2 - currentTimeMillis));
            LOG.info(format);
            if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
                System.out.println(format);
            }
        }
        this.counter.increment();
    }
}
