package org.apache.flink.core.testutils;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.class */
/**
 * A {@link ScheduledExecutorService} for tests that never runs anything on its own.
 *
 * <p>Runnables handed to {@link #execute(Runnable)} are appended to an internal queue, and
 * scheduled tasks are stored in one of two queues (periodic and non-periodic). Nothing is
 * executed until one of the {@code trigger*} methods is called; the work then runs
 * synchronously on the thread that called the trigger method.
 *
 * <p>The {@code submit}, {@code invokeAll} and {@code invokeAny} methods are not supported and
 * throw {@link UnsupportedOperationException}.
 */
public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {

    /** Runnables passed to {@link #execute(Runnable)}; all accesses synchronize on the deque. */
    private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();

    /** Tasks created by the {@code schedule(...)} methods, in insertion order. */
    private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
            new ConcurrentLinkedQueue<>();

    /** Tasks created by {@code scheduleAtFixedRate} / {@code scheduleWithFixedDelay}. */
    private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks =
            new ConcurrentLinkedQueue<>();

    /** Set once {@link #shutdown()} or {@link #shutdownNow()} has been called. */
    private boolean shutdown;

    // ------------------------------------------------------------------------
    //  (Scheduled)ExecutorService
    // ------------------------------------------------------------------------

    /**
     * Queues the runnable for later manual execution; it is not run by this call.
     *
     * @param runnable the runnable to enqueue
     */
    @Override
    public void execute(@Nonnull Runnable runnable) {
        synchronized (this.queuedRunnables) {
            this.queuedRunnables.addLast(runnable);
        }
    }

    /**
     * Registers a non-periodic task; it is only executed when triggered manually.
     *
     * @param runnable the action to run when the task is triggered
     * @param delay the delay value forwarded to the created task
     * @param unit the unit used for converting {@code delay}
     * @return the future representing the registered task
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) {
        return insertNonPeriodicTask(runnable, delay, unit);
    }

    /**
     * Registers a non-periodic task; it is only executed when triggered manually.
     *
     * @param callable the computation to run when the task is triggered
     * @param delay the delay value forwarded to the created task
     * @param unit the unit used for converting {@code delay}
     * @param <V> the result type of the callable
     * @return the future representing the registered task
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return insertNonPeriodicTask(callable, delay, unit);
    }

    /**
     * Registers a periodic task; it is only executed when triggered manually. Handled exactly
     * like {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
     *
     * @param runnable the action to run each time the task is triggered
     * @param initialDelay the initial delay value forwarded to the created task
     * @param period the period value forwarded to the created task
     * @param unit the unit used for converting {@code initialDelay} and {@code period}
     * @return the future representing the registered task
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            Runnable runnable, long initialDelay, long period, TimeUnit unit) {
        return insertPeriodicRunnable(runnable, initialDelay, period, unit);
    }

    /**
     * Registers a periodic task; it is only executed when triggered manually. Handled exactly
     * like {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
     *
     * @param runnable the action to run each time the task is triggered
     * @param initialDelay the initial delay value forwarded to the created task
     * @param delay the period value forwarded to the created task
     * @param unit the unit used for converting {@code initialDelay} and {@code delay}
     * @return the future representing the registered task
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
        return insertPeriodicRunnable(runnable, initialDelay, delay, unit);
    }

    /** Marks this executor as shut down. Queued work is neither discarded nor executed. */
    @Override
    public void shutdown() {
        this.shutdown = true;
    }

    /**
     * Marks this executor as shut down, like {@link #shutdown()}.
     *
     * @return always an empty list; queued runnables are not drained or reported
     */
    @Override
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

    /**
     * @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called
     */
    @Override
    public boolean isShutdown() {
        // Previously hard-coded to false, which contradicted isTerminated() returning true
        // after shutdown(): an executor can only be terminated if it was shut down first.
        return this.shutdown;
    }

    /**
     * @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called
     */
    @Override
    public boolean isTerminated() {
        return this.shutdown;
    }

    /**
     * Returns immediately without waiting.
     *
     * @param timeout ignored
     * @param unit ignored
     * @return always {@code true}
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return true;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> Future<T> submit(Callable<T> callable) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> Future<T> submit(Runnable runnable, T result) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public Future<?> submit(Runnable runnable) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
        throw new UnsupportedOperationException();
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    // ------------------------------------------------------------------------
    //  Manual triggering
    // ------------------------------------------------------------------------

    /**
     * Repeatedly runs all queued runnables and all non-periodic scheduled tasks until both the
     * runnable queue and the non-periodic task queue are empty. Periodic tasks are not touched.
     */
    public void triggerAllNonPeriodicTasks() {
        while (numQueuedRunnables() > 0 || !this.nonPeriodicScheduledTasks.isEmpty()) {
            triggerAll();
            triggerNonPeriodicScheduledTasks();
        }
    }

    /**
     * Runs queued runnables one after another until the runnable queue is empty, including any
     * that are enqueued while this method is running.
     */
    public void triggerAll() {
        while (numQueuedRunnables() > 0) {
            trigger();
        }
    }

    /**
     * Removes the oldest queued runnable and runs it on the calling thread.
     *
     * @throws java.util.NoSuchElementException if no runnable is queued
     */
    public void trigger() {
        final Runnable next;
        synchronized (this.queuedRunnables) {
            next = this.queuedRunnables.removeFirst();
        }
        // Run outside the lock so the runnable may itself call execute(...).
        next.run();
    }

    /**
     * @return the number of runnables currently waiting in the runnable queue
     */
    public int numQueuedRunnables() {
        synchronized (this.queuedRunnables) {
            return this.queuedRunnables.size();
        }
    }

    /**
     * @return a new collection holding all non-cancelled scheduled tasks, non-periodic ones
     *     first, followed by the periodic ones
     */
    public Collection<ScheduledFuture<?>> getScheduledTasks() {
        final List<ScheduledFuture<?>> scheduledTasks =
                new ArrayList<>(
                        this.nonPeriodicScheduledTasks.size()
                                + this.periodicScheduledTasks.size());
        scheduledTasks.addAll(getNonPeriodicScheduledTask());
        scheduledTasks.addAll(getPeriodicScheduledTask());
        return scheduledTasks;
    }

    /**
     * @return a new collection holding the periodic tasks that are not cancelled
     */
    public Collection<ScheduledFuture<?>> getPeriodicScheduledTask() {
        return this.periodicScheduledTasks.stream()
                .filter(task -> !task.isCancelled())
                .collect(Collectors.toList());
    }

    /**
     * @return a new collection holding the non-periodic tasks that are not cancelled
     */
    public Collection<ScheduledFuture<?>> getNonPeriodicScheduledTask() {
        return this.nonPeriodicScheduledTasks.stream()
                .filter(task -> !task.isCancelled())
                .collect(Collectors.toList());
    }

    /** Triggers all periodic tasks first, then all non-periodic scheduled tasks. */
    public void triggerScheduledTasks() {
        triggerPeriodicScheduledTasks();
        triggerNonPeriodicScheduledTasks();
    }

    /**
     * Removes the oldest non-periodic scheduled task and executes it. The task is executed
     * without checking whether it has been cancelled.
     *
     * @throws java.util.NoSuchElementException if there is no non-periodic scheduled task
     */
    public void triggerNonPeriodicScheduledTask() {
        // Queue.remove() throws on an empty queue and never returns null, so no null check.
        this.nonPeriodicScheduledTasks.remove().execute();
    }

    /**
     * Polls and executes non-periodic scheduled tasks until the queue is empty, so tasks that
     * are scheduled while this method runs are executed as well. Cancelled tasks are removed
     * without being executed.
     */
    public void triggerNonPeriodicScheduledTasksWithRecursion() {
        ScheduledTask<?> task;
        // Loop on poll() directly rather than isEmpty() + poll(), which could dereference
        // null if the queue were drained between the two calls.
        while ((task = this.nonPeriodicScheduledTasks.poll()) != null) {
            if (!task.isCancelled()) {
                task.execute();
            }
        }
    }

    /**
     * Iterates over the non-periodic scheduled tasks, executing each one that is not cancelled
     * and removing every visited task from the queue.
     */
    public void triggerNonPeriodicScheduledTasks() {
        final Iterator<ScheduledTask<?>> tasks = this.nonPeriodicScheduledTasks.iterator();
        while (tasks.hasNext()) {
            final ScheduledTask<?> task = tasks.next();
            if (!task.isCancelled()) {
                task.execute();
            }
            tasks.remove();
        }
    }

    /**
     * Executes every periodic task that is not cancelled. Tasks stay registered afterwards so
     * that they can be triggered again.
     */
    public void triggerPeriodicScheduledTasks() {
        for (ScheduledTask<?> task : this.periodicScheduledTasks) {
            if (!task.isCancelled()) {
                task.execute();
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Internal helpers
    // ------------------------------------------------------------------------

    /** Wraps the runnable in a periodic {@code ScheduledTask} and stores it. */
    private ScheduledFuture<?> insertPeriodicRunnable(
            Runnable runnable, long delay, long period, TimeUnit unit) {
        // NOTE(review): unit.convert(x, MILLISECONDS) treats x as milliseconds and converts it
        // INTO `unit`. If ScheduledTask expects milliseconds (not visible from here), this
        // should be TimeUnit.MILLISECONDS.convert(x, unit) - confirm before changing.
        final ScheduledTask<?> scheduledTask =
                new ScheduledTask<>(
                        () -> {
                            runnable.run();
                            return null;
                        },
                        unit.convert(delay, TimeUnit.MILLISECONDS),
                        unit.convert(period, TimeUnit.MILLISECONDS));
        this.periodicScheduledTasks.offer(scheduledTask);
        return scheduledTask;
    }

    /** Adapts the runnable to a callable returning {@code null} and stores it as a task. */
    private ScheduledFuture<?> insertNonPeriodicTask(Runnable runnable, long delay, TimeUnit unit) {
        // The value-returning lambda is only compatible with the Callable overload.
        return insertNonPeriodicTask(
                () -> {
                    runnable.run();
                    return null;
                },
                delay,
                unit);
    }

    /** Wraps the callable in a non-periodic {@code ScheduledTask} and stores it. */
    private <V> ScheduledFuture<V> insertNonPeriodicTask(
            Callable<V> callable, long delay, TimeUnit unit) {
        // NOTE(review): same conversion-direction question as in insertPeriodicRunnable.
        final ScheduledTask<V> scheduledTask =
                new ScheduledTask<>(callable, unit.convert(delay, TimeUnit.MILLISECONDS));
        this.nonPeriodicScheduledTasks.offer(scheduledTask);
        return scheduledTask;
    }
}
