package com.alibaba.alink.operator.common.io.csv;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.Internal;
import com.alibaba.alink.common.exceptions.AkIllegalDataException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.batch.source.BaseSourceBatchOp;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.batch.utils.DataSetUtil;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import com.alibaba.alink.operator.common.io.partition.CsvSourceCollectorCreator;
import com.alibaba.alink.operator.common.io.reader.FSFileSplitReader;
import com.alibaba.alink.operator.common.io.reader.FileSplitReader;
import com.alibaba.alink.operator.common.io.reader.HttpFileSplitReader;
import com.alibaba.alink.params.io.CsvSourceParams;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * Internal batch source that reads delimited text from a file path (or from selected partitions)
 * and parses every line into a typed {@link Row} according to the configured schema string.
 *
 * <p>Three reading strategies are used by {@link #initializeDataSource()}:
 * <ul>
 *   <li>partitions configured: lines are read through {@code DataSetUtil.readFromPartitionBatch};</li>
 *   <li>no partitions and no quote character: the reader's input format is used directly;</li>
 *   <li>no partitions and a quote character: a first source emits one summary row per split, the
 *       summaries are broadcast, and each split is then re-read with the quote state derived from
 *       the summaries of all preceding splits.</li>
 * </ul>
 * In every case the resulting lines are passed through {@code CsvUtil.ParseCsvFunc}.
 */
@IoOpAnnotation(name = "internal_csv_beta", ioType = IOType.SourceBatch)
@Internal
public class InternalCsvSourceBetaBatchOp extends BaseSourceBatchOp<InternalCsvSourceBetaBatchOp>
        implements CsvSourceParams<InternalCsvSourceBetaBatchOp> {

    /** Name under which the per-split summary rows are broadcast to the split-reading function. */
    private static final String SPLITS_BROADCAST_NAME = "splits";

    /** Creates the operator with an empty parameter set. */
    public InternalCsvSourceBetaBatchOp() {
        this(new Params());
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params operator parameters
     */
    public InternalCsvSourceBetaBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(InternalCsvSourceBetaBatchOp.class), params);
    }

    /**
     * Builds the table backing this source.
     *
     * @return a table whose column names and types come from the schema string
     * @throws AkIllegalDataException if reading the partitions fails with an {@link IOException}
     */
    @Override
    public Table initializeDataSource() {
        final String pathStr = getFilePath().getPathStr();
        final String schemaStr = getSchemaStr();
        final String fieldDelimiter = getFieldDelimiter();
        final String rowDelimiter = getRowDelimiter();
        final Character quoteChar = getQuoteChar();
        final boolean skipBlankLine = getSkipBlankLine().booleanValue();
        final boolean lenient = getLenient().booleanValue();
        final String[] colNames = TableUtil.getColNames(schemaStr);
        final TypeInformation<?>[] colTypes = TableUtil.getColTypes(schemaStr);
        final boolean ignoreFirstLine = getIgnoreFirstLine().booleanValue();
        final String protocol = protocolOf(pathStr);

        final ExecutionEnvironment env =
                MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();

        final DataSet<Row> lines;
        if (getPartitions() != null) {
            lines = readPartitionLines(pathStr, rowDelimiter, ignoreFirstLine, quoteChar);
        } else {
            final FileSplitReader reader;
            if (protocol.equalsIgnoreCase("http") || protocol.equalsIgnoreCase("https")) {
                reader = new HttpFileSplitReader(pathStr);
            } else {
                reader = new FSFileSplitReader(getFilePath());
            }

            if (quoteChar == null) {
                final TableSchema lineSchema = singleStringColumnSchema();
                lines = env
                        .createInput(
                                reader.getInputFormat(rowDelimiter, ignoreFirstLine, quoteChar),
                                new RowTypeInfo(lineSchema.getFieldTypes(), lineSchema.getFieldNames()))
                        .name("csv_source");
            } else {
                final TableSchema summarySchema = new TableSchema(
                        new String[] {"_QUOTE_NUM_", "_SPLIT_NUMBER_", "_SPLIT_INFO_"},
                        new TypeInformation<?>[] {Types.LONG, Types.LONG, Types.STRING});
                final DataSet<Row> splitSummaries = env
                        .createInput(
                                reader.convertFileSplitToInputFormat(rowDelimiter, ignoreFirstLine, quoteChar),
                                new RowTypeInfo(summarySchema.getFieldTypes(), summarySchema.getFieldNames()))
                        .name("csv_split_summary_source");
                // The summaries are both the input of the function and its broadcast set: every
                // task needs all of them to derive the quote state at the start of each split.
                lines = splitSummaries
                        .flatMap(new QuoteAwareSplitReadFunction(reader, rowDelimiter, ignoreFirstLine, quoteChar))
                        .withBroadcastSet(splitSummaries, SPLITS_BROADCAST_NAME)
                        .name("csv_flat_map");
            }
        }

        final DataSet<Row> parsed = lines.flatMap(
                new CsvUtil.ParseCsvFunc(colTypes, fieldDelimiter, quoteChar, skipBlankLine, lenient));
        return DataSetConversionUtil.toTable(getMLEnvironmentId(), parsed, colNames, colTypes);
    }

    /**
     * Returns the URL protocol of {@code path}, or an empty string when it is not a valid URL.
     */
    private static String protocolOf(String path) {
        try {
            return new URL(path).getProtocol();
        } catch (MalformedURLException ignored) {
            // Best effort: a path that is not a well-formed URL is simply not treated as http(s).
            return "";
        }
    }

    /** Schema with the single string column {@code f1} used for unparsed lines. */
    private static TableSchema singleStringColumnSchema() {
        return new TableSchema(new String[] {"f1"}, new TypeInformation<?>[] {Types.STRING});
    }

    /** Reads unparsed lines from the configured partitions. */
    @SuppressWarnings("unchecked")
    private DataSet<Row> readPartitionLines(
            String pathStr, String rowDelimiter, boolean ignoreFirstLine, Character quoteChar) {
        try {
            return (DataSet<Row>) DataSetUtil.readFromPartitionBatch(
                    getParams(),
                    getMLEnvironmentId(),
                    new CsvSourceCollectorCreator(
                            singleStringColumnSchema(), rowDelimiter, ignoreFirstLine, quoteChar)).f0;
        } catch (IOException e) {
            // Keep the I/O failure as the cause so the root problem is not lost.
            throw new AkIllegalDataException(
                    String.format("Fail to list directories in %s and select partitions", pathStr), e);
        }
    }

    /**
     * Re-reads one split per incoming summary row and emits its records.
     *
     * <p>Each summary row is read as: field 0 a {@code Long} count, field 1 the {@code Long} split
     * number, field 2 the split serialized as a string. A split is opened with
     * {@code setFieldInQuote(true)} when the sum of field 0 over all splits with a smaller split
     * number is odd. Split numbers are used directly as array indices, so they are expected to
     * lie in {@code [0, number of summaries)}.
     */
    private static final class QuoteAwareSplitReadFunction extends RichFlatMapFunction<Row, Row> {
        private static final long serialVersionUID = 1L;

        private final FileSplitReader reader;
        private final String rowDelimiter;
        private final boolean ignoreFirstLine;
        private final Character quoteChar;

        /** Indexed by split number; populated in {@link #open(Configuration)}. */
        private transient boolean[] fieldInQuote;

        QuoteAwareSplitReadFunction(
                FileSplitReader reader, String rowDelimiter, boolean ignoreFirstLine, Character quoteChar) {
            this.reader = reader;
            this.rowDelimiter = rowDelimiter;
            this.ignoreFirstLine = ignoreFirstLine;
            this.quoteChar = quoteChar;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            List<Row> summaries = getRuntimeContext().getBroadcastVariable(SPLITS_BROADCAST_NAME);
            long[] countsBySplit = new long[summaries.size()];
            for (Row summary : summaries) {
                int splitNumber = ((Long) summary.getField(1)).intValue();
                countsBySplit[splitNumber] = ((Long) summary.getField(0)).longValue();
            }
            this.fieldInQuote = initialQuoteStates(countsBySplit);
        }

        /**
         * Computes, for every split, whether the running total of the counts of all preceding
         * splits is odd. Entry 0 (when present) is always {@code false}; an empty input yields an
         * empty array instead of failing.
         */
        private static boolean[] initialQuoteStates(long[] countsBySplit) {
            boolean[] states = new boolean[countsBySplit.length];
            long precedingTotal = 0L;
            for (int i = 1; i < countsBySplit.length; i++) {
                precedingTotal += countsBySplit[i - 1];
                states[i] = precedingTotal % 2 == 1;
            }
            return states;
        }

        @Override
        public void flatMap(Row summary, Collector<Row> out) throws Exception {
            int splitNumber = ((Long) summary.getField(1)).intValue();
            InputSplit split = reader.convertStringToSplitObject((String) summary.getField(2));
            GenericCsvInputFormatBeta format = reader.getInputFormat(rowDelimiter, ignoreFirstLine, quoteChar);
            format.setFieldInQuote(fieldInQuote[splitNumber]);
            format.open(split);
            try {
                // nextRecord returning null marks the end of the split.
                for (Row record = format.nextRecord((Row) null);
                        record != null;
                        record = format.nextRecord((Row) null)) {
                    out.collect(record);
                }
            } finally {
                // Release the resources held by the format even if reading or collecting fails.
                format.close();
            }
        }
    }
}
