package com.alibaba.alink.common.dl.utils;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.utils.DownloadUtils;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.flink.ml.util.IpHostUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/dl/utils/DataSetDiskDownloader.class */
public class DataSetDiskDownloader implements Serializable {
    private static final Logger LOG;
    private static final String MARK_FILENAME = ".mark";
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/alibaba/alink/common/dl/utils/DataSetDiskDownloader$DownloadFilesMapPartitionFunction.class */
    static class DownloadFilesMapPartitionFunction extends RichMapPartitionFunction<Integer, Tuple2<Integer, byte[]>> {
        private final List<String> paths;

        public DownloadFilesMapPartitionFunction(List<String> list) {
            this.paths = list;
        }

        public void mapPartition(Iterable<Integer> iterable, Collector<Tuple2<Integer, byte[]>> collector) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            if (indexOfThisSubtask != 0) {
                DataSetDiskDownloader.LOG.info("No downloading on TM with taskId: " + indexOfThisSubtask + " ip: " + IpHostUtil.getIpAddress());
                return;
            }
            String path = PythonFileUtils.createTempDir(String.format("downloaded_files_%s_", Integer.valueOf(indexOfThisSubtask))).toString();
            for (String str : this.paths) {
                if (!FileDownloadUtils.isLocalPath(str)) {
                    FileInputStream fileInputStream = new FileInputStream(new File(path, FileDownloadUtils.downloadFileToDirectory(str, new File(path))));
                    byte[] bArr = new byte[65536];
                    while (true) {
                        int read = fileInputStream.read(bArr, 0, 65536);
                        if (read == -1) {
                            break;
                        }
                        if (read > 0) {
                            byte[] bArr2 = new byte[read];
                            System.arraycopy(bArr, 0, bArr2, 0, read);
                            for (int i = 0; i < numberOfParallelSubtasks; i++) {
                                collector.collect(Tuple2.of(Integer.valueOf(i), bArr2));
                            }
                        }
                    }
                    for (int i2 = 0; i2 < numberOfParallelSubtasks; i2++) {
                        collector.collect(Tuple2.of(Integer.valueOf(i2), new byte[0]));
                    }
                    DataSetDiskDownloader.LOG.info("Downloading on TM with taskId: " + indexOfThisSubtask + " ip: " + IpHostUtil.getIpAddress());
                    fileInputStream.close();
                }
            }
        }
    }

    /* loaded from: input_file:com/alibaba/alink/common/dl/utils/DataSetDiskDownloader$ExtractRenameFilesMapPartitionFunction.class */
    static class ExtractRenameFilesMapPartitionFunction extends RichMapPartitionFunction<Tuple2<Integer, byte[]>, Row> {
        private final List<String> paths;
        private final Map<String, String> renameMap;

        public ExtractRenameFilesMapPartitionFunction(List<String> list, Map<String, String> map) {
            this.paths = list;
            this.renameMap = map;
        }

        public void mapPartition(Iterable<Tuple2<Integer, byte[]>> iterable, Collector<Row> collector) throws Exception {
            Iterator<Tuple2<Integer, byte[]>> it = iterable.iterator();
            String path = PythonFileUtils.createTempDir(String.format("work_dir_%s_", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()))).toString();
            for (String str : this.paths) {
                boolean isCompressedFile = PythonFileUtils.isCompressedFile(str);
                String str2 = this.renameMap.get(str);
                File file = new File(path, (isCompressedFile || str2 == null) ? PythonFileUtils.getFileName(str) : str2);
                if (FileDownloadUtils.isLocalPath(str)) {
                    FileDownloadUtils.downloadFile(str, file);
                } else {
                    FileOutputStream fileOutputStream = new FileOutputStream(file);
                    while (it.hasNext()) {
                        byte[] bArr = (byte[]) it.next().f1;
                        if (bArr.length <= 0) {
                            break;
                        } else {
                            fileOutputStream.write(bArr, 0, bArr.length);
                        }
                    }
                    fileOutputStream.close();
                }
                if (isCompressedFile) {
                    File file2 = str2 != null ? new File(path, str2) : new File(path);
                    PythonFileUtils.ensureDirectoryExists(file2);
                    ArchivesUtils.decompressFile(file, file2);
                }
            }
            collector.collect(Row.of(new Object[]{path}));
        }
    }

    public static void unzipUserFileFromDisk(String[] strArr, String str) throws IOException {
        String str2 = null;
        int length = strArr.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            String str3 = strArr[i];
            if (new File(str3).exists()) {
                str2 = str3;
                break;
            }
            i++;
        }
        if (null != str2) {
            ArchivesUtils.decompressFile(new File(str2), new File(str));
        }
    }

    private static BatchOperator handleEmptyFile() {
        return BatchOperator.fromTable(DataSetConversionUtil.toTable(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID, (DataSet<Row>) MLEnvironmentFactory.getDefault().getExecutionEnvironment().fromElements(new Integer[]{0}).mapPartition(new MapPartitionFunction<Integer, Row>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.1
            public void mapPartition(Iterable<Integer> iterable, Collector<Row> collector) throws Exception {
                iterable.forEach(num -> {
                });
            }
        }), new String[]{"targetPath"}, (TypeInformation<?>[]) new TypeInformation[]{TypeInformation.of(String.class)}));
    }

    public static BatchOperator downloadUserFile(final String str) {
        ExecutionEnvironment executionEnvironment = MLEnvironmentFactory.getDefault().getExecutionEnvironment();
        if (null == str || str.length() == 0) {
            return handleEmptyFile();
        }
        return BatchOperator.fromTable(DataSetConversionUtil.toTable(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID, (DataSet<Row>) executionEnvironment.fromElements(new Tuple2[]{Tuple2.of(1, 1)}).partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.3
            public int partition(Integer num, int i) {
                return num.intValue() % i;
            }
        }, 0).map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.2
            public Integer map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return (Integer) tuple2.f1;
            }
        }).mapPartition(new RichMapPartitionFunction<Integer, Tuple2<Integer, byte[]>>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.6
            String targetFileName;
            String targetDir;

            public void open(Configuration configuration) throws Exception {
                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                    this.targetFileName = str.contains("\\") ? str.substring(str.lastIndexOf(92) + 1) : str.substring(str.lastIndexOf(47) + 1);
                    this.targetDir = PythonFileUtils.createTempDir("temp_user_files_").toString();
                    File file = Paths.get(this.targetDir, this.targetFileName).toAbsolutePath().toFile();
                    if (file.exists()) {
                        file.delete();
                    }
                }
            }

            public void mapPartition(Iterable<Integer> iterable, Collector<Tuple2<Integer, byte[]>> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
                if (indexOfThisSubtask != 0) {
                    DataSetDiskDownloader.LOG.info("No downloading on TM with taskId: " + indexOfThisSubtask + " ip: " + IpHostUtil.getIpAddress());
                    return;
                }
                DownloadUtils.resumableDownloadHttpFile(str, this.targetDir, this.targetFileName);
                FileInputStream fileInputStream = new FileInputStream(Paths.get(this.targetDir, this.targetFileName).toAbsolutePath().toFile());
                byte[] bArr = new byte[65536];
                while (true) {
                    int read = fileInputStream.read(bArr, 0, 65536);
                    if (read == -1) {
                        DataSetDiskDownloader.LOG.info("Downloading on TM with taskId: " + indexOfThisSubtask + " ip: " + IpHostUtil.getIpAddress());
                        fileInputStream.close();
                        return;
                    }
                    byte[] bArr2 = new byte[read];
                    System.arraycopy(bArr, 0, bArr2, 0, read);
                    for (int i = 0; i < numberOfParallelSubtasks; i++) {
                        collector.collect(Tuple2.of(Integer.valueOf(i), bArr2));
                    }
                }
            }
        }).partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.5
            public int partition(Integer num, int i) {
                return num.intValue();
            }
        }, 0).mapPartition(new RichMapPartitionFunction<Tuple2<Integer, byte[]>, Row>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.4
            String targetFileName;
            String targetDir;

            public void open(Configuration configuration) throws Exception {
                this.targetFileName = str.contains("\\") ? str.substring(str.lastIndexOf(92) + 1) : str.substring(str.lastIndexOf(47) + 1);
                this.targetDir = PythonFileUtils.createTempDir("temp_user_files_").toString();
                File file = Paths.get(this.targetDir, this.targetFileName).toAbsolutePath().toFile();
                if (file.exists()) {
                    file.delete();
                }
            }

            public void mapPartition(Iterable<Tuple2<Integer, byte[]>> iterable, Collector<Row> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                FileOutputStream fileOutputStream = new FileOutputStream(Paths.get(this.targetDir, this.targetFileName).toAbsolutePath().toFile(), true);
                for (Tuple2<Integer, byte[]> tuple2 : iterable) {
                    fileOutputStream.write((byte[]) tuple2.f1, 0, ((byte[]) tuple2.f1).length);
                }
                fileOutputStream.close();
                DataSetDiskDownloader.LOG.info("Write to disk on TM with taskId: " + indexOfThisSubtask + " ip: " + IpHostUtil.getIpAddress());
                Row row = new Row(1);
                row.setField(0, this.targetDir + File.separator + this.targetFileName);
                collector.collect(row);
            }
        }), new String[]{"targetPath"}, (TypeInformation<?>[]) new TypeInformation[]{TypeInformation.of(String.class)}));
    }

    public static BatchOperator<?> downloadFilesWithRename(Long l, List<String> list, Map<String, String> map) {
        ExecutionEnvironment executionEnvironment = MLEnvironmentFactory.getDefault().getExecutionEnvironment();
        TypeInformation typeInformation = Types.STRING;
        return list.size() == 0 ? handleEmptyFile() : BatchOperator.fromTable(DataSetConversionUtil.toTable(l, (DataSet<Row>) executionEnvironment.fromElements(new Tuple2[]{Tuple2.of(1, 1)}).partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.8
            public int partition(Integer num, int i) {
                return num.intValue() % i;
            }
        }, 0).map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.7
            public Integer map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return (Integer) tuple2.f1;
            }
        }).mapPartition(new DownloadFilesMapPartitionFunction(list)).partitionCustom(new Partitioner<Integer>() { // from class: com.alibaba.alink.common.dl.utils.DataSetDiskDownloader.9
            public int partition(Integer num, int i) {
                return num.intValue();
            }
        }, 0).mapPartition(new ExtractRenameFilesMapPartitionFunction(list, map)), new String[]{"targetPath"}, (TypeInformation<?>[]) new TypeInformation[]{TypeInformation.of(String.class)}));
    }

    static synchronized boolean markPath(File file) {
        try {
            return file.createNewFile();
        } catch (IOException e) {
            return false;
        }
    }

    public static synchronized String getDownloadPath(String[] strArr) {
        String str = null;
        int length = strArr.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            String str2 = strArr[i];
            if (new File(str2).exists() && markPath(new File(str2, MARK_FILENAME))) {
                str = str2;
                break;
            }
            i++;
        }
        if (str == null) {
            throw new AkUnclassifiedErrorException("Cannot get download path from candidates: " + Arrays.toString(strArr));
        }
        return str;
    }

    public static void moveFilesToWorkDir(String[] strArr, File file) throws IOException {
        File[] listFiles = new File(getDownloadPath(strArr)).listFiles();
        if (!$assertionsDisabled && listFiles == null) {
            throw new AssertionError();
        }
        for (File file2 : listFiles) {
            Files.createSymbolicLink(new File(file, file2.getName()).toPath(), file2.toPath(), new FileAttribute[0]);
        }
    }

    static {
        $assertionsDisabled = !DataSetDiskDownloader.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(DataSetDiskDownloader.class);
    }
}
