package com.alibaba.alink.operator.common.modelstream;

import com.alibaba.alink.common.concurrent.ExecutorThreadFactory;
import com.alibaba.alink.common.concurrent.FutureUtils;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.ReadModelStreamModelRuntimeException;
import com.alibaba.alink.common.exceptions.ScanFailRuntimeException;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.mapper.ModelMapper;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/operator/common/modelstream/ModelStreamFileScanner.class */
public class ModelStreamFileScanner {
    private static final Logger LOG = LoggerFactory.getLogger(ModelStreamFileScanner.class);
    private final int schedulerPoolSize;
    private final int executorPoolSize;
    private ScheduledExecutorService scheduler;
    private ExecutorService executor;

    /* loaded from: input_file:com/alibaba/alink/operator/common/modelstream/ModelStreamFileScanner$ScanTask.class */
    public static class ScanTask {
        private final FilePath filePath;
        private final Timestamp statTime;
        private Timestamp latest;

        public ScanTask(FilePath filePath, Timestamp timestamp) {
            this.filePath = filePath;
            this.statTime = timestamp;
        }

        public List<Timestamp> doScanFile() throws IOException {
            return (List) ModelStreamUtils.listModels(this.filePath).stream().filter(timestamp -> {
                return timestamp.compareTo(this.statTime) >= 0;
            }).collect(Collectors.toList());
        }

        public synchronized List<Timestamp> doUpdateAndRemoveLatest(List<Timestamp> list) {
            if (list.isEmpty()) {
                return new ArrayList();
            }
            Timestamp timestamp = this.latest;
            this.latest = list.stream().max(Comparator.naturalOrder()).get();
            return timestamp == null ? list : (List) list.stream().filter(timestamp2 -> {
                return timestamp2.compareTo(timestamp) > 0;
            }).collect(Collectors.toList());
        }

        public List<Row> doReadLatestModel(List<Timestamp> list) throws IOException {
            if (list.isEmpty()) {
                return null;
            }
            Timestamp timestamp = list.stream().max(Comparator.naturalOrder()).get();
            AkUtils.FileForEachReaderIterable fileForEachReaderIterable = new AkUtils.FileForEachReaderIterable();
            BaseFileSystem<?> fileSystem = this.filePath.getFileSystem();
            Path path = new Path(this.filePath.getPath(), ModelStreamUtils.toStringPresentation(timestamp));
            if (!fileSystem.exists(path)) {
                throw new IllegalStateException("Model " + path.getPath() + " is not exists.");
            }
            AkUtils.getFromFolderForEach(new FilePath(path, fileSystem), fileForEachReaderIterable);
            ArrayList arrayList = new ArrayList();
            Iterator<Row> it = fileForEachReaderIterable.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            return arrayList;
        }

        public void doUpdateModel(List<Row> list, Function<List<Row>, ModelMapper> function, AtomicReference<ModelMapper> atomicReference) {
            if (list == null) {
                return;
            }
            ModelMapper apply = function.apply(list);
            apply.open();
            atomicReference.getAndSet(apply).close();
        }
    }

    public ModelStreamFileScanner(int i, int i2) {
        this.schedulerPoolSize = i;
        this.executorPoolSize = i2;
    }

    public void open() {
        this.scheduler = new ScheduledThreadPoolExecutor(this.schedulerPoolSize, (ThreadFactory) new ExecutorThreadFactory());
        this.executor = new ThreadPoolExecutor(this.executorPoolSize, this.executorPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.executorPoolSize), new ExecutorThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    }

    public void close() {
        if (this.scheduler != null) {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.scheduler});
        }
        if (this.executor != null) {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executor});
        }
    }

    public Iterator<Timestamp> scanToFile(ScanTask scanTask, Time time) {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        commitTask(() -> {
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
                try {
                    return scanTask.doScanFile();
                } catch (IOException e) {
                    throw new ScanFailRuntimeException("Scan file fail.", e);
                }
            }, this.executor);
            scanTask.getClass();
            supplyAsync.thenApplyAsync(scanTask::doUpdateAndRemoveLatest, (Executor) this.executor).thenAcceptAsync(list -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    try {
                        linkedBlockingQueue.put((Timestamp) it.next());
                    } catch (InterruptedException e) {
                        throw new AkUnclassifiedErrorException("Error. ", e);
                    }
                }
            }, (Executor) this.executor).exceptionally(th -> {
                if (th instanceof CancellationException) {
                    return null;
                }
                LOG.warn("Failed to scan model stream.", th);
                return null;
            });
        }, time, this.scheduler);
        return new Iterator<Timestamp>() { // from class: com.alibaba.alink.operator.common.modelstream.ModelStreamFileScanner.1
            @Override // java.util.Iterator
            public boolean hasNext() {
                return true;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public Timestamp next() {
                try {
                    return (Timestamp) linkedBlockingQueue.take();
                } catch (InterruptedException e) {
                    throw new AkUnclassifiedErrorException("Error. ", e);
                }
            }
        };
    }

    public void scanToUpdateModel(ScanTask scanTask, Time time, Function<List<Row>, ModelMapper> function, AtomicReference<ModelMapper> atomicReference) {
        commitTask(() -> {
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
                try {
                    return scanTask.doScanFile();
                } catch (IOException e) {
                    throw new ScanFailRuntimeException("Scan file fail.", e);
                }
            }, this.executor);
            scanTask.getClass();
            supplyAsync.thenApplyAsync(scanTask::doUpdateAndRemoveLatest, (Executor) this.executor).thenApplyAsync(list -> {
                try {
                    return scanTask.doReadLatestModel(list);
                } catch (IOException e) {
                    throw new ReadModelStreamModelRuntimeException("Read model stream fail.", e);
                }
            }, (Executor) this.executor).thenAcceptAsync(list2 -> {
                scanTask.doUpdateModel(list2, function, atomicReference);
            }, (Executor) this.executor).exceptionally(th -> {
                if (th instanceof CancellationException) {
                    return null;
                }
                LOG.warn("Failed to scan model stream.", th);
                return null;
            });
        }, time, this.scheduler);
    }

    private static ScheduledFuture<?> commitTask(Runnable runnable, Time time, ScheduledExecutorService scheduledExecutorService) {
        return FutureUtils.scheduleAtFixedRate(runnable, Time.of(0L, time.getUnit()), time, scheduledExecutorService);
    }
}
