package com.alibaba.alink.operator.common.io.reader;

import com.alibaba.alink.common.exceptions.AkIllegalArgumentException;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FileSystemUtils;
import com.alibaba.alink.common.io.filesystem.copy.FileInputFormat;
import com.alibaba.alink.operator.batch.associationrule.PrefixSpanBatchOp;
import com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta;
import com.alibaba.alink.operator.common.outlier.TimeSeriesAnomsUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.GlobFilePathFilter;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/operator/common/io/reader/FSFileSplitReader.class */
public class FSFileSplitReader implements FileSplitReader, AutoCloseable {
    // Path obtained from the FilePath given to the constructor; returned by getFilePath() and used by getFileLength().
    private final Path readerPath;
    // File system obtained from the FilePath given to the constructor; used to open splits and query file status.
    private final BaseFileSystem fs;
    // Timeout handed to InputSplitOpenThread in reopen(); presumably milliseconds (300000 = 5 minutes) - TODO confirm.
    protected static final long openTimeout = 300000;
    // Created on the first getInputFormat(...) call and returned unchanged by later calls.
    private FSCsvInputFormat inputFormat = null;
    // Path of the split most recently opened by reopen(); used by getSplitEnd().
    private transient Path filePath;
    // Split most recently passed to reopen().
    private transient FileInputSplit split;
    // Stream of the currently open split; reset to null by close().
    private transient FSDataInputStream stream;

    /* loaded from: input_file:com/alibaba/alink/operator/common/io/reader/FSFileSplitReader$FSCsvInputFormat.class */
    public static class FSCsvInputFormat extends GenericCsvInputFormatBeta<FileInputSplit> {
        private static final Logger LOG = LoggerFactory.getLogger(FSCsvInputFormat.class);
        // Tolerance used by createInputSplits: a trailing remainder of up to 1.1x the split size is kept as one split.
        private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
        // Assigned once by the static initializer through initDefaultsFromConfiguration.
        private static long DEFAULT_OPENING_TIMEOUT;
        // Path taken from the reader (FSFileSplitReader#getFilePath()); root of the split enumeration.
        private final Path filePath;
        // Lower bound for the split size in createInputSplits (capped at the file's block size); constructors set 0.
        protected long minSplitSize;
        // Lower bound for the number of splits aimed for in createInputSplits; constructors set -1 (no lower bound).
        protected int numSplits;
        // Initialised from DEFAULT_OPENING_TIMEOUT by the constructors; not read anywhere in this file.
        protected long openTimeout;
        // Constructors set -1; not read anywhere in this file.
        private long offset;
        // Constructors set true; not read anywhere in this file (addFilesInDir always recurses).
        protected boolean enumerateNestedFiles;
        // Constructors install a GlobFilePathFilter; NOTE(review): acceptFile builds its own filter instead of reading this field.
        private FilePathFilter filesFilter;
        // File system taken from the reader (FSFileSplitReader#getFs()).
        private final BaseFileSystem<?> fs;

        /**
         * Derives {@code DEFAULT_OPENING_TIMEOUT} from the {@code taskmanager.runtime.fs_timeout} setting.
         *
         * <ul>
         *   <li>positive value: used as is;</li>
         *   <li>zero (also the fallback when the key is absent): {@code FSFileSplitReader.openTimeout};</li>
         *   <li>negative value: an error is logged and 0 is used.</li>
         * </ul>
         *
         * @param configuration configuration to read the timeout from
         */
        private static void initDefaultsFromConfiguration(Configuration configuration) {
            final long configured = configuration.getLong("taskmanager.runtime.fs_timeout", 0L);
            if (configured > 0) {
                DEFAULT_OPENING_TIMEOUT = configured;
                return;
            }
            if (configured == 0) {
                DEFAULT_OPENING_TIMEOUT = FSFileSplitReader.openTimeout;
                return;
            }
            // Only negative values reach this point.
            LOG.error("Invalid timeout value for filesystem stream opening: " + configured + ". Using default value of 0");
            DEFAULT_OPENING_TIMEOUT = 0L;
        }

        /**
         * Creates a format bound to the given reader; the three arguments are forwarded unchanged to the
         * superclass constructor.
         *
         * @param fSFileSplitReader reader that supplies the file system and the path to enumerate
         * @param str               forwarded to the superclass (presumably the line delimiter - TODO confirm)
         * @param z                 forwarded to the superclass (presumably "ignore first line" - TODO confirm)
         */
        public FSCsvInputFormat(FSFileSplitReader fSFileSplitReader, String str, boolean z) {
            super(fSFileSplitReader, str, z);
            // Source location and file system come from the reader.
            this.filePath = fSFileSplitReader.getFilePath();
            this.fs = fSFileSplitReader.getFs();
            // Split-planning defaults.
            this.numSplits = -1;
            this.minSplitSize = 0L;
            this.enumerateNestedFiles = true;
            this.filesFilter = new GlobFilePathFilter();
            // Stream defaults.
            this.offset = -1L;
            this.openTimeout = DEFAULT_OPENING_TIMEOUT;
        }

        /**
         * Creates a format bound to the given reader; the four arguments are forwarded unchanged to the
         * superclass constructor.
         *
         * @param fSFileSplitReader reader that supplies the file system and the path to enumerate
         * @param str               forwarded to the superclass (presumably the line delimiter - TODO confirm)
         * @param z                 forwarded to the superclass (presumably "ignore first line" - TODO confirm)
         * @param ch                forwarded to the superclass (presumably the quote character - TODO confirm)
         */
        public FSCsvInputFormat(FSFileSplitReader fSFileSplitReader, String str, boolean z, Character ch) {
            super(fSFileSplitReader, str, z, ch);
            // Source location and file system come from the reader.
            this.filePath = fSFileSplitReader.getFilePath();
            this.fs = fSFileSplitReader.getFs();
            // Split-planning defaults.
            this.numSplits = -1;
            this.minSplitSize = 0L;
            this.enumerateNestedFiles = true;
            this.filesFilter = new GlobFilePathFilter();
            // Stream defaults.
            this.offset = -1L;
            this.openTimeout = DEFAULT_OPENING_TIMEOUT;
        }

        /**
         * Creates a format bound to the given reader; the five arguments are forwarded unchanged to the
         * superclass constructor.
         *
         * @param fSFileSplitReader reader that supplies the file system and the path to enumerate
         * @param str               forwarded to the superclass (presumably the line delimiter - TODO confirm)
         * @param z                 forwarded to the superclass (presumably "ignore first line" - TODO confirm)
         * @param z2                forwarded to the superclass (presumably the "unsplittable" flag - TODO confirm)
         * @param ch                forwarded to the superclass (presumably the quote character - TODO confirm)
         */
        public FSCsvInputFormat(FSFileSplitReader fSFileSplitReader, String str, boolean z, boolean z2, Character ch) {
            super(fSFileSplitReader, str, z, z2, ch);
            // Source location and file system come from the reader.
            this.filePath = fSFileSplitReader.getFilePath();
            this.fs = fSFileSplitReader.getFs();
            // Split-planning defaults.
            this.numSplits = -1;
            this.minSplitSize = 0L;
            this.enumerateNestedFiles = true;
            this.filesFilter = new GlobFilePathFilter();
            // Stream defaults.
            this.offset = -1L;
            this.openTimeout = DEFAULT_OPENING_TIMEOUT;
        }

        /**
         * Intentionally a no-op: this format takes nothing from the passed configuration.
         *
         * @param configuration ignored
         */
        @Override // com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public void configure(Configuration configuration) {
        }

        /**
         * Statistics are not computed by this format.
         *
         * @param baseStatistics ignored
         * @return always {@code null}
         */
        @Override // com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
            return null;
        }

        /**
         * Opens the split through the superclass and positions the format on its first record.
         *
         * <p>If the split starts at offset 0 and the first line is not to be ignored, the buffer is simply
         * filled from the start. Otherwise one line is consumed with {@code readLine()} (presumably the
         * header line, or the tail of a record that began in the previous split - TODO confirm); when
         * {@code readLine()} reports that no line could be read, the format is marked as ended.
         *
         * @param fileInputSplit the split to open
         * @throws IOException if the superclass fails to open the split or reading fails
         */
        @Override // com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public void open(FileInputSplit fileInputSplit) throws IOException {
            // The split is passed as is; casting it to FSCsvInputFormat (a decompiler artifact) is an
            // inconvertible-type error because the two classes are unrelated.
            super.open(fileInputSplit);
            if (fileInputSplit.getStart() <= 0 && !this.ignoreFirstLine) {
                fillBuffer(0);
                return;
            }
            if (!readLine()) {
                setEnd(true);
            }
        }

        /**
         * Opens the split through the superclass only, without the first-line handling that
         * {@link #open(FileInputSplit)} performs afterwards.
         *
         * @param fileInputSplit the split to open
         * @throws IOException if the superclass fails to open the split
         */
        public void openWithoutSkipLine(FileInputSplit fileInputSplit) throws IOException {
            // Pass the split unchanged; the former cast to FSCsvInputFormat was a decompiler artifact
            // between unrelated types.
            super.open(fileInputSplit);
        }

        /**
         * Enumerates the files below {@code filePath} and cuts them into input splits.
         *
         * <p>If {@code filePath} is a directory its files are collected recursively through
         * {@code addFilesInDir}; otherwise the single file is used. When the format is flagged
         * {@code unsplittable}, every file becomes exactly one split whose hosts are the union of the hosts
         * of all its blocks. Otherwise each file is cut into pieces of
         * {@code max(minSplitSize capped at block size, min(ceil(totalLength / splitCount), blockSize))}
         * bytes; a trailing remainder of up to {@code MAX_SPLIT_SIZE_DISCREPANCY} times that size is kept
         * as one last split. Empty files yield one zero-length split.
         *
         * @param i requested minimum number of splits; the larger of this value and {@code numSplits} is used
         * @return the created splits, numbered consecutively from 0
         * @throws AkIllegalArgumentException if {@code i} is smaller than 1
         * @throws IOException if file status or block locations cannot be obtained
         */
        @Override // com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public FileInputSplit[] createInputSplits(int i) throws IOException {
            if (i < 1) {
                throw new AkIllegalArgumentException("Number of input splits has to be at least 1.");
            }
            // numSplits may raise, but never lower, the requested number of splits.
            final int targetSplits = Math.max(i, this.numSplits);
            final List<FileInputSplit> splits = new ArrayList<>(targetSplits);
            final List<FileStatus> files = new ArrayList<>();
            long totalLength = 0L;
            if (this.filePath != null) {
                FileStatus rootStatus = FileSystemUtils.getFlinkFileSystem(this.fs, this.filePath.toString()).getFileStatus(this.filePath);
                if (rootStatus.isDir()) {
                    totalLength += addFilesInDir(this.filePath, files, true);
                } else {
                    files.add(rootStatus);
                    totalLength += rootStatus.getLen();
                }
            }

            if (this.unsplittable) {
                // One split per file, located on every host that holds any of the file's blocks.
                int splitNumber = 0;
                for (FileStatus file : files) {
                    BlockLocation[] blocks = FileSystemUtils.getFlinkFileSystem(this.fs, file.getPath().toString()).getFileBlockLocations(file, 0L, file.getLen());
                    HashSet<String> hosts = new HashSet<>();
                    for (BlockLocation block : blocks) {
                        hosts.addAll(Arrays.asList(block.getHosts()));
                    }
                    splits.add(new FileInputSplit(splitNumber++, file.getPath(), 0L, file.getLen(), hosts.toArray(new String[0])));
                }
                return splits.toArray(new FileInputSplit[0]);
            }

            // Ceiling of totalLength / targetSplits.
            final long maxSplitSize = (totalLength / targetSplits) + (totalLength % targetSplits == 0 ? 0 : 1);
            int splitNumber = 0;
            for (FileStatus file : files) {
                FileSystem fileSystem = FileSystemUtils.getFlinkFileSystem(this.fs, file.getPath().toString());
                final long len = file.getLen();
                final long blockSize = file.getBlockSize();
                final long minSplit;
                if (this.minSplitSize <= blockSize) {
                    minSplit = this.minSplitSize;
                } else {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + blockSize + ". Decreasing minimal split size to block size.");
                    }
                    minSplit = blockSize;
                }
                long splitSize = Math.max(minSplit, Math.min(maxSplitSize, blockSize));
                if (splitSize <= 0) {
                    // A non-positive split size (e.g. a reported block size of 0) would keep the loop
                    // below from ever advancing; fall back to one split covering the whole file.
                    splitSize = len;
                }
                final long halfSplit = splitSize >>> 1;
                final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
                if (len > 0) {
                    BlockLocation[] blocks = fileSystem.getFileBlockLocations(file, 0L, len);
                    Arrays.sort(blocks);
                    long bytesUnassigned = len;
                    long position = 0;
                    int blockIndex = 0;
                    while (bytesUnassigned > maxBytesForLastSplit) {
                        blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                        splits.add(new FileInputSplit(splitNumber++, file.getPath(), position, splitSize, blocks[blockIndex].getHosts()));
                        position += splitSize;
                        bytesUnassigned -= splitSize;
                    }
                    if (bytesUnassigned > 0) {
                        // Trailing remainder becomes the file's last split.
                        blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                        splits.add(new FileInputSplit(splitNumber++, file.getPath(), position, bytesUnassigned, blocks[blockIndex].getHosts()));
                    }
                } else {
                    // Empty file: still emit one zero-length split, with hosts if any block is reported.
                    BlockLocation[] blocks = fileSystem.getFileBlockLocations(file, 0L, 0L);
                    String[] hosts = blocks.length > 0 ? blocks[0].getHosts() : new String[0];
                    splits.add(new FileInputSplit(splitNumber++, file.getPath(), 0L, 0L, hosts));
                }
            }
            return splits.toArray(new FileInputSplit[0]);
        }

        /**
         * Finds the block that should be associated with a split starting at position {@code j}.
         *
         * <p>Scanning starts at index {@code i}. The block containing {@code j} is returned, unless fewer
         * than {@code j2} bytes of that block remain after {@code j} and a following block exists, in
         * which case the following block's index is returned.
         *
         * @param blockLocationArr block locations to search (callers in this file sort them first)
         * @param j                position to look up
         * @param j2               minimum number of bytes that must remain in the containing block
         * @param i                index at which to start scanning
         * @return index of the chosen block
         * @throws AkIllegalArgumentException if no block from index {@code i} onwards contains {@code j}
         */
        private int getBlockIndexForPosition(BlockLocation[] blockLocationArr, long j, long j2, int i) {
            for (int idx = i; idx < blockLocationArr.length; idx++) {
                final long blockStart = blockLocationArr[idx].getOffset();
                final long blockEnd = blockStart + blockLocationArr[idx].getLength();
                if (j < blockStart || j >= blockEnd) {
                    continue;
                }
                final boolean isLastBlock = idx >= blockLocationArr.length - 1;
                final boolean enoughBytesLeft = blockEnd - j >= j2;
                if (isLastBlock || enoughBytesLeft) {
                    return idx;
                }
                return idx + 1;
            }
            throw new AkIllegalArgumentException("The given offset is not contained in the any block.");
        }

        /**
         * Recursively collects the accepted files below a directory.
         *
         * <p>Sub-directories are always descended into; plain files are added when {@code acceptFile}
         * accepts them.
         *
         * @param path directory to list
         * @param list receives the accepted files
         * @param z    whether rejected entries are reported at debug level
         * @return summed length of all files added to {@code list} by this call
         * @throws IOException if a directory cannot be listed
         */
        private long addFilesInDir(Path path, List<FileStatus> list, boolean z) throws IOException {
            long totalBytes = 0;
            for (FileStatus entry : FileSystemUtils.getFlinkFileSystem(this.fs, path.toString()).listStatus(path)) {
                if (entry.isDir()) {
                    totalBytes += addFilesInDir(entry.getPath(), list, z);
                    continue;
                }
                if (acceptFile(entry)) {
                    list.add(entry);
                    totalBytes += entry.getLen();
                    continue;
                }
                // Rejected plain file: optionally leave a trace in the debug log.
                if (z && LOG.isDebugEnabled()) {
                    LOG.debug("Directory " + entry.getPath().toString() + " did not pass the file-filter and is excluded.");
                }
            }
            return totalBytes;
        }

        /**
         * Decides whether a file takes part in split creation.
         *
         * @param fileStatus the file to check
         * @return {@code false} if the file name starts with {@code _} or {@code .}, or if a freshly
         *         created {@code GlobFilePathFilter} filters the path out; {@code true} otherwise
         */
        private boolean acceptFile(FileStatus fileStatus) {
            final String name = fileStatus.getPath().getName();
            if (name.startsWith("_") || name.startsWith(".")) {
                return false;
            }
            return !new GlobFilePathFilter().filterPath(fileStatus.getPath());
        }

        // Resolve DEFAULT_OPENING_TIMEOUT once, when the class is initialized, from the configuration
        // returned by GlobalConfiguration.loadConfiguration().
        static {
            initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/common/io/reader/FSFileSplitReader$FSCsvSplitInputFormat.class */
    public static class FSCsvSplitInputFormat extends FSCsvInputFormat {
        /**
         * Creates the split-analysing format; delegates to the five-argument parent constructor with its
         * fourth argument fixed to {@code false}.
         *
         * @param fSFileSplitReader reader that supplies the file system and the path
         * @param str               forwarded to the parent constructor (presumably the line delimiter - TODO confirm)
         * @param z                 forwarded to the parent constructor (presumably "ignore first line" - TODO confirm)
         * @param ch                forwarded to the parent constructor (presumably the quote character - TODO confirm)
         */
        public FSCsvSplitInputFormat(FSFileSplitReader fSFileSplitReader, String str, boolean z, Character ch) {
            super(fSFileSplitReader, str, z, false, ch);
        }

        /**
         * Opens the split without the first-line handling of the parent's {@code open}: it delegates to
         * {@code openWithoutSkipLine} only.
         *
         * @param fileInputSplit the split to open
         * @throws IOException if the split cannot be opened
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.alibaba.alink.operator.common.io.reader.FSFileSplitReader.FSCsvInputFormat, com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public void open(FileInputSplit fileInputSplit) throws IOException {
            super.openWithoutSkipLine(fileInputSplit);
        }

        /**
         * Produces the single record of this format: a summary of the current split.
         *
         * <p>The returned row has three fields: the value computed by {@code QuoteUtil.analyzeSplit} for
         * the split, the split number reported by the reader, and the split rendered as
         * {@code currentSplit.toString()} followed by its host names in square brackets, each host name
         * followed by {@code PrefixSpanBatchOp.ELEMENT_SEPARATOR}. That string is the input expected by
         * {@link #fromString(String)}. Afterwards the format is marked as ended and the reader is closed.
         *
         * @param row ignored; a new row is always created
         * @return the three-field summary row
         * @throws IOException if analysing the split or closing the reader fails
         */
        @Override // com.alibaba.alink.operator.common.io.csv.GenericCsvInputFormatBeta
        public Row nextRecord(Row row) throws IOException {
            final long analyzeSplit = QuoteUtil.analyzeSplit(this.reader, this.quoteCharacter);

            final StringBuilder description = new StringBuilder(this.currentSplit.toString());
            description.append("[");
            for (String host : this.currentSplit.getHostnames()) {
                description.append(host).append(PrefixSpanBatchOp.ELEMENT_SEPARATOR);
            }
            description.append("]");

            // Read everything that is needed from the reader before it is closed.
            final long splitNumber = this.reader.getSplitNumber();

            setEnd(true);
            this.reader.close();
            return Row.of(analyzeSplit, splitNumber, description.toString());
        }

        /**
         * Rebuilds a {@link FileInputSplit} from the description string produced by {@code nextRecord}.
         *
         * <p>Expected layout (the part before the host list is {@code FileInputSplit.toString()};
         * presumably {@code [number] path:start+length} - confirm against the Flink version in use):
         * <pre>
         * [splitNumber] path VAL_DELIMITER start + length [host SEP host SEP ...]
         * </pre>
         * The host list is taken from the last bracket pair, and the {@code +} and value delimiter are
         * searched backwards from the start of the host list. This way a path containing brackets, or
         * host names containing the delimiter characters, cannot shift the field boundaries.
         *
         * @param str description string
         * @return the reconstructed split; an empty host list yields a split without host names
         * @throws NumberFormatException     if the number, start or length field is not numeric
         * @throws IndexOutOfBoundsException if an expected delimiter is missing
         */
        public static FileInputSplit fromString(String str) {
            // Split number: first bracket pair of the string.
            final int numberOpen = str.indexOf("[");
            final int numberClose = str.indexOf("]");
            // Host list: last bracket pair of the string.
            final int hostsClose = str.lastIndexOf("]");
            final int hostsOpen = str.lastIndexOf("[", hostsClose);
            // "+" and the value delimiter are located right-to-left, starting before the host list.
            final int plusPos = str.lastIndexOf("+", hostsOpen);
            final int delimiterPos = str.lastIndexOf(TimeSeriesAnomsUtils.VAL_DELIMITER, plusPos);

            final int splitNumber = Integer.parseInt(str.substring(numberOpen + 1, numberClose));
            // "+ 2" skips the closing bracket and the single character that follows it.
            final String path = str.substring(numberClose + 2, delimiterPos);
            final long start = Long.parseLong(str.substring(delimiterPos + 1, plusPos));
            final long length = Long.parseLong(str.substring(plusPos + 1, hostsOpen));

            final String hostList = str.substring(hostsOpen + 1, hostsClose);
            // "".split(...) would produce one empty host name; an empty list means "no hosts".
            final String[] hosts = hostList.isEmpty()
                ? new String[0]
                : hostList.split(PrefixSpanBatchOp.ELEMENT_SEPARATOR);

            return new FileInputSplit(splitNumber, new Path(path), start, length, hosts);
        }
    }

    /**
     * Creates a reader for the location described by the given {@link FilePath}.
     *
     * @param filePath supplies both the file system and the path to read
     */
    public FSFileSplitReader(FilePath filePath) {
        this.fs = filePath.getFileSystem();
        this.readerPath = filePath.getPath();
    }

    /**
     * Opens the given split at its own start offset by delegating to {@link #reopen(InputSplit, long)}.
     *
     * @param inputSplit split to open; must be a {@link FileInputSplit}
     * @throws IOException declared by the interface
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public void open(InputSplit inputSplit) throws IOException {
        final FileInputSplit fileSplit = (FileInputSplit) inputSplit;
        reopen(inputSplit, fileSplit.getStart());
    }

    /**
     * Opens the stream of the given split and positions it at offset {@code j}.
     *
     * <p>A stream left open by an earlier {@code open}/{@code reopen} call is closed first, so repeated
     * calls do not leak streams. The new stream is opened on a helper thread bounded by
     * {@code openTimeout}; a seek is performed only for positive offsets. If opening or seeking fails, a
     * stream that was already obtained is closed again before the failure is reported.
     *
     * @param inputSplit split to open; must be a {@link FileInputSplit}
     * @param j          offset to seek to; values {@code <= 0} leave the stream where it was opened
     * @throws IOException                  if closing a previously opened stream fails
     * @throws AkUnclassifiedErrorException if the split cannot be opened or positioned; wraps the cause
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public void reopen(InputSplit inputSplit, long j) throws IOException {
        this.split = (FileInputSplit) inputSplit;
        final long length = this.split.getLength();

        // Release the stream of a previous open/reopen; overwriting the field would leak it.
        if (this.stream != null) {
            this.stream.close();
            this.stream = null;
        }

        FileInputFormat.InputSplitOpenThread opener = new FileInputFormat.InputSplitOpenThread(this.split, openTimeout, this.fs);
        opener.start();
        try {
            this.stream = opener.waitForCompletion();
            this.filePath = this.split.getPath();
            if (j > 0) {
                this.stream.seek(j);
            }
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interruption visible to the code further up the stack.
                Thread.currentThread().interrupt();
            }
            if (this.stream != null) {
                // Opened but could not be positioned: do not leave the stream dangling.
                try {
                    this.stream.close();
                } catch (IOException closeFailure) {
                    th.addSuppressed(closeFailure);
                }
                this.stream = null;
            }
            throw new AkUnclassifiedErrorException("Error opening the Input Split " + this.split.getPath() + " [" + j + "," + length + "]: " + th.getMessage(), th);
        }
    }

    /**
     * Closes the currently open stream, if any, and forgets it. Calling this without an open stream
     * does nothing.
     *
     * @throws IOException if closing the stream fails (the stream reference is kept in that case)
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.stream == null) {
            return;
        }
        this.stream.close();
        this.stream = null;
    }

    /**
     * Reads from the open stream into the given buffer; the arguments are passed straight through to
     * the stream's {@code read(byte[], int, int)}.
     *
     * @param bArr destination buffer
     * @param i    offset into {@code bArr}
     * @param i2   maximum number of bytes to read
     * @return the value returned by the underlying stream
     * @throws IOException if the underlying read fails
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public int read(byte[] bArr, int i, int i2) throws IOException {
        final int bytesRead = this.stream.read(bArr, i, i2);
        return bytesRead;
    }

    /**
     * Returns the length of the file at the path this reader was created for.
     *
     * @return the length reported by the file system, or {@code 0} if the status lookup fails
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public long getFileLength() {
        long length;
        try {
            length = this.fs.getFileStatus(this.readerPath).getLen();
        } catch (IOException ignored) {
            // Best effort: a failed status lookup is reported as length 0.
            length = 0L;
        }
        return length;
    }

    /**
     * @return the length of the split most recently passed to {@code reopen}
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public long getSplitLength() {
        final long length = this.split.getLength();
        return length;
    }

    /**
     * @return the start offset of the split most recently passed to {@code reopen}
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public long getSplitStart() {
        final long start = this.split.getStart();
        return start;
    }

    /**
     * @return the {@code toString()} rendering of the split most recently passed to {@code reopen}
     */
    public String getSplit() {
        final String description = this.split.toString();
        return description;
    }

    /**
     * Returns the length of the file that the currently opened split belongs to. Note that this is the
     * whole file's length, not the split's start plus its length.
     *
     * @return the file length reported by the file system, or {@code 0} if the status lookup fails
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public long getSplitEnd() {
        long end;
        try {
            end = this.fs.getFileStatus(this.filePath).getLen();
        } catch (IOException ignored) {
            // Best effort: a failed status lookup is reported as 0.
            end = 0L;
        }
        return end;
    }

    /**
     * @return the number of the split most recently passed to {@code reopen}
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public long getSplitNumber() {
        final long number = this.split.getSplitNumber();
        return number;
    }

    /**
     * @return the path this reader was created for (not the path of the currently opened split)
     */
    public Path getFilePath() {
        final Path path = this.readerPath;
        return path;
    }

    /**
     * @return the file system this reader was created with
     */
    public BaseFileSystem getFs() {
        final BaseFileSystem fileSystem = this.fs;
        return fileSystem;
    }

    /**
     * Returns the CSV input format of this reader, creating it on first use.
     *
     * <p>The arguments are only used by the call that creates the format; later calls return the
     * existing instance regardless of the arguments passed.
     *
     * @param str forwarded to the {@code FSCsvInputFormat} constructor
     * @param z   forwarded to the {@code FSCsvInputFormat} constructor
     * @param ch  forwarded to the {@code FSCsvInputFormat} constructor
     * @return the cached input format
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public FSCsvInputFormat getInputFormat(String str, boolean z, Character ch) {
        if (this.inputFormat != null) {
            return this.inputFormat;
        }
        this.inputFormat = new FSCsvInputFormat(this, str, z, ch);
        return this.inputFormat;
    }

    /**
     * Creates a new {@code FSCsvSplitInputFormat} bound to this reader; unlike
     * {@code getInputFormat}, the result is not cached.
     *
     * @param str forwarded to the {@code FSCsvSplitInputFormat} constructor
     * @param z   forwarded to the {@code FSCsvSplitInputFormat} constructor
     * @param ch  forwarded to the {@code FSCsvSplitInputFormat} constructor
     * @return a fresh split-analysing input format
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public FSCsvInputFormat convertFileSplitToInputFormat(String str, boolean z, Character ch) {
        final FSCsvInputFormat splitFormat = new FSCsvSplitInputFormat(this, str, z, ch);
        return splitFormat;
    }

    /**
     * Parses a split description string by delegating to {@code FSCsvSplitInputFormat.fromString}.
     *
     * @param str split description string
     * @return the parsed split
     */
    @Override // com.alibaba.alink.operator.common.io.reader.FileSplitReader
    public InputSplit convertStringToSplitObject(String str) {
        final InputSplit parsed = FSCsvSplitInputFormat.fromString(str);
        return parsed;
    }
}
