package com.alibaba.alink.operator.common.io.csv;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.copy.csv.RowCsvInputFormat;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.source.BaseSourceBatchOp;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.batch.utils.DataSetUtil;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import com.alibaba.alink.operator.common.io.partition.CsvSourceCollectorCreator;
import com.alibaba.alink.operator.common.io.reader.HttpFileSplitReader;
import com.alibaba.alink.params.io.CsvSourceParams;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

@IoOpAnnotation(name = "internal_csv", ioType = IOType.SourceBatch)
@Internal
/* loaded from: input_file:com/alibaba/alink/operator/common/io/csv/InternalCsvSourceBatchOp.class */
public final class InternalCsvSourceBatchOp extends BaseSourceBatchOp<InternalCsvSourceBatchOp> implements CsvSourceParams<InternalCsvSourceBatchOp> {
    private static final long serialVersionUID = -3105428980027751387L;

    public InternalCsvSourceBatchOp() {
        this(new Params());
    }

    public InternalCsvSourceBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(InternalCsvSourceBatchOp.class), params);
    }

    public InternalCsvSourceBatchOp(String str, String str2) {
        this(new Params().set((ParamInfo<ParamInfo<String>>) FILE_PATH, (ParamInfo<String>) new FilePath(str).serialize()).set((ParamInfo<ParamInfo<String>>) SCHEMA_STR, (ParamInfo<String>) str2));
    }

    public InternalCsvSourceBatchOp(String str, TableSchema tableSchema) {
        this(new Params().set((ParamInfo<ParamInfo<String>>) FILE_PATH, (ParamInfo<String>) new FilePath(str).serialize()).set((ParamInfo<ParamInfo<String>>) SCHEMA_STR, (ParamInfo<String>) TableUtil.schema2SchemaStr(tableSchema)));
    }

    public InternalCsvSourceBatchOp(String str, String[] strArr, TypeInformation<?>[] typeInformationArr, String str2, String str3) {
        this(new Params().set((ParamInfo<ParamInfo<String>>) FILE_PATH, (ParamInfo<String>) new FilePath(str).serialize()).set((ParamInfo<ParamInfo<String>>) SCHEMA_STR, (ParamInfo<String>) TableUtil.schema2SchemaStr(new TableSchema(strArr, typeInformationArr))).set((ParamInfo<ParamInfo<String>>) FIELD_DELIMITER, (ParamInfo<String>) str2).set((ParamInfo<ParamInfo<String>>) ROW_DELIMITER, (ParamInfo<String>) str3));
    }

    @Override // com.alibaba.alink.operator.batch.source.BaseSourceBatchOp
    public Table initializeDataSource() {
        Operator operator;
        String pathStr = getFilePath().getPathStr();
        String schemaStr = getSchemaStr();
        String fieldDelimiter = getFieldDelimiter();
        String rowDelimiter = getRowDelimiter();
        Character quoteChar = getQuoteChar();
        boolean booleanValue = getSkipBlankLine().booleanValue();
        boolean booleanValue2 = getLenient().booleanValue();
        String[] colNames = TableUtil.getColNames(schemaStr);
        TypeInformation<?>[] colTypes = TableUtil.getColTypes(schemaStr);
        boolean booleanValue3 = getIgnoreFirstLine().booleanValue();
        String str = "";
        try {
            str = new URL(pathStr).getProtocol();
        } catch (MalformedURLException e) {
        }
        ExecutionEnvironment executionEnvironment = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
        TableSchema tableSchema = new TableSchema(new String[]{"f1"}, new TypeInformation[]{Types.STRING});
        if (getPartitions() != null) {
            try {
                operator = (DataSet) DataSetUtil.readFromPartitionBatch(getParams(), getMLEnvironmentId(), new CsvSourceCollectorCreator(tableSchema, rowDelimiter, booleanValue3, quoteChar)).f0;
            } catch (IOException e2) {
                throw new AkUnclassifiedErrorException(String.format("Fail to list directories in %s and select partitions", getFilePath().getPathStr()));
            }
        } else if (str.equalsIgnoreCase("http") || str.equalsIgnoreCase("https")) {
            operator = executionEnvironment.createInput(new GenericCsvInputFormat(new HttpFileSplitReader(pathStr), tableSchema.getFieldTypes(), rowDelimiter, rowDelimiter, booleanValue3), new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames())).name("http_csv_source");
        } else {
            RowCsvInputFormat rowCsvInputFormat = new RowCsvInputFormat(new Path(pathStr), tableSchema.getFieldTypes(), rowDelimiter, rowDelimiter, new int[]{0}, true, getFilePath().getFileSystem());
            rowCsvInputFormat.setSkipFirstLineAsHeader(booleanValue3);
            operator = executionEnvironment.createInput(rowCsvInputFormat).name("csv_source");
        }
        return DataSetConversionUtil.toTable(getMLEnvironmentId(), (DataSet<Row>) operator.flatMap(new CsvUtil.ParseCsvFunc(colTypes, fieldDelimiter, quoteChar, booleanValue, booleanValue2)), colNames, colTypes);
    }
}
