package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.ExceptionUtils;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.class */
public final class StreamMultipleInputProcessor implements StreamInputProcessor {
    private final MultipleInputSelectionHandler inputSelectionHandler;
    private final StreamOneInputProcessor<?>[] inputProcessors;
    private int lastReadInputIndex = 1;
    private boolean isPrepared;

    public StreamMultipleInputProcessor(MultipleInputSelectionHandler multipleInputSelectionHandler, StreamOneInputProcessor<?>[] streamOneInputProcessorArr) {
        this.inputSelectionHandler = multipleInputSelectionHandler;
        this.inputProcessors = streamOneInputProcessorArr;
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.isAnyInputAvailable() || this.inputSelectionHandler.areAllInputsFinished()) {
            return AVAILABLE;
        }
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        for (int i = 0; i < this.inputProcessors.length; i++) {
            if (!this.inputSelectionHandler.isInputFinished(i) && this.inputSelectionHandler.isInputSelected(i)) {
                FutureUtils.assertNoException(this.inputProcessors[i].getAvailableFuture().thenRun(() -> {
                    completableFuture.complete(null);
                }));
            }
        }
        return completableFuture;
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public InputStatus processInput() throws Exception {
        int selectNextReadingInputIndex = this.isPrepared ? selectNextReadingInputIndex() : selectFirstReadingInputIndex();
        if (selectNextReadingInputIndex == -1) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.lastReadInputIndex = selectNextReadingInputIndex;
        InputStatus processInput = this.inputProcessors[selectNextReadingInputIndex].processInput();
        this.inputSelectionHandler.nextSelection();
        return this.inputSelectionHandler.updateStatus(processInput, selectNextReadingInputIndex);
    }

    private int selectFirstReadingInputIndex() {
        this.inputSelectionHandler.nextSelection();
        this.isPrepared = true;
        return selectNextReadingInputIndex();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException iOException = null;
        for (StreamOneInputProcessor<?> streamOneInputProcessor : this.inputProcessors) {
            try {
                streamOneInputProcessor.close();
            } catch (IOException e) {
                iOException = (IOException) ExceptionUtils.firstOrSuppressed(e, iOException);
            }
        }
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws CheckpointException {
        CompletableFuture[] completableFutureArr = new CompletableFuture[this.inputProcessors.length];
        for (int i = 0; i < completableFutureArr.length; i++) {
            completableFutureArr[i] = this.inputProcessors[i].prepareSnapshot(channelStateWriter, j);
        }
        return CompletableFuture.allOf(completableFutureArr);
    }

    private int selectNextReadingInputIndex() {
        if (!this.inputSelectionHandler.isAnyInputAvailable()) {
            fullCheckAndSetAvailable();
        }
        int selectNextInputIndex = this.inputSelectionHandler.selectNextInputIndex(this.lastReadInputIndex);
        if (selectNextInputIndex == -1) {
            return -1;
        }
        if (this.inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            fullCheckAndSetAvailable();
        }
        return selectNextInputIndex;
    }

    private void fullCheckAndSetAvailable() {
        for (int i = 0; i < this.inputProcessors.length; i++) {
            StreamOneInputProcessor<?> streamOneInputProcessor = this.inputProcessors[i];
            if (streamOneInputProcessor.isApproximatelyAvailable() || streamOneInputProcessor.isAvailable()) {
                this.inputSelectionHandler.setAvailableInput(i);
            }
        }
    }
}
