package org.apache.flink.api.connector.source.util.ratelimit;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.Preconditions;

@Experimental
/* loaded from: input_file:org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.class */
public class RateLimitedSourceReader<E, SplitT extends SourceSplit> implements SourceReader<E, SplitT> {
    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter rateLimiter;
    private CompletableFuture<Void> availabilityFuture = null;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter rateLimiter) {
        Preconditions.checkNotNull(sourceReader);
        Preconditions.checkNotNull(rateLimiter);
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public void start() {
        this.sourceReader.start();
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public InputStatus pollNext(ReaderOutput<E> readerOutput) throws Exception {
        if (this.availabilityFuture == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.availabilityFuture = null;
        InputStatus pollNext = this.sourceReader.pollNext(readerOutput);
        return pollNext == InputStatus.MORE_AVAILABLE ? InputStatus.NOTHING_AVAILABLE : pollNext;
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public CompletableFuture<Void> isAvailable() {
        if (this.availabilityFuture == null) {
            this.availabilityFuture = this.rateLimiter.acquire().toCompletableFuture().thenCombine((CompletionStage) this.sourceReader.isAvailable(), (r2, r3) -> {
                return null;
            });
        }
        return this.availabilityFuture;
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public void addSplits(List<SplitT> list) {
        this.sourceReader.addSplits(list);
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public void notifyNoMoreSplits() {
        this.sourceReader.notifyNoMoreSplits();
    }

    @Override // org.apache.flink.api.connector.source.SourceReader
    public List<SplitT> snapshotState(long j) {
        return this.sourceReader.snapshotState(j);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.sourceReader.close();
    }

    @Override // org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        this.rateLimiter.notifyCheckpointComplete(j);
    }
}
