package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultSchedulerTest.class */
public class DefaultSchedulerTest extends TestLogger {
    /** Shared constant of 1000; the name marks it as a timeout in milliseconds. */
    private static final int TIMEOUT_MS = 1000;
    /** Fixed job id. NOTE(review): not referenced by the tests visible here — presumably used by helpers further down; confirm. */
    private static final JobID TEST_JOB_ID = new JobID();
    /** Executor whose scheduled tasks run only when a test explicitly triggers them; tests use it to step through restarts. */
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    /** Single-thread executor created in setUp() and shut down in tearDown(). */
    private ExecutorService executor;
    /** DirectScheduledExecutorService created in setUp() and shut down in tearDown(). */
    private ScheduledExecutorService scheduledExecutorService;
    /** Fresh, empty configuration created for every test. */
    private Configuration configuration;
    /** Restart strategy stub; created with (true, 0L), and tests flip it via setCanRestart(...). */
    private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
    /** Decorator around DefaultExecutionVertexOperations; tests read its deployed-vertex list and toggle deploy failures on it. */
    private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
    /** Versioner the tests use to record modifications and to check whether a captured vertex version has changed. */
    private ExecutionVertexVersioner executionVertexVersioner;
    /** Factory that owns the test slot allocator below. */
    private TestExecutionSlotAllocatorFactory executionSlotAllocatorFactory;
    /** Slot allocator obtained from the factory; tests complete or time out its pending requests manually. */
    private TestExecutionSlotAllocator testExecutionSlotAllocator;

    /**
     * Creates a fresh set of collaborators before each test: both executors, an empty
     * configuration, a restart back-off strategy built with {@code (true, 0L)}, the vertex
     * operations decorator, a versioner, and the slot allocator handed out by its factory.
     */
    @Before
    public void setUp() throws Exception {
        // Executors; tearDown() shuts both of them down again.
        executor = Executors.newSingleThreadExecutor();
        scheduledExecutorService = new DirectScheduledExecutorService();

        // Stand-alone collaborators.
        configuration = new Configuration();
        testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        testExecutionVertexOperations =
                new TestExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
        executionVertexVersioner = new ExecutionVertexVersioner();

        // The allocator is obtained from the factory, so the factory has to be created first.
        executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        testExecutionSlotAllocator = executionSlotAllocatorFactory.getTestExecutionSlotAllocator();
    }

    /**
     * Gracefully shuts down the two executors created in {@code setUp()}, each with its own
     * 1000 ms budget. A field that is still {@code null} is skipped.
     */
    @After
    public void tearDown() throws Exception {
        final long shutdownTimeoutMs = 1000L;

        if (scheduledExecutorService != null) {
            ExecutorUtils.gracefulShutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS, scheduledExecutorService);
        }

        if (executor != null) {
            ExecutorUtils.gracefulShutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS, executor);
        }
    }

    /**
     * Starting the scheduler on the single-vertex job graph must deploy exactly one vertex:
     * subtask 0 of the only job vertex.
     */
    @Test
    public void startScheduling() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionVertexID expectedVertex = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);

        createSchedulerAndStartScheduling(jobGraph);

        final List<ExecutionVertexID> deployed = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertThat(deployed, Matchers.contains(expectedVertex));
    }

    /**
     * Vertices handed to the scheduler by the scheduling strategy must be deployed in the
     * order the strategy requested them (here: subtasks 4, 0, 3, 1, 2 of a ten-way vertex).
     */
    @Test
    public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception {
        final JobGraph jobGraph = singleJobVertexJobGraph(10);
        final JobVertexID vertexId = getOnlyJobVertex(jobGraph).getID();

        // Deliberately not in ascending subtask order.
        final List<ExecutionVertexID> requestedOrder = Arrays.asList(
                new ExecutionVertexID(vertexId, 4),
                new ExecutionVertexID(vertexId, 0),
                new ExecutionVertexID(vertexId, 3),
                new ExecutionVertexID(vertexId, 1),
                new ExecutionVertexID(vertexId, 2));

        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        createScheduler(jobGraph, strategyFactory);
        strategyFactory.getLastCreatedSchedulingStrategy().schedule(requestedOrder);

        final List<ExecutionVertexID> deployedOrder = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertEquals(requestedOrder, deployedOrder);
    }

    /**
     * When the first deployment fails, the vertex must be deployed a second time once the
     * restart executor runs, giving two deployment records for the same vertex.
     */
    @Test
    public void restartAfterDeploymentFails() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex jobVertex = getOnlyJobVertex(jobGraph);

        // Scheduling starts while deployments are forced to fail.
        this.testExecutionVertexOperations.enableFailDeploy();
        createSchedulerAndStartScheduling(jobGraph);

        // Allow deployments again, then run the pending restart.
        this.testExecutionVertexOperations.disableFailDeploy();
        this.taskRestartExecutor.triggerScheduledTasks();

        final List<ExecutionVertexID> deployed = this.testExecutionVertexOperations.getDeployedVertices();
        final ExecutionVertexID vertexId = new ExecutionVertexID(jobVertex.getID(), 0);
        Assert.assertThat(deployed, Matchers.contains(vertexId, vertexId));
    }

    /**
     * With {@link ScheduleMode#LAZY_FROM_SOURCES} set on the job graph, starting the
     * scheduler still deploys the single vertex.
     */
    @Test
    public void scheduleWithLazyStrategy() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        final ExecutionVertexID expectedVertex = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);

        createSchedulerAndStartScheduling(jobGraph);

        Assert.assertThat(
                this.testExecutionVertexOperations.getDeployedVertices(),
                Matchers.contains(expectedVertex));
    }

    /**
     * Reporting the only execution as FAILED and then running the restart executor must
     * lead to a second deployment of the same vertex.
     */
    @Test
    public void restartFailedTask() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex jobVertex = getOnlyJobVertex(jobGraph);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Report the current attempt of the only execution vertex as failed.
        final ArchivedExecutionVertex onlyVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                onlyVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));

        this.taskRestartExecutor.triggerScheduledTasks();

        final ExecutionVertexID vertexId = new ExecutionVertexID(jobVertex.getID(), 0);
        final List<ExecutionVertexID> deployed = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertThat(deployed, Matchers.contains(vertexId, vertexId));
    }

    /**
     * A state update that carries a freshly generated (and therefore unknown) attempt id
     * must be answered with {@code false}.
     */
    @Test
    public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();

        final boolean updateAccepted = createSchedulerAndStartScheduling(jobGraph).updateTaskExecutionState(
                new TaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID(), ExecutionState.FAILED));

        Assert.assertFalse(updateAccepted);
    }

    /**
     * With restarts disabled on the back-off strategy, a failed task must drive the job to
     * {@link JobStatus#FAILED}.
     */
    @Test
    public void failJobIfCannotRestart() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Fail the current attempt of the only execution vertex.
        final ArchivedExecutionVertex onlyVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                onlyVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));

        this.taskRestartExecutor.triggerScheduledTasks();
        waitForTermination(scheduler);

        final JobStatus finalStatus = scheduler.requestJobStatus();
        Assert.assertThat(finalStatus, Matchers.is(Matchers.equalTo(JobStatus.FAILED)));
    }

    /**
     * When the pending slot request times out and restarts are disabled, the job must end
     * up {@link JobStatus#FAILED} and the recorded failure must contain a
     * {@link NoResourceAvailableException} with the slot-timeout message.
     *
     * @throws Exception if waiting for the scheduler to terminate fails
     */
    @Test
    public void failJobIfNotEnoughResources() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        // Keep the slot request pending so that it can be timed out by hand below.
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        this.testExecutionSlotAllocator.timeoutPendingRequests();
        waitForTermination(scheduler);

        // The status is checked once; the original asserted the same captured value a second
        // time after the failure-cause checks, which could never yield a different result.
        final JobStatus jobStatus = scheduler.requestJobStatus();
        Assert.assertThat(jobStatus, Matchers.is(Matchers.equalTo(JobStatus.FAILED)));

        final Throwable failureCause = scheduler.requestJob()
                .getFailureInfo()
                .getException()
                .deserializeError(DefaultSchedulerTest.class.getClassLoader());
        Assert.assertTrue(ExceptionUtils.findThrowable(failureCause, NoResourceAvailableException.class).isPresent());
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(failureCause, "Could not allocate the required slot within slot request timeout.").isPresent());
    }

    /**
     * Exercises a restart that happens while slot requests of the initial scheduling round
     * are still pending.
     *
     * <p>Only the source's slot request is completed before the first execution vertex is
     * reported FAILED. After auto-completion is re-enabled and the restart executor runs,
     * source and sink must each have been deployed exactly once and the job must be RUNNING.
     * Per the test name, the deployment from the initial round is expected to be skipped
     * because its vertex version is outdated by then.
     */
    @Test
    public void skipDeploymentIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        // Typed list instead of a raw List, so no per-element casts are needed.
        final List<JobVertex> sortedJobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        final ExecutionVertexID sourceExecutionVertexId = new ExecutionVertexID(sortedJobVertices.get(0).getID(), 0);
        final ExecutionVertexID sinkExecutionVertexId = new ExecutionVertexID(sortedJobVertices.get(1).getID(), 0);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);

        // Fail the first execution vertex returned by the archived graph.
        final ArchivedExecutionVertex firstExecutionVertex =
                (ArchivedExecutionVertex) scheduler.requestJob().getAllExecutionVertices().iterator().next();
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));
        // Restarts are disabled only after the failure above has been reported.
        this.testRestartBackoffTimeStrategy.setCanRestart(false);

        this.testExecutionSlotAllocator.enableAutoCompletePendingRequests();
        this.taskRestartExecutor.triggerScheduledTasks();

        Assert.assertThat(
                this.testExecutionVertexOperations.getDeployedVertices(),
                Matchers.containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
        Assert.assertThat(scheduler.requestJob().getState(), Matchers.is(Matchers.equalTo(JobStatus.RUNNING)));
    }

    /**
     * If a vertex is recorded as modified while its slot request is still pending, completing
     * the request afterwards must result in exactly one slot being returned to the allocator.
     */
    @Test
    public void releaseSlotIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionVertexID onlyVertexId = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);
        createSchedulerAndStartScheduling(jobGraph);

        // Bump the version first, then let the pending slot request complete.
        this.executionVertexVersioner.recordModification(onlyVertexId);
        this.testExecutionSlotAllocator.completePendingRequests();

        Assert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), Matchers.hasSize(1));
    }

    /**
     * After a scheduled vertex fails and the restart executor runs, the strategy must have
     * been handed one batch of vertices to restart and the vertex must be back in
     * {@link ExecutionState#CREATED}.
     */
    @Test
    public void vertexIsResetBeforeRestarted() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        final DefaultScheduler scheduler = createScheduler(jobGraph, strategyFactory);
        final TestSchedulingStrategy strategy = strategyFactory.getLastCreatedSchedulingStrategy();
        final SchedulingTopology topology = strategy.getSchedulingTopology();

        startScheduling(scheduler);

        // Schedule the only vertex through the strategy.
        final SchedulingExecutionVertex onlySchedulingVertex =
                (SchedulingExecutionVertex) Iterables.getOnlyElement(topology.getVertices());
        strategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));

        // Fail its current attempt and run the restart.
        final ArchivedExecutionVertex onlyArchivedVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                onlyArchivedVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();

        Assert.assertThat(strategy.getReceivedVerticesToRestart(), Matchers.hasSize(1));
        Assert.assertThat(onlySchedulingVertex.getState(), Matchers.is(Matchers.equalTo(ExecutionState.CREATED)));
    }

    /**
     * Scheduling the same vertex a second time, without a reset in between, must be
     * rejected with an {@link IllegalStateException}.
     */
    @Test
    public void scheduleOnlyIfVertexIsCreated() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        final DefaultScheduler scheduler = createScheduler(jobGraph, strategyFactory);
        final TestSchedulingStrategy strategy = strategyFactory.getLastCreatedSchedulingStrategy();
        final SchedulingTopology topology = strategy.getSchedulingTopology();

        startScheduling(scheduler);

        final ExecutionVertexID onlyVertexId =
                ((SchedulingExecutionVertex) Iterables.getOnlyElement(topology.getVertices())).getId();

        // The first request is expected to go through.
        strategy.schedule(Collections.singletonList(onlyVertexId));

        try {
            // The second request targets a vertex that has already been scheduled.
            strategy.schedule(Collections.singletonList(onlyVertexId));
            Assert.fail("IllegalStateException should happen");
        } catch (IllegalStateException expected) {
            // This exception is the outcome the test is looking for.
        }
    }

    /**
     * A global failure, followed by the only execution reporting CANCELED and the restart
     * executor running, must lead to a second deployment of the vertex.
     */
    @Test
    public void handleGlobalFailure() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex jobVertex = getOnlyJobVertex(jobGraph);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        scheduler.handleGlobalFailure(new Exception("forced failure"));

        // Acknowledge the cancellation of the only execution vertex.
        final ArchivedExecutionVertex onlyVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                onlyVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.CANCELED));

        this.taskRestartExecutor.triggerScheduledTasks();

        final ExecutionVertexID vertexId = new ExecutionVertexID(jobVertex.getID(), 0);
        final List<ExecutionVertexID> deployed = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertThat(deployed, Matchers.contains(vertexId, vertexId));
    }

    /**
     * A local task failure that arrives after a global failure must not disturb the global
     * fail-over: both subtasks are expected to be deployed once by the initial scheduling
     * and once more, in the same order, after the restart.
     */
    @Test
    public void handleGlobalFailureWithLocalFailure() {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Collected into a typed list (the original used a raw List plus casts), captured
        // before any failure is injected.
        final List<ExecutionAttemptID> attemptIds =
                StreamSupport.stream(scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
                        .map(vertex -> vertex.getCurrentExecutionAttempt())
                        .map(attempt -> attempt.getAttemptId())
                        .collect(Collectors.toList());
        final ExecutionAttemptID localFailureAttemptId = attemptIds.get(0);

        scheduler.handleGlobalFailure(new Exception("global failure"));

        // Local failure for the first attempt, reported after the global failure.
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(), localFailureAttemptId, ExecutionState.FAILED, new Exception("local failure")));

        // Every attempt, including the one that already failed locally, reports CANCELED.
        for (ExecutionAttemptID attemptId : attemptIds) {
            scheduler.updateTaskExecutionState(new TaskExecutionState(
                    jobGraph.getJobID(), attemptId, ExecutionState.CANCELED));
        }

        this.taskRestartExecutor.triggerScheduledTasks();

        final ExecutionVertexID executionVertexId0 = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        final ExecutionVertexID executionVertexId1 = new ExecutionVertexID(onlyJobVertex.getID(), 1);
        Assert.assertThat(
                "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
                this.testExecutionVertexOperations.getDeployedVertices(),
                Matchers.contains(executionVertexId0, executionVertexId1, executionVertexId0, executionVertexId1));
    }

    /** Starting the checkpoint scheduler once the job has finished must have no effect. */
    @Test
    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                scheduler -> scheduler.startCheckpointScheduler());
    }

    /** Stopping the checkpoint scheduler once the job has finished must have no effect. */
    @Test
    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                scheduler -> scheduler.stopCheckpointScheduler());
    }

    /**
     * Runs a checkpointing-enabled single-vertex job to FINISHED and verifies that the given
     * operation does not bring the checkpoint coordinator back.
     *
     * <p>The coordinator must be non-null while the job runs, null once the only execution
     * has finished, and still null after {@code consumer} has been applied.
     *
     * @param consumer the scheduler operation to invoke after the job has finished
     */
    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(Consumer<DefaultScheduler> consumer) {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
        Assert.assertThat(scheduler.getCheckpointCoordinator(), Matchers.is(Matchers.notNullValue()));

        // Finish the only execution vertex.
        final ExecutionVertex onlyVertex =
                (ExecutionVertex) Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices());
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                onlyVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FINISHED));
        Assert.assertThat(scheduler.getCheckpointCoordinator(), Matchers.is(Matchers.nullValue()));

        // The operation under test must leave the coordinator absent.
        consumer.accept(scheduler);
        Assert.assertThat(scheduler.getCheckpointCoordinator(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * The first vertex is failed and restarted while slot requests are not auto-completed;
     * failing the second vertex afterwards must leave the first one in
     * {@link ExecutionState#SCHEDULED}.
     *
     * <p>NOTE(review): per the test name, the second failure is what lets the outdated
     * initial deployment of the first vertex proceed — the mechanism lives in the scheduler
     * and is not visible here; confirm.
     */
    @Test
    public void vertexIsNotAffectedByOutdatedDeployment() {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);

        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Typed iterator instead of a raw Iterator with casts.
        final Iterator<ArchivedExecutionVertex> vertexIterator =
                scheduler.requestJob().getAllExecutionVertices().iterator();
        final ArchivedExecutionVertex firstVertex = vertexIterator.next();
        final ArchivedExecutionVertex secondVertex = vertexIterator.next();

        // Live scheduling view of the first vertex; its state is what gets asserted.
        final SchedulingExecutionVertex firstSchedulingVertex =
                (SchedulingExecutionVertex) scheduler.getSchedulingTopology().getVertices().iterator().next();

        // Fail the first vertex and run its restart.
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                firstVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();

        // Fail the second vertex; no restart is triggered for it within this test.
        scheduler.updateTaskExecutionState(new TaskExecutionState(
                jobGraph.getJobID(),
                secondVertex.getCurrentExecutionAttempt().getAttemptId(),
                ExecutionState.FAILED));

        Assert.assertThat(firstSchedulingVertex.getState(), Matchers.is(Matchers.equalTo(ExecutionState.SCHEDULED)));
    }

    /**
     * A checkpoint that is still pending when the task fails must no longer be pending
     * after the restart: the pending count is expected to go from 1 to 0.
     */
    @Test
    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        // The latch is obtained before the scheduler is created, as in the other checkpoint tests.
        final CountDownLatch checkpointTriggered = getCheckpointTriggeredLatch();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        final ArchivedExecutionVertex onlyVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        final ExecutionAttemptID attemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));

        // Trigger a checkpoint and wait until the latch is released.
        final CheckpointCoordinator coordinator = SchedulerTestingUtils.getCheckpointCoordinator(scheduler);
        coordinator.triggerCheckpoint(false);
        checkpointTriggered.await();
        final Integer pendingBeforeFailure = Integer.valueOf(coordinator.getNumberOfPendingCheckpoints());
        Assert.assertThat(pendingBeforeFailure, Matchers.is(Matchers.equalTo(1)));

        // Fail the task and run the restart.
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        final Integer pendingAfterRestart = Integer.valueOf(coordinator.getNumberOfPendingCheckpoints());
        Assert.assertThat(pendingAfterRestart, Matchers.is(Matchers.equalTo(0)));
    }

    /**
     * After a checkpoint has been acknowledged, restarting the failed task must invoke the
     * registered master hook's restore exactly once.
     */
    @Test
    public void restoreStateWhenRestartingTasks() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        final CountDownLatch checkpointTriggered = getCheckpointTriggeredLatch();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        final ArchivedExecutionVertex onlyVertex =
                (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        final ExecutionAttemptID attemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));

        final CheckpointCoordinator coordinator = SchedulerTestingUtils.getCheckpointCoordinator(scheduler);

        // Register the hook whose restore invocations are counted.
        final TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        coordinator.addMasterHook(masterHook);

        // Trigger a checkpoint, wait for the latch, then acknowledge the first pending checkpoint.
        coordinator.triggerCheckpoint(false);
        checkpointTriggered.await();
        final long checkpointId = ((Long) coordinator.getPendingCheckpoints().keySet().iterator().next()).longValue();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);

        // Fail the task and run the restart.
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();

        Assert.assertThat(Integer.valueOf(masterHook.getRestoreCount()), Matchers.is(Matchers.equalTo(1)));
    }

    /**
     * When restoring state fails during a task restart, that restart must not redeploy the
     * vertex; once the hook stops failing, another round of the restart executor must
     * produce the second deployment.
     *
     * <p>Per the test name, the failed restore is expected to escalate to a global failure.
     *
     * @throws Exception if waiting for the checkpoint trigger or acknowledging it fails
     */
    @Test
    public void failGlobalWhenRestoringStateFails() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        final ExecutionAttemptID attemptId =
                ((ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()))
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));

        final CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(scheduler);

        // Register a hook that is told to fail on restore.
        final TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        masterHook.enableFailOnRestore();
        checkpointCoordinator.addMasterHook(masterHook);

        // Trigger a checkpoint, wait for the latch, then acknowledge the first pending checkpoint.
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        final long checkpointId = ((Long) checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next()).longValue();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();

        final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);

        // Only the initial deployment is expected while the hook fails on restore.
        Assert.assertThat(
                this.testExecutionVertexOperations.getDeployedVertices(),
                Matchers.contains(executionVertexId));

        masterHook.disableFailOnRestore();
        this.taskRestartExecutor.triggerScheduledTasks();

        // The list is fetched again rather than reusing the earlier reference, so the check
        // does not depend on getDeployedVertices() returning a live view.
        Assert.assertThat(
                this.testExecutionVertexOperations.getDeployedVertices(),
                Matchers.contains(executionVertexId, executionVertexId));
    }

    /**
     * Guards against slow handling of the last finishing subtask: vertex2 consumes vertex1
     * and vertex3 through ALL_TO_ALL blocking edges, every subtask of vertex1 except the
     * last is finished untimed, and finishing the last one must take less than five seconds.
     *
     * @throws Exception if a helper used to build or drive the job fails
     */
    @Test
    public void testInputConstraintALLPerf() throws Exception {
        // One local drives both the vertex size and the subtask indices below, instead of
        // borrowing the unrelated timeout constant and hard-coding 999.
        // NOTE(review): createVertexWithAllInputConstraints is defined outside this view; its
        // int argument is taken to be the parallelism because subtasks 0..999 are finished — confirm.
        final int parallelism = 1000;

        final JobVertex vertex1 = createVertexWithAllInputConstraints("vertex1", parallelism);
        final JobVertex vertex2 = createVertexWithAllInputConstraints("vertex2", parallelism);
        final JobVertex vertex3 = createVertexWithAllInputConstraints("vertex3", parallelism);
        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        vertex2.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        final DefaultScheduler scheduler =
                createSchedulerAndStartScheduling(new JobGraph(new JobVertex[]{vertex1, vertex2, vertex3}));
        final AccessExecutionJobVertex executionJobVertex1 =
                (AccessExecutionJobVertex) scheduler.requestJob().getAllVertices().get(vertex1.getID());

        // Finish all subtasks but the last one outside the timed section.
        for (int subtaskIndex = 0; subtaskIndex < parallelism - 1; subtaskIndex++) {
            finishSubtask(scheduler, executionJobVertex1, subtaskIndex);
        }

        // Only the final subtask is timed.
        final long startNanos = System.nanoTime();
        finishSubtask(scheduler, executionJobVertex1, parallelism - 1);
        final Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);

        Assert.assertThat(elapsed, Matchers.lessThan(Duration.ofSeconds(5L)));
    }

    /** Failing the job must mark the vertex version captured beforehand as modified. */
    @Test
    public void failJobWillIncrementVertexVersions() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionVertexID onlyVertexId = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Snapshot the version after scheduling has started, then fail the job.
        final ExecutionVertexVersion versionBeforeFailure =
                this.executionVertexVersioner.getExecutionVertexVersion(onlyVertexId);
        scheduler.failJob(new FlinkException("Test failure."));

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeFailure);
        Assert.assertTrue(modified);
    }

    /** Cancelling the job must mark the vertex version captured beforehand as modified. */
    @Test
    public void cancelJobWillIncrementVertexVersions() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionVertexID onlyVertexId = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Snapshot the version after scheduling has started, then cancel.
        final ExecutionVertexVersion versionBeforeCancel =
                this.executionVertexVersioner.getExecutionVertexVersion(onlyVertexId);
        scheduler.cancel();

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeCancel);
        Assert.assertTrue(modified);
    }

    /** Suspending the job must mark the vertex version captured beforehand as modified. */
    @Test
    public void suspendJobWillIncrementVertexVersions() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionVertexID onlyVertexId = new ExecutionVertexID(getOnlyJobVertex(jobGraph).getID(), 0);
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Snapshot the version after scheduling has started, then suspend.
        final ExecutionVertexVersion versionBeforeSuspend =
                this.executionVertexVersioner.getExecutionVertexVersion(onlyVertexId);
        scheduler.suspend(new Exception("forced suspend"));

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeSuspend);
        Assert.assertTrue(modified);
    }

    /**
     * Verifies the job status reported while failed vertices of a two-subtask job are
     * waiting to be restarted.
     *
     * <p>The job status is sampled at three points:
     * <ol>
     *   <li>right after the first attempt is reported FAILED,</li>
     *   <li>after the second attempt is reported FAILED and one scheduled restart task ran,</li>
     *   <li>after a second scheduled restart task ran.</li>
     * </ol>
     * The first two samples must be {@code RESTARTING}, the last one {@code RUNNING}.
     */
    @Test
    public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);
        final JobID jobId = jobGraph.getJobID();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Use a parameterized iterator instead of a raw one so no downcast is needed per element.
        final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
        final ExecutionAttemptID firstAttemptId = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, firstAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        final JobStatus statusAfterFirstFailure = scheduler.requestJobStatus();

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, secondAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));

        // Run only one of the pending restart tasks; the other one is still outstanding.
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        final JobStatus statusWithPendingRestart = scheduler.requestJobStatus();

        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        final JobStatus statusAfterAllRestarts = scheduler.requestJobStatus();

        // All assertions come last so every status sample is taken before any of them can fail.
        Assert.assertThat(statusAfterFirstFailure, Matchers.equalTo(JobStatus.RESTARTING));
        Assert.assertThat(statusWithPendingRestart, Matchers.equalTo(JobStatus.RESTARTING));
        Assert.assertThat(statusAfterAllRestarts, Matchers.equalTo(JobStatus.RUNNING));
    }

    /**
     * Verifies that cancelling a two-subtask job after one attempt has failed does not finish
     * the cancellation until the remaining task has reported back.
     *
     * <p>After the first attempt is reported FAILED and {@code cancel()} is called, the second
     * vertex must be {@code CANCELING} and the job {@code CANCELLING}. Only once the second
     * attempt is reported CANCELED must the job status become {@code CANCELED}.
     */
    @Test
    public void cancelWhileRestartingShouldWaitForRunningTasks() {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);
        final JobID jobId = jobGraph.getJobID();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
        final SchedulingTopology topology = scheduler.getSchedulingTopology();

        // Use a parameterized iterator instead of a raw one so no downcast is needed per element.
        final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
        final ExecutionAttemptID firstAttemptId = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionVertexID secondVertexId = scheduler.getExecutionVertexIdOrThrow(secondAttemptId);

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, firstAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        scheduler.cancel();

        // Sample both states before the second task acknowledges the cancellation.
        final ExecutionState secondVertexStateAfterCancel = topology.getVertex(secondVertexId).getState();
        final JobStatus statusAfterCancel = scheduler.requestJobStatus();

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, secondAttemptId, ExecutionState.CANCELED, new RuntimeException("expected")));

        Assert.assertThat(secondVertexStateAfterCancel, Matchers.is(Matchers.equalTo(ExecutionState.CANCELING)));
        Assert.assertThat(statusAfterCancel, Matchers.is(Matchers.equalTo(JobStatus.CANCELLING)));
        Assert.assertThat(scheduler.requestJobStatus(), Matchers.is(Matchers.equalTo(JobStatus.CANCELED)));
    }

    /**
     * Verifies that the archived job exposes failure information once its only task has been
     * reported FAILED.
     *
     * <p>The failure info must be non-null and its exception string must contain the message
     * of the exception attached to the failed task state.
     */
    @Test
    public void failureInfoIsSetAfterTaskFailure() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobID jobId = jobGraph.getJobID();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Look up the current attempt of the single execution vertex.
        final ArchivedExecutionVertex onlyVertex = (ArchivedExecutionVertex) Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
        final ExecutionAttemptID attemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();

        final TaskExecutionState failedState = new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, new RuntimeException("expected exception"));
        scheduler.updateTaskExecutionState(failedState);

        final ErrorInfo recordedFailure = scheduler.requestJob().getFailureInfo();
        Assert.assertThat(recordedFailure, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(recordedFailure.getExceptionAsString(), Matchers.containsString("expected exception"));
    }

    /**
     * Verifies that the slot request id stored on a vertex's location constraint is cleared
     * when the tasks are recovered.
     *
     * <p>Source and sink share one slot sharing group and the sink is strictly co-located with
     * the source. A slot request id is set on the source vertex's location constraint, the
     * source attempt is reported FAILED and the sink attempt CANCELED, and the scheduled
     * restart tasks are triggered. Afterwards the slot request id must be {@code null}.
     */
    @Test
    public void coLocationConstraintIsResetOnTaskRecovery() {
        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        final JobVertex source = (JobVertex) jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        final JobVertex sink = (JobVertex) jobGraph.getVerticesSortedTopologicallyFromSources().get(1);

        // Put both vertices into one sharing group and pin the sink to the source.
        final SlotSharingGroup sharedGroup = new SlotSharingGroup();
        source.setSlotSharingGroup(sharedGroup);
        sink.setSlotSharingGroup(sharedGroup);
        sink.setStrictlyCoLocatedWith(source);

        final JobID jobId = jobGraph.getJobID();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        final ExecutionVertex sourceExecutionVertex = scheduler.getExecutionVertex(new ExecutionVertexID(source.getID(), 0));
        final ExecutionAttemptID sourceAttemptId = sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionVertex sinkExecutionVertex = scheduler.getExecutionVertex(new ExecutionVertexID(sink.getID(), 0));
        final ExecutionAttemptID sinkAttemptId = sinkExecutionVertex.getCurrentExecutionAttempt().getAttemptId();

        // Set a slot request id and confirm it is present before recovery.
        sourceExecutionVertex.getLocationConstraint().setSlotRequestId(new SlotRequestId());
        Assert.assertThat(sourceExecutionVertex.getLocationConstraint().getSlotRequestId(), Matchers.is(Matchers.notNullValue()));

        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, sourceAttemptId, ExecutionState.FAILED, new RuntimeException("expected exception")));
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, sinkAttemptId, ExecutionState.CANCELED));
        taskRestartExecutor.triggerScheduledTasks();

        Assert.assertThat(sourceExecutionVertex.getLocationConstraint().getSlotRequestId(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Builds a job vertex whose input dependency constraint is {@code ALL}.
     *
     * @param str name given to the new vertex
     * @param i parallelism assigned to the new vertex
     * @return the configured vertex, using {@code AbstractInvokable} as its invokable class
     */
    private static JobVertex createVertexWithAllInputConstraints(String str, int i) {
        final JobVertex vertex = new JobVertex(str);
        vertex.setInvokableClass(AbstractInvokable.class);
        vertex.setInputDependencyConstraint(InputDependencyConstraint.ALL);
        vertex.setParallelism(i);
        return vertex;
    }

    /**
     * Reports the current execution attempt of one subtask as {@code FINISHED} to the scheduler.
     *
     * @param defaultScheduler scheduler that receives the state update; its job graph supplies the job id
     * @param accessExecutionJobVertex job vertex whose task vertices are indexed
     * @param i index into the job vertex's task vertex array
     */
    private static void finishSubtask(DefaultScheduler defaultScheduler, AccessExecutionJobVertex accessExecutionJobVertex, int i) {
        final JobID jobId = defaultScheduler.getJobGraph().getJobID();
        final ExecutionAttemptID finishedAttemptId = accessExecutionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getAttemptId();
        final TaskExecutionState finishedState = new TaskExecutionState(jobId, finishedAttemptId, ExecutionState.FINISHED);
        defaultScheduler.updateTaskExecutionState(finishedState);
    }

    /**
     * Blocks until the scheduler's termination future completes, waiting at most 1000 milliseconds.
     *
     * @param defaultScheduler scheduler whose termination future is awaited
     * @throws Exception whatever the bounded {@code get} on the termination future throws
     */
    private void waitForTermination(DefaultScheduler defaultScheduler) throws Exception {
        final long timeoutMillis = 1000L;
        defaultScheduler.getTerminationFuture().get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a job graph containing a single vertex with parallelism 1.
     *
     * @return the graph produced by {@code singleJobVertexJobGraph(1)}
     */
    private static JobGraph singleNonParallelJobVertexJobGraph() {
        final int parallelism = 1;
        return singleJobVertexJobGraph(parallelism);
    }

    /**
     * Creates an eagerly scheduled job graph with one vertex named "source".
     *
     * @param i parallelism of the single vertex
     * @return a job graph with id {@code TEST_JOB_ID} holding that vertex
     */
    private static JobGraph singleJobVertexJobGraph(int i) {
        final JobVertex source = new JobVertex("source");
        source.setParallelism(i);
        source.setInvokableClass(NoOpInvokable.class);

        final JobGraph graph = new JobGraph(TEST_JOB_ID, "Testjob");
        graph.setScheduleMode(ScheduleMode.EAGER);
        graph.addVertex(source);
        return graph;
    }

    /**
     * Creates an eagerly scheduled job graph with a "source" vertex feeding a "sink" vertex.
     *
     * <p>Neither vertex has its parallelism set explicitly. The sink consumes a new data set
     * of the source through a pointwise, pipelined connection.
     *
     * @return a job graph with id {@code TEST_JOB_ID} holding both vertices
     */
    private static JobGraph nonParallelSourceSinkJobGraph() {
        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);

        final JobGraph graph = new JobGraph(TEST_JOB_ID, "Testjob");
        graph.setScheduleMode(ScheduleMode.EAGER);
        graph.addVertex(source);
        graph.addVertex(sink);

        // Both vertices are added to the graph before they are connected, as in the original ordering.
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return graph;
    }

    /**
     * Returns the single vertex of the given job graph.
     *
     * @param jobGraph graph expected to contain exactly one vertex
     * @return that vertex
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
     *     the graph does not contain exactly one vertex (confirm against the Preconditions in use)
     */
    private static JobVertex getOnlyJobVertex(JobGraph jobGraph) {
        // Parameterized list instead of a raw one, so no downcast is needed on get(0).
        final List<JobVertex> sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Preconditions.checkState(sortedVertices.size() == 1);
        return sortedVertices.get(0);
    }

    /**
     * Creates a scheduler for the given job graph and starts scheduling immediately.
     *
     * <p>A lazy-from-sources strategy factory is used when the graph's schedule mode is
     * {@code LAZY_FROM_SOURCES}; every other mode gets the eager strategy factory.
     *
     * @param jobGraph job graph to schedule
     * @return the started scheduler
     * @throws RuntimeException wrapping any exception thrown while creating or starting the scheduler
     */
    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        try {
            // The factory is selected inside the try so failures here are wrapped as well.
            final SchedulingStrategyFactory strategyFactory;
            if (jobGraph.getScheduleMode() == ScheduleMode.LAZY_FROM_SOURCES) {
                strategyFactory = new LazyFromSourcesSchedulingStrategy.Factory();
            } else {
                strategyFactory = new EagerSchedulingStrategy.Factory();
            }
            final DefaultScheduler scheduler = createScheduler(jobGraph, strategyFactory);
            startScheduling(scheduler);
            return scheduler;
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Builds a scheduler for the given job graph from this test's fixture fields.
     *
     * @param jobGraph job graph handed to the scheduler builder
     * @param schedulingStrategyFactory factory for the scheduling strategy to install
     * @return the scheduler produced by the builder
     * @throws Exception if building the scheduler fails
     */
    private DefaultScheduler createScheduler(JobGraph jobGraph, SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
                .setLogger(log)
                .setIoExecutor(executor)
                .setJobMasterConfiguration(configuration)
                .setFutureExecutor(scheduledExecutorService)
                .setDelayExecutor(taskRestartExecutor)
                .setSchedulingStrategyFactory(schedulingStrategyFactory)
                .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
                .setExecutionVertexOperations(testExecutionVertexOperations)
                .setExecutionVertexVersioner(executionVertexVersioner)
                .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
                .build();
    }

    /**
     * Assigns a main thread executor to the scheduler and then starts scheduling.
     *
     * @param schedulerNG scheduler to start
     */
    private void startScheduling(final SchedulerNG schedulerNG) {
        // The main thread executor is set before scheduling begins.
        schedulerNG.setMainThreadExecutor(
                ComponentMainThreadExecutorServiceAdapter.forMainThread());

        schedulerNG.startScheduling();
    }

    /**
     * Installs a task manager gateway that signals a latch whenever its checkpoint consumer is invoked.
     *
     * <p>The gateway is set on the logical slot builder of the test execution slot allocator.
     *
     * @return a latch with an initial count of one that the checkpoint consumer counts down
     */
    private CountDownLatch getCheckpointTriggeredLatch() {
        final CountDownLatch checkpointTriggered = new CountDownLatch(1);
        final SimpleAckingTaskManagerGateway gateway = new SimpleAckingTaskManagerGateway();
        testExecutionSlotAllocator.getLogicalSlotBuilder().setTaskManagerGateway(gateway);
        // The consumer ignores all of its arguments and only signals the latch.
        gateway.setCheckpointConsumer(
                (attemptId, jobId, checkpointId, timestamp, options) -> checkpointTriggered.countDown());
        return checkpointTriggered;
    }
}
