package com.alibaba.alink.operator.common.io.partition;

import com.alibaba.alink.common.MTableUtil;
import com.alibaba.alink.common.exceptions.AkIllegalDataException;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.local.LocalOperator;
import com.alibaba.alink.params.io.HasFilePath;
import com.alibaba.alink.params.io.shared.HasPartitionColsDefaultAsNull;
import com.alibaba.alink.params.io.shared.HasPartitions;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/alink/operator/common/io/partition/LocalUtils.class */
public class LocalUtils {
    public static Tuple2<List<Row>, TableSchema> readFromPartitionLocal(Params params, SourceCollectorCreator sourceCollectorCreator) throws IOException {
        return readFromPartitionLocal(params, sourceCollectorCreator, null);
    }

    public static Tuple2<List<Row>, TableSchema> readFromPartitionLocal(Params params, final SourceCollectorCreator sourceCollectorCreator, String[] strArr) throws IOException {
        final FilePath deserialize = FilePath.deserialize((String) params.get(HasFilePath.FILE_PATH));
        LocalOperator<?> selectPartitionLocalOp = AkUtils.selectPartitionLocalOp(deserialize, (String) params.get(HasPartitions.PARTITIONS), strArr);
        final String[] colNames = selectPartitionLocalOp.getColNames();
        return Tuple2.of(MTableUtil.flatMapWithMultiThreads(selectPartitionLocalOp.getOutputTable(), params, new MTableUtil.FlatMapFunction() { // from class: com.alibaba.alink.operator.common.io.partition.LocalUtils.1
            @Override // com.alibaba.alink.common.MTableUtil.FlatMapFunction
            public void flatMap(Row row, Collector<Row> collector) throws Exception {
                Path path = FilePath.this.getPath();
                for (int i = 0; i < row.getArity(); i++) {
                    path = new Path(path, String.format("%s=%s", colNames[i], row.getField(i)));
                }
                sourceCollectorCreator.collect(new FilePath(path, FilePath.this.getFileSystem()), collector);
            }
        }), sourceCollectorCreator.schema());
    }

    public static void partitionAndWriteFile(LocalOperator<?> localOperator, final SinkCollectorCreator sinkCollectorCreator, Params params) {
        TableSchema schema = localOperator.getSchema();
        String[] strArr = (String[]) params.get(HasPartitionColsDefaultAsNull.PARTITION_COLS);
        final int[] findColIndicesWithAssertAndHint = TableUtil.findColIndicesWithAssertAndHint(schema, strArr);
        final int[] findColIndices = TableUtil.findColIndices(schema.getFieldNames(), (String[]) ArrayUtils.removeElements(schema.getFieldNames(), strArr));
        final FilePath deserialize = FilePath.deserialize((String) params.get(HasFilePath.FILE_PATH));
        MTableUtil.groupFunc(localOperator.getOutputTable(), strArr, new MTableUtil.GroupFunction() { // from class: com.alibaba.alink.operator.common.io.partition.LocalUtils.2
            @Override // com.alibaba.alink.common.MTableUtil.GroupFunction
            public void calc(List<Row> list, Collector<Row> collector) {
                try {
                    Path path = FilePath.this.getPath();
                    BaseFileSystem<?> fileSystem = FilePath.this.getFileSystem();
                    Path path2 = new Path(path.getPath());
                    for (int i : findColIndicesWithAssertAndHint) {
                        path2 = new Path(path2, list.get(0).getField(i).toString());
                    }
                    fileSystem.mkdirs(path2);
                    Collector<Row> createCollector = sinkCollectorCreator.createCollector(new FilePath(new Path(path2, "0.inprogress"), fileSystem));
                    Iterator<Row> it = list.iterator();
                    while (it.hasNext()) {
                        createCollector.collect(Row.project(it.next(), findColIndices));
                    }
                    if (createCollector != null) {
                        createCollector.close();
                        fileSystem.rename(new Path(path2, "0.inprogress"), new Path(path2, "0"));
                    }
                } catch (IOException e) {
                    throw new AkIllegalDataException("Fail to create partition directories or write files.", e);
                }
            }
        });
    }
}
