package org.apache.flink.runtime.checkpoint;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.class */
public class CheckpointCoordinatorTriggeringTest extends TestLogger {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest$TestingMasterHook.class */
    private static class TestingMasterHook implements MasterTriggerRestoreHook<String> {
        private final SimpleVersionedSerializer<String> serializer;
        private final CompletableFuture<String> checkpointFuture;
        private final OneShotLatch triggerCheckpointLatch;

        private TestingMasterHook(CompletableFuture<String> completableFuture) {
            this(completableFuture, new OneShotLatch());
        }

        private TestingMasterHook(CompletableFuture<String> completableFuture, OneShotLatch oneShotLatch) {
            this.serializer = new CheckpointCoordinatorTestingUtils.StringSerializer();
            this.checkpointFuture = completableFuture;
            this.triggerCheckpointLatch = oneShotLatch;
        }

        public String getIdentifier() {
            return "testing master hook";
        }

        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long j, long j2, Executor executor) {
            this.triggerCheckpointLatch.trigger();
            return this.checkpointFuture;
        }

        public void restoreCheckpoint(long j, @Nullable String str) {
        }

        @Nullable
        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return this.serializer;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest$UnstableCheckpointIDCounter.class */
    private static class UnstableCheckpointIDCounter implements CheckpointIDCounter {
        private final Predicate<Long> checkpointFailurePredicate;
        private long id = 0;

        public UnstableCheckpointIDCounter(Predicate<Long> predicate) {
            this.checkpointFailurePredicate = (Predicate) Preconditions.checkNotNull(predicate);
        }

        public void start() {
        }

        public void shutdown(JobStatus jobStatus) throws Exception {
        }

        public long getAndIncrement() {
            Predicate<Long> predicate = this.checkpointFailurePredicate;
            long j = this.id;
            this.id = j + 1;
            if (predicate.test(Long.valueOf(j))) {
                throw new RuntimeException("CheckpointIDCounter#getAndIncrement fails by design");
            }
            return this.id;
        }

        public long get() {
            return this.id;
        }

        public void setCount(long j) {
        }
    }

    @Before
    public void setUp() throws Exception {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testPeriodicTriggering() {
        try {
            JobID jobID = new JobID();
            final long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            final AtomicInteger atomicInteger = new AtomicInteger();
            ((Execution) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTriggeringTest.1
                private long lastId = -1;
                private long lastTs = -1;

                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m17answer(InvocationOnMock invocationOnMock) throws Throwable {
                    long longValue = ((Long) invocationOnMock.getArguments()[0]).longValue();
                    long longValue2 = ((Long) invocationOnMock.getArguments()[1]).longValue();
                    Assert.assertTrue(longValue > this.lastId);
                    Assert.assertTrue(longValue2 >= this.lastTs);
                    Assert.assertTrue(longValue2 >= currentTimeMillis);
                    this.lastId = longValue;
                    this.lastTs = longValue2;
                    atomicInteger.incrementAndGet();
                    return null;
                }
            }).when(mockExecutionVertex.getCurrentExecutionAttempt())).triggerCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (CheckpointOptions) ArgumentMatchers.any(CheckpointOptions.class));
            CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasksToTrigger(new ExecutionVertex[]{mockExecutionVertex}).setTasksToWaitFor(new ExecutionVertex[]{mockExecutionVertex2}).setTasksToCommitTo(new ExecutionVertex[]{mockExecutionVertex3}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            build.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (atomicInteger.get() < 5);
            Assert.assertEquals(5L, atomicInteger.get());
            build.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(5L, atomicInteger.get());
            atomicInteger.set(0);
            build.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (atomicInteger.get() < 5);
            Assert.assertEquals(5L, atomicInteger.get());
            build.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(5L, atomicInteger.get());
            build.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        Execution currentExecutionAttempt = mockExecutionVertex.getCurrentExecutionAttempt();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ((Execution) Mockito.doAnswer(invocationOnMock -> {
            linkedBlockingQueue.add((Long) invocationOnMock.getArguments()[0]);
            return null;
        }).when(currentExecutionAttempt)).triggerCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (CheckpointOptions) ArgumentMatchers.any(CheckpointOptions.class));
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobID).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(12L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(50L).setMaxConcurrentCheckpoints(1).build()).setTasks(new ExecutionVertex[]{mockExecutionVertex}).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        try {
            build.startCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(1L, ((Long) linkedBlockingQueue.take()).longValue());
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID, 1L);
            long nanoTime = System.nanoTime();
            build.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            while (linkedBlockingQueue.isEmpty()) {
                Thread.sleep(12L);
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Long l = (Long) linkedBlockingQueue.take();
            long nanoTime2 = System.nanoTime();
            Assert.assertEquals(2L, l.longValue());
            long j = (nanoTime2 - nanoTime) / 1000000;
            if (j + 1 < 50) {
                Assert.fail("checkpoint came too early: delay was " + j + " but should have been at least 50");
            }
        } finally {
            build.stopCheckpointScheduler();
            build.shutdown(JobStatus.FINISHED);
        }
    }

    @Test
    public void testStopPeriodicScheduler() throws Exception {
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assert.fail("The triggerCheckpoint call expected an exception");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(findThrowable.isPresent());
            Assert.assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
        }
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerCheckpoint.isCompletedExceptionally());
    }

    @Test
    public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator();
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        createCheckpointCoordinator.shutdown(JobStatus.FAILED);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(findThrowable.isPresent());
            Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
        }
    }

    @Test
    public void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID, (executionAttemptID2, jobID, j, j2, checkpointOptions) -> {
            atomicInteger.incrementAndGet();
        }));
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        Assert.assertTrue(createCheckpointCoordinator.isTriggering());
        Assert.assertEquals(0L, createCheckpointCoordinator.getTriggerRequestQueue().size());
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint2 = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        Assert.assertTrue(createCheckpointCoordinator.isTriggering());
        Assert.assertEquals(1L, createCheckpointCoordinator.getTriggerRequestQueue().size());
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerPeriodicCheckpoint.isCompletedExceptionally());
        Assert.assertFalse(triggerPeriodicCheckpoint2.isCompletedExceptionally());
        Assert.assertFalse(createCheckpointCoordinator.isTriggering());
        Assert.assertEquals(0L, createCheckpointCoordinator.getTriggerRequestQueue().size());
        Assert.assertEquals(2L, atomicInteger.get());
    }

    @Test
    public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasks(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID, (executionAttemptID2, jobID, j, j2, checkpointOptions) -> {
            atomicInteger.incrementAndGet();
        })}).setCheckpointIDCounter(new UnstableCheckpointIDCounter(l -> {
            return l.longValue() == 0;
        })).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        build.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint = triggerNonPeriodicCheckpoint(build);
        Assert.assertTrue(build.isTriggering());
        Assert.assertEquals(0L, build.getTriggerRequestQueue().size());
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint2 = triggerNonPeriodicCheckpoint(build);
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint3 = triggerNonPeriodicCheckpoint(build);
        Assert.assertTrue(build.isTriggering());
        Assert.assertEquals(2L, build.getTriggerRequestQueue().size());
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(triggerNonPeriodicCheckpoint.isCompletedExceptionally());
        Assert.assertFalse(triggerNonPeriodicCheckpoint2.isCompletedExceptionally());
        Assert.assertFalse(triggerNonPeriodicCheckpoint3.isCompletedExceptionally());
        Assert.assertFalse(build.isTriggering());
        Assert.assertEquals(0L, build.getTriggerRequestQueue().size());
        Assert.assertEquals(2L, atomicInteger.get());
    }

    @Test
    public void testTriggerCheckpointRequestCancelled() throws Exception {
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID, (executionAttemptID2, jobID, j, j2, checkpointOptions) -> {
            atomicInteger.incrementAndGet();
        }));
        CompletableFuture completableFuture = new CompletableFuture();
        createCheckpointCoordinator.addMasterHook(new TestingMasterHook(completableFuture));
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(createCheckpointCoordinator.isTriggering());
        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
        Assert.assertTrue(createCheckpointCoordinator.isTriggering());
        try {
            triggerPeriodicCheckpoint.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(findThrowable.isPresent());
            Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_EXPIRED, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
        }
        completableFuture.complete("finish master hook");
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(createCheckpointCoordinator.isTriggering());
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(0L, createCheckpointCoordinator.getTriggerRequestQueue().size());
    }

    @Test
    public void testTriggerCheckpointInitializationFailed() throws Exception {
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasks(new ExecutionVertex[]{CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID())}).setCheckpointIDCounter(new UnstableCheckpointIDCounter(l -> {
            return l.longValue() == 0;
        })).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        build.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(build);
        Assert.assertTrue(build.isTriggering());
        Assert.assertEquals(0L, build.getTriggerRequestQueue().size());
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assert.fail("This checkpoint should fail through UnstableCheckpointIDCounter");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(findThrowable.isPresent());
            Assert.assertEquals(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
        }
        Assert.assertFalse(build.isTriggering());
        Assert.assertEquals(0L, build.getTriggerRequestQueue().size());
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint2 = triggerPeriodicCheckpoint(build);
        Assert.assertTrue(build.isTriggering());
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerPeriodicCheckpoint2.isCompletedExceptionally());
        Assert.assertFalse(build.isTriggering());
        Assert.assertEquals(0L, build.getTriggerRequestQueue().size());
    }

    @Test
    public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID, (executionAttemptID2, jobID, j, j2, checkpointOptions) -> {
            atomicInteger.incrementAndGet();
        }));
        CompletableFuture completableFuture = new CompletableFuture();
        createCheckpointCoordinator.addMasterHook(new TestingMasterHook(completableFuture));
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(createCheckpointCoordinator.isTriggering());
        completableFuture.completeExceptionally(new Exception("by design"));
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(createCheckpointCoordinator.isTriggering());
        try {
            triggerPeriodicCheckpoint.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(findThrowable.isPresent());
            Assert.assertEquals(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, ((CheckpointException) findThrowable.get()).getCheckpointFailureReason());
        }
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(0L, createCheckpointCoordinator.getTriggerRequestQueue().size());
    }

    @Test
    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasks(new ExecutionVertex[]{mockExecutionVertex}).setTimer(new ScheduledExecutorServiceAdapter(newSingleThreadScheduledExecutor)).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().build()).build();
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        build.addMasterHook(new TestingMasterHook(completableFuture, oneShotLatch));
        try {
            build.triggerCheckpoint(false);
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            oneShotLatch.await();
            completableFuture.complete("Completed");
            build.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
            try {
                triggerCheckpoint.get();
                Assert.fail("Expected the second checkpoint to fail.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), CoreMatchers.instanceOf(CheckpointException.class));
            }
            build.shutdown(JobStatus.FINISHED);
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
        } catch (Throwable th) {
            build.shutdown(JobStatus.FINISHED);
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
            throw th;
        }
    }

    private CheckpointCoordinator createCheckpointCoordinator() {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CheckpointCoordinator createCheckpointCoordinator(ExecutionVertex executionVertex) {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasks(new ExecutionVertex[]{executionVertex}).setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        return checkpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, true);
    }

    private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        return checkpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, false);
    }
}
