package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.ExecutionVertexSchedulingRequirements;
import org.apache.flink.runtime.scheduler.TestingInputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.class */
public class DefaultExecutionSlotAllocatorTest extends TestLogger {
    private AllocationToggableSlotProvider slotProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest$AllocationToggableSlotProvider.class */
    public static class AllocationToggableSlotProvider implements SlotProvider {
        private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests;
        private final List<SlotRequestId> cancelledSlotRequestIds;
        private boolean slotAllocationDisabled;

        private AllocationToggableSlotProvider() {
            this.slotAllocationRequests = new ArrayList();
            this.cancelledSlotRequestIds = new ArrayList();
        }

        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, Time time) {
            this.slotAllocationRequests.add(Tuple3.of(slotRequestId, scheduledUnit, slotProfile));
            return this.slotAllocationDisabled ? new CompletableFuture<>() : CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
        }

        public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
            this.cancelledSlotRequestIds.add(slotRequestId);
        }

        public List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> getSlotAllocationRequests() {
            return Collections.unmodifiableList(this.slotAllocationRequests);
        }

        public void disableSlotAllocation() {
            this.slotAllocationDisabled = true;
        }

        List<SlotRequestId> getCancelledSlotRequestIds() {
            return this.cancelledSlotRequestIds;
        }
    }

    @Before
    public void setUp() throws Exception {
        this.slotProvider = new AllocationToggableSlotProvider();
    }

    @Test
    public void testConsumersAssignedToSlotsAfterProducers() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(new JobVertexID(), 0);
        TestingInputsLocationsRetriever build = new TestingInputsLocationsRetriever.Builder().connectConsumerToProducer(executionVertexID2, executionVertexID).build();
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator(new TestingStateLocationRetriever(), build);
        build.markScheduled(executionVertexID);
        build.markScheduled(executionVertexID2);
        List allocateSlotsFor = createExecutionSlotAllocator.allocateSlotsFor(ExecutionSlotAllocatorTestUtils.createSchedulingRequirements(executionVertexID, executionVertexID2));
        Assert.assertThat(allocateSlotsFor, Matchers.hasSize(2));
        SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId = ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId(executionVertexID, allocateSlotsFor);
        SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId2 = ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId(executionVertexID2, allocateSlotsFor);
        Assert.assertTrue(findSlotAssignmentByExecutionVertexId.getLogicalSlotFuture().isDone());
        Assert.assertFalse(findSlotAssignmentByExecutionVertexId2.getLogicalSlotFuture().isDone());
        build.assignTaskManagerLocation(executionVertexID);
        Assert.assertTrue(findSlotAssignmentByExecutionVertexId2.getLogicalSlotFuture().isDone());
        Assert.assertThat(createExecutionSlotAllocator.getPendingSlotAssignments().keySet(), Matchers.hasSize(0));
    }

    @Test
    public void testAllocateSlotsParameters() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        AllocationID allocationID = new AllocationID();
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        ResourceProfile fromResources = ResourceProfile.fromResources(0.5d, 250);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(1.0d, 300);
        CoLocationConstraint locationConstraint = new CoLocationGroup().getLocationConstraint(0);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TestingStateLocationRetriever testingStateLocationRetriever = new TestingStateLocationRetriever();
        testingStateLocationRetriever.setStateLocation(executionVertexID, localTaskManagerLocation);
        createExecutionSlotAllocator(testingStateLocationRetriever, new TestingInputsLocationsRetriever.Builder().build()).allocateSlotsFor(Arrays.asList(new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(executionVertexID).withPreviousAllocationId(allocationID).withSlotSharingGroupId(slotSharingGroupId).withPhysicalSlotResourceProfile(fromResources2).withTaskResourceProfile(fromResources).withCoLocationConstraint(locationConstraint).build()));
        Assert.assertThat(this.slotProvider.getSlotAllocationRequests(), Matchers.hasSize(1));
        ScheduledUnit scheduledUnit = (ScheduledUnit) this.slotProvider.getSlotAllocationRequests().get(0).f1;
        SlotProfile slotProfile = (SlotProfile) this.slotProvider.getSlotAllocationRequests().get(0).f2;
        Assert.assertEquals(slotSharingGroupId, scheduledUnit.getSlotSharingGroupId());
        Assert.assertEquals(locationConstraint, scheduledUnit.getCoLocationConstraint());
        Assert.assertThat(slotProfile.getPreferredAllocations(), Matchers.contains(new AllocationID[]{allocationID}));
        Assert.assertThat(slotProfile.getPreviousExecutionGraphAllocations(), Matchers.contains(new AllocationID[]{allocationID}));
        Assert.assertEquals(fromResources, slotProfile.getTaskResourceProfile());
        Assert.assertEquals(fromResources2, slotProfile.getPhysicalSlotResourceProfile());
        Assert.assertThat(slotProfile.getPreferredLocations(), Matchers.contains(new TaskManagerLocation[]{localTaskManagerLocation}));
    }

    @Test
    public void testDuplicatedSlotAllocationIsNotAllowed() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator();
        this.slotProvider.disableSlotAllocation();
        List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements = ExecutionSlotAllocatorTestUtils.createSchedulingRequirements(executionVertexID);
        createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements);
        try {
            createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements);
            Assert.fail("exception should happen");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void testSlotAssignmentIsProperlyRegistered() {
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements = ExecutionSlotAllocatorTestUtils.createSchedulingRequirements(executionVertexID);
        this.slotProvider.disableSlotAllocation();
        Assert.assertThat(createExecutionSlotAllocator.getPendingSlotAssignments().values(), Matchers.contains(new SlotExecutionVertexAssignment[]{(SlotExecutionVertexAssignment) createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements).iterator().next()}));
        createExecutionSlotAllocator.cancel(executionVertexID);
        Assert.assertThat(createExecutionSlotAllocator.getPendingSlotAssignments().keySet(), Matchers.hasSize(0));
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), Matchers.contains(new SlotRequestId[]{(SlotRequestId) ((Tuple3) this.slotProvider.slotAllocationRequests.get(0)).f0}));
    }

    private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
        return createExecutionSlotAllocator(new TestingStateLocationRetriever(), new TestingInputsLocationsRetriever.Builder().build());
    }

    private DefaultExecutionSlotAllocator createExecutionSlotAllocator(StateLocationRetriever stateLocationRetriever, InputsLocationsRetriever inputsLocationsRetriever) {
        return new DefaultExecutionSlotAllocator(SlotProviderStrategy.from(ScheduleMode.EAGER, this.slotProvider, Time.seconds(10L)), new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever));
    }
}
