package com.alibaba.alink.operator.common.io.partition;

import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.copy.csv.TextOutputFormat;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import java.io.IOException;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/alink/operator/common/io/partition/CsvSinkCollectorCreator.class */
public class CsvSinkCollectorCreator implements SinkCollectorCreator {
    private final CsvUtil.FormatCsvFunc formatCsvFunc;
    private final CsvUtil.FlattenCsvFromRow flattenCsvFromRow;
    private final String rowDelimiter;

    public CsvSinkCollectorCreator(CsvUtil.FormatCsvFunc formatCsvFunc, CsvUtil.FlattenCsvFromRow flattenCsvFromRow, String str) {
        this.formatCsvFunc = formatCsvFunc;
        this.flattenCsvFromRow = flattenCsvFromRow;
        this.rowDelimiter = str;
    }

    @Override // com.alibaba.alink.operator.common.io.partition.SinkCollectorCreator
    public Collector<Row> createCollector(FilePath filePath) throws IOException {
        final TextOutputFormat textOutputFormat = new TextOutputFormat(filePath.getPath(), filePath.getFileSystem(), this.rowDelimiter);
        textOutputFormat.open(0, 1);
        return new Collector<Row>() { // from class: com.alibaba.alink.operator.common.io.partition.CsvSinkCollectorCreator.1
            public void collect(Row row) {
                try {
                    textOutputFormat.writeRecord(CsvSinkCollectorCreator.this.flattenCsvFromRow.map(CsvSinkCollectorCreator.this.formatCsvFunc.map(row)));
                } catch (Exception e) {
                    throw new AkUnclassifiedErrorException("CsvSinkCollectorCreator collect error. ", e);
                }
            }

            public void close() {
                try {
                    textOutputFormat.close();
                } catch (IOException e) {
                    throw new AkUnclassifiedErrorException("CsvSinkCollectorCreator close error. ", e);
                }
            }
        };
    }
}
