package com.alibaba.alink.operator.common.io.partition;

import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.operator.common.io.reader.FSFileSplitReader;
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/alink/operator/common/io/partition/CsvSourceCollectorCreator.class */
/**
 * {@link SourceCollectorCreator} that reads CSV files and emits their records as {@link Row}s.
 *
 * <p>The schema exposed by {@link #schema()} is rebuilt from the field names and field types of
 * the {@link TableSchema} passed to the constructor. The row delimiter, the "ignore first line"
 * flag and the quote character are forwarded unchanged to
 * {@code FSFileSplitReader#getInputFormat} for every file that is processed.
 */
public class CsvSourceCollectorCreator implements SourceCollectorCreator {
    /** Row delimiter handed to the CSV input format. */
    private final String rowDelim;
    /** Quote character handed to the CSV input format. */
    private final Character quoteChar;
    /** Flag handed to the CSV input format; presumably skips a header line -- confirm in FSFileSplitReader. */
    private final boolean ignoreFirstLine;
    /** Field names taken from the schema supplied at construction time. */
    private final String[] dataFieldNames;
    /** Field types taken from the schema supplied at construction time. */
    private final TypeInformation<?>[] dataFieldTypes;

    /**
     * Creates a collector creator for CSV data.
     *
     * @param tableSchema     schema whose field names and field types describe the data
     * @param rowDelim        row delimiter forwarded to the input format
     * @param ignoreFirstLine flag forwarded to the input format
     * @param quoteChar       quote character forwarded to the input format
     */
    public CsvSourceCollectorCreator(
            TableSchema tableSchema, String rowDelim, boolean ignoreFirstLine, Character quoteChar) {
        this.dataFieldNames = tableSchema.getFieldNames();
        this.dataFieldTypes = tableSchema.getFieldTypes();
        this.rowDelim = rowDelim;
        this.ignoreFirstLine = ignoreFirstLine;
        this.quoteChar = quoteChar;
    }

    /**
     * Returns a new {@link TableSchema} built from the stored field names and field types.
     *
     * @return the schema of the rows produced by {@link #collect(FilePath, Collector)}
     */
    @Override
    public TableSchema schema() {
        return new TableSchema(this.dataFieldNames, this.dataFieldTypes);
    }

    /**
     * Reads CSV records and passes each one to {@code collector}.
     *
     * <p>The per-file work is delegated to {@code AkUtils.getFromFolderForEach}, which invokes
     * the callback below (presumably once per file under {@code filePath} -- confirm in AkUtils).
     *
     * @param filePath  location handed to {@code AkUtils.getFromFolderForEach}
     * @param collector receiver of every record that is read
     * @throws IOException if opening, reading or closing a file fails
     */
    @Override
    public void collect(FilePath filePath, final Collector<Row> collector) throws IOException {
        AkUtils.getFromFolderForEach(filePath, new AkUtils.FileProcFunction<FilePath, Boolean>() {
            @Override
            public Boolean apply(FilePath file) throws IOException {
                FSFileSplitReader reader = new FSFileSplitReader(file);
                FSFileSplitReader.FSCsvInputFormat inputFormat =
                    reader.getInputFormat(rowDelim, ignoreFirstLine, quoteChar);
                try {
                    // A single split covering the whole file: offset 0, length = file length, no host hints.
                    // The split is passed as-is; casting it to the input-format type can never succeed.
                    inputFormat.open(
                        new FileInputSplit(1, file.getPath(), 0L, reader.getFileLength(), (String[]) null));
                    while (!inputFormat.reachedEnd()) {
                        Row record = inputFormat.nextRecord((Row) null);
                        if (record == null) {
                            // NOTE(review): reading stops at the first null record; confirm that the
                            // input format never returns null for a skipped (rather than final) record.
                            break;
                        }
                        collector.collect(record);
                    }
                    // NOTE(review): meaning of the returned flag is defined by AkUtils.getFromFolderForEach.
                    return true;
                } finally {
                    // Always release the input format, including when open or read fails.
                    inputFormat.close();
                }
            }
        });
    }
}
