package com.alibaba.alink.common.io.directreader;

import com.alibaba.alink.common.exceptions.AkIllegalArgumentException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.operator.batch.BatchOperator;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/alibaba/alink/common/io/directreader/DirectReader.class */
/**
 * Static entry points for turning a {@link BatchOperator} into a {@link DataBridge} and reading
 * rows back from it.
 *
 * <p>The {@link DataBridgeGenerator} to use is selected by the {@code policy} setting. Settings are
 * merged from several sources, later sources overriding earlier ones:
 * <ol>
 *   <li>the built-in default policy (the one declared on {@link MemoryDataBridgeGenerator});</li>
 *   <li>{@code direct_reader.properties}, taken from the working directory if the file exists
 *       there, otherwise from the classpath;</li>
 *   <li>JVM system properties;</li>
 *   <li>{@link DirectReaderPropertiesStore#getProperties()}.</li>
 * </ol>
 * In sources 2-4 only keys of the form {@code direct.reader.<name>} are considered.
 */
public class DirectReader implements Serializable {
    private static final String DIRECT_READER_PREFIX = "direct.reader";
    private static final String DIRECT_READER_CONFIG_FILE_PATH = "direct_reader.properties";
    private static final ParamInfo<String> POLICY_KEY = ParamInfoFactory
        .createParamInfo("policy", String.class)
        .setDescription("policy of direct reader")
        .setRequired()
        .build();
    private static final long serialVersionUID = 8810531309886342278L;

    /**
     * Builds a {@link DataBridge} for the given operator using the generator whose
     * {@link DataBridgeGeneratorPolicy#policy()} equals the configured policy.
     *
     * <p>Generators are discovered with {@link ServiceLoader} through this class's class loader.
     * Implementations that do not carry a {@link DataBridgeGeneratorPolicy} annotation are skipped.
     *
     * @param batchOperator the operator handed to the selected generator
     * @return the bridge produced by the first matching generator
     * @throws AkIllegalArgumentException if no discovered generator declares the configured policy
     */
    public static DataBridge collect(BatchOperator<?> batchOperator) {
        Params params = readProperties();
        String policy = params.get(POLICY_KEY);
        ServiceLoader<DataBridgeGenerator> generators =
            ServiceLoader.load(DataBridgeGenerator.class, DirectReader.class.getClassLoader());
        for (DataBridgeGenerator generator : generators) {
            DataBridgeGeneratorPolicy annotation =
                generator.getClass().getAnnotation(DataBridgeGeneratorPolicy.class);
            // An un-annotated implementation cannot match any policy; skip it instead of failing
            // with a NullPointerException and hiding correctly annotated generators behind it.
            if (annotation != null && policy.equals(annotation.policy())) {
                return generator.generate(batchOperator, params);
            }
        }
        throw new AkIllegalArgumentException("Can not find the policy: " + policy);
    }

    /**
     * Collects the operator into a {@link DataBridge} and reads it without a filter.
     *
     * @param batchOperator the operator to collect
     * @return the rows returned by the bridge
     */
    public static List<Row> directRead(BatchOperator<?> batchOperator) {
        return directRead(collect(batchOperator));
    }

    /**
     * Reads from the bridge, passing {@code null} as the filter.
     *
     * @param dataBridge the bridge to read from
     * @return the rows returned by {@link DataBridge#read}
     */
    public static List<Row> directRead(DataBridge dataBridge) {
        return dataBridge.read(null);
    }

    /**
     * Reads from the bridge, passing the given filter through to {@link DataBridge#read}.
     *
     * @param dataBridge the bridge to read from
     * @param filterFunction the filter handed to the bridge
     * @return the rows returned by the bridge
     */
    public static List<Row> directRead(DataBridge dataBridge, FilterFunction<Row> filterFunction) {
        return dataBridge.read(filterFunction);
    }

    /**
     * Returns the part of {@code key} after {@code prefix + "."}, or {@code null} when the key is
     * not of that form or nothing follows the dot.
     *
     * <p>Checking the separator explicitly avoids a {@link StringIndexOutOfBoundsException} for a
     * key that equals the prefix, and avoids accepting keys that merely share leading characters
     * with the prefix (e.g. {@code direct.readerfoo}).
     */
    private static String stripPrefix(String key, String prefix) {
        String dotted = prefix + ".";
        if (key.length() > dotted.length() && key.startsWith(dotted)) {
            return key.substring(dotted.length());
        }
        return null;
    }

    /**
     * Copies every {@code direct.reader.<name>} string entry into a new {@link Properties} under
     * the key {@code <name>}; all other entries are ignored.
     */
    private static Properties filterProperties(Properties properties) {
        Properties filtered = new Properties();
        for (String key : properties.stringPropertyNames()) {
            String name = stripPrefix(key, DIRECT_READER_PREFIX);
            if (name != null) {
                filtered.put(name, properties.getProperty(key));
            }
        }
        return filtered;
    }

    /**
     * Converts already-filtered properties into {@link Params}: the policy itself, plus every
     * {@code <policy>.<name>} entry stored under the dynamic key {@code <name>}.
     *
     * @throws AkIllegalArgumentException if the properties contain no policy entry
     */
    private static Params properties2Params(Properties properties) {
        if (!properties.containsKey(POLICY_KEY.getName())) {
            throw new AkIllegalArgumentException("Error properties. it has not policy key");
        }
        String policy = properties.getProperty(POLICY_KEY.getName());
        Params params = new Params().set(POLICY_KEY, policy);
        for (String key : properties.stringPropertyNames()) {
            String name = stripPrefix(key, policy);
            if (name != null) {
                params.set(AnnotationUtils.dynamicParamKey(name), properties.getProperty(key));
            }
        }
        return params;
    }

    /**
     * Opens {@code direct_reader.properties}: the file in the working directory when it exists,
     * otherwise the classpath resource of the same name.
     *
     * @return an open stream the caller must close, or {@code null} if nothing could be opened
     */
    private static InputStream openConfigStream() {
        if (Files.exists(Paths.get(DIRECT_READER_CONFIG_FILE_PATH))) {
            try {
                return new FileInputStream(DIRECT_READER_CONFIG_FILE_PATH);
            } catch (FileNotFoundException ignored) {
                // The file vanished or is unreadable; configuration is optional, so carry on
                // without it (no classpath fallback in this case, as before).
                return null;
            }
        }
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // The context class loader may legitimately be null; use our own loader instead.
            loader = DirectReader.class.getClassLoader();
        }
        return loader == null ? null : loader.getResourceAsStream(DIRECT_READER_CONFIG_FILE_PATH);
    }

    /**
     * Merges all configuration sources (see the class documentation for precedence) and converts
     * the result to {@link Params}.
     */
    private static Params readProperties() {
        Properties fileProperties = new Properties();
        // try-with-resources guarantees the stream is closed; a null resource is permitted.
        try (InputStream in = openConfigStream()) {
            if (in != null) {
                fileProperties.load(in);
            }
        } catch (IOException ignored) {
            // Best effort: an unreadable config file must not prevent reading with the defaults
            // and the remaining sources. Entries loaded before the failure are kept.
        }

        Properties merged = new Properties();
        merged.put(
            POLICY_KEY.getName(),
            MemoryDataBridgeGenerator.class.getAnnotation(DataBridgeGeneratorPolicy.class).policy());
        merged.putAll(filterProperties(fileProperties));
        merged.putAll(filterProperties(System.getProperties()));
        merged.putAll(filterProperties(DirectReaderPropertiesStore.getProperties()));
        return properties2Params(merged);
    }
}
