package com.alibaba.alink.common.pyrunner.fn;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.exceptions.AkIllegalOperationException;
import com.alibaba.alink.common.exceptions.AkPreconditions;
import com.alibaba.alink.common.exceptions.ExceptionWithErrorCode;
import com.alibaba.alink.common.linalg.VectorUtil;
import com.alibaba.alink.common.pyrunner.bridge.BasePythonBridge;
import com.alibaba.alink.common.type.AlinkTypes;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/alink/common/pyrunner/fn/PyTableFn.class */
public class PyTableFn extends TableFunction<Row> implements Serializable {
    protected final String name;
    protected final String fnSpecJson;
    protected final Map<String, String> runConfig;
    protected PyTableFnRunner runner;
    protected String[] resultTypeStrs;
    protected TypeInformation<?>[] resultTypes;

    /* loaded from: input_file:com/alibaba/alink/common/pyrunner/fn/PyTableFn$PyCollector.class */
    public static class PyCollector implements Collector<List<Object>> {
        private Consumer<Row> collectFn;
        private final TypeInformation<?>[] resultTypes;

        public PyCollector(Consumer<Row> consumer, TypeInformation<?>[] typeInformationArr) {
            this.collectFn = consumer;
            this.resultTypes = typeInformationArr;
        }

        public void setCollectFn(Consumer<Row> consumer) {
            this.collectFn = consumer;
        }

        public void collect(List<Object> list) {
            AkPreconditions.checkArgument(list.size() == this.resultTypes.length, (ExceptionWithErrorCode) new AkIllegalOperationException("Python UDTF returns wrong number of elements."));
            Object[] objArr = new Object[this.resultTypes.length];
            for (int i = 0; i < this.resultTypes.length; i++) {
                objArr[i] = DataConversionUtils.pyToJava(list.get(i), this.resultTypes[i]);
            }
            this.collectFn.accept(Row.of(objArr));
        }

        public void close() {
        }
    }

    public PyTableFn(String str, String str2, String[] strArr) {
        this(str, str2, strArr, Collections.emptyMap());
    }

    public PyTableFn(String str, String str2, String[] strArr, Map<String, String> map) {
        this.name = str;
        this.fnSpecJson = str2;
        this.resultTypeStrs = strArr;
        this.runConfig = map;
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        Tuple2<String, Map<String, String>> updateFnSpecRunConfigWithPlugin = PyFnUtils.updateFnSpecRunConfigWithPlugin(this.fnSpecJson, this.runConfig);
        String str = (String) updateFnSpecRunConfigWithPlugin.f0;
        Map map = (Map) updateFnSpecRunConfigWithPlugin.f1;
        this.runner = new PyTableFnRunner(new PyCollector((v1) -> {
            collect(v1);
        }, this.resultTypes), str, this.resultTypeStrs, (str2, str3) -> {
            return BasePythonBridge.PY_TURN_ON_LOGGING_KEY.equals(str2) ? String.valueOf(AlinkGlobalConfiguration.isPrintProcessInfo()) : (String) map.getOrDefault(str2, functionContext.getJobParameter(str2, str3));
        });
        this.runner.open();
    }

    public void close() throws Exception {
        this.runner.close();
        super.close();
    }

    public void eval(Object... objArr) {
        this.runner.calc(objArr);
    }

    public TypeInformation<Row> getResultType() {
        this.resultTypes = (TypeInformation[]) Arrays.stream(this.resultTypeStrs).map(AlinkTypes::getTypeInformation).toArray(i -> {
            return new TypeInformation[i];
        });
        return Types.ROW(this.resultTypes);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -298056162:
                if (implMethodName.equals("lambda$open$ff8acc7b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case VectorUtil.VectorSerialType.DENSE_VECTOR /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/alibaba/alink/common/utils/Functional$SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/alibaba/alink/common/pyrunner/fn/PyTableFn") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;Lorg/apache/flink/table/functions/FunctionContext;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;")) {
                    Map map = (Map) serializedLambda.getCapturedArg(0);
                    FunctionContext functionContext = (FunctionContext) serializedLambda.getCapturedArg(1);
                    return (str2, str3) -> {
                        return BasePythonBridge.PY_TURN_ON_LOGGING_KEY.equals(str2) ? String.valueOf(AlinkGlobalConfiguration.isPrintProcessInfo()) : (String) map.getOrDefault(str2, functionContext.getJobParameter(str2, str3));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
