package org.apache.flink.runtime.resourcemanager.active;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.TestingResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.class */
public class ActiveResourceManagerTest extends TestLogger {

    /** JUnit class rule that supplies the {@link TestingRpcService} used by every test in this class. */
    @ClassRule
    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();

    /** Upper bound, in seconds, for each blocking wait performed by the tests. */
    private static final long TIMEOUT_SEC = 5;

    /** {@link #TIMEOUT_SEC} expressed as a Flink {@link Time}, for APIs that take a timeout argument. */
    private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);

    /** Worker resource spec requested by all tests. */
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;

    /**
     * Memory configuration attached to every task executor registration sent by the tests.
     *
     * <p>The values are written as plain literals on purpose: the fifth argument must not be
     * derived from {@link #TIMEOUT_SEC}, otherwise tuning the test timeout would silently change
     * this unrelated fixture.
     */
    private static final TaskExecutorMemoryConfiguration TESTING_CONFIG = new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 21L, 36L);

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest$Context.class */
    /**
     * Per-test fixture. A test customises {@link #flinkConfig}, {@link #driverBuilder} and
     * {@link #slotManagerBuilder} first and then calls {@link #runTest(RunnableWithException)},
     * which creates and starts the resource manager, runs the test body and finally closes the
     * resource manager again.
     */
    private static class Context {
        final Configuration flinkConfig = new Configuration();
        final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder();
        final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder();

        // Both are (re)assigned by runTest(); they are null before it has been called.
        private ActiveResourceManager<ResourceID> resourceManager;
        private TestingFatalErrorHandler fatalErrorHandler;

        private Context() {}

        /** Returns the resource manager created by {@link #runTest(RunnableWithException)}. */
        ActiveResourceManager<ResourceID> getResourceManager() {
            return resourceManager;
        }

        /** Returns the fatal error handler that was handed to the resource manager. */
        TestingFatalErrorHandler getFatalErrorHandler() {
            return fatalErrorHandler;
        }

        /**
         * Creates a fresh fatal error handler and a started resource manager, executes the given
         * test body and closes the resource manager afterwards, whether or not the body threw.
         *
         * @param runnableWithException the test body
         * @throws Exception whatever set-up, the test body or closing the resource manager throws
         */
        void runTest(RunnableWithException runnableWithException) throws Exception {
            fatalErrorHandler = new TestingFatalErrorHandler();
            resourceManager = createAndStartResourceManager(
                    flinkConfig,
                    driverBuilder.build(),
                    slotManagerBuilder.createSlotManager());

            try {
                runnableWithException.run();
            } finally {
                resourceManager.close();
            }
        }

        /**
         * Instantiates an {@link ActiveResourceManager} on top of the shared testing RPC service
         * and mock runtime services, starts it and grants it leadership.
         */
        private ActiveResourceManager<ResourceID> createAndStartResourceManager(Configuration configuration, ResourceManagerDriver<ResourceID> resourceManagerDriver, SlotManager slotManager) throws Exception {
            final TestingRpcService rpcService = RPC_SERVICE_RESOURCE.getTestingRpcService();
            final MockResourceManagerRuntimeServices runtimeServices =
                    new MockResourceManagerRuntimeServices(rpcService, TIMEOUT_TIME, slotManager);

            final ActiveResourceManager<ResourceID> startedResourceManager = new ActiveResourceManager<>(
                    resourceManagerDriver,
                    configuration,
                    rpcService,
                    ResourceID.generate(),
                    runtimeServices.highAvailabilityServices,
                    runtimeServices.heartbeatServices,
                    runtimeServices.slotManager,
                    NoOpResourceManagerPartitionTracker::get,
                    runtimeServices.jobLeaderIdService,
                    new ClusterInformation("localhost", 1234),
                    fatalErrorHandler,
                    UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
                    ForkJoinPool.commonPool());

            startedResourceManager.start();
            runtimeServices.grantLeadership();
            return startedResourceManager;
        }

        /**
         * Submits the runnable through the resource manager's {@code runInMainThread} hook
         * without waiting for it; the resulting future is intentionally dropped.
         */
        void runInMainThread(Runnable runnable) {
            final Callable<Void> adapter = () -> {
                runnable.run();
                return null;
            };
            resourceManager.runInMainThread(adapter, TIMEOUT_TIME);
        }

        /**
         * Submits the callable through the resource manager's {@code runInMainThread} hook.
         *
         * @return the future handed back by the resource manager for the callable's result
         */
        <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
            return resourceManager.runInMainThread(callable, TIMEOUT_TIME);
        }

        /** Registers a task executor backed by a default testing gateway. */
        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID) {
            final TaskExecutorGateway defaultGateway =
                    new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            return registerTaskExecutor(resourceID, defaultGateway);
        }

        /**
         * Publishes the gateway on the testing RPC service under the resource id's string form
         * and sends a registration for that address to the resource manager.
         *
         * @return the future carrying the resource manager's registration response
         */
        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
            final String address = resourceID.toString();
            RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(address, taskExecutorGateway);

            final TaskExecutorRegistration registration = new TaskExecutorRegistration(
                    address,
                    resourceID,
                    1234,
                    23456,
                    new HardwareDescription(1, 2L, 3L, 4L),
                    TESTING_CONFIG,
                    ResourceProfile.ZERO,
                    ResourceProfile.ZERO);

            final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
            return selfGateway.registerTaskExecutor(registration, TIMEOUT_TIME);
        }
    }

    /**
     * Starting a new worker must hand the driver the process spec derived from the requested
     * worker resource spec, and the worker returned by the driver must be able to register.
     */
    @Test
    public void testStartNewWorker() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId = ResourceID.generate();
                // Completed with the spec the driver is asked for, so the test can inspect it.
                final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>();

                driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(tmResourceId);
                });

                runTest(() -> {
                    final CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    });
                    final TaskExecutorProcessSpec requestedSpec =
                            requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.is(true));
                    Assert.assertThat(requestedSpec, Matchers.is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));

                    // The worker handed out by the driver is known to the resource manager.
                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(tmResourceId);
                    Assert.assertThat(registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));
                });
            }
        };
    }

    /**
     * When the driver's first resource request completes exceptionally, a second request with
     * the same process spec must reach the driver, and the worker from that second request must
     * be able to register.
     */
    @Test
    public void testStartNewWorkerFailedRequesting() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId = ResourceID.generate();
                final AtomicInteger requestCount = new AtomicInteger(0);

                // Futures returned to the resource manager for the 1st and 2nd driver request.
                final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>();
                resourceIdFutures.add(new CompletableFuture<>());
                resourceIdFutures.add(new CompletableFuture<>());

                // Completed with the spec of the 1st and 2nd driver request respectively.
                final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    final int idx = requestCount.getAndIncrement();
                    // At most two requests are expected over the whole test.
                    Assert.assertThat(idx, Matchers.lessThan(2));
                    requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
                    return resourceIdFutures.get(idx);
                });

                // The slot manager stub reports one required worker of the tested spec.
                slotManagerBuilder.setGetRequiredResourcesSupplier(() -> {
                    return Collections.singletonMap(WORKER_RESOURCE_SPEC, 1);
                });

                runTest(() -> {
                    final CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    });
                    final TaskExecutorProcessSpec firstRequestedSpec =
                            requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.is(true));
                    Assert.assertThat(firstRequestedSpec, Matchers.is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));

                    // Fail the first request; a second one with the same spec must follow.
                    runInMainThread(() -> {
                        return resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error"));
                    });
                    final TaskExecutorProcessSpec secondRequestedSpec =
                            requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(secondRequestedSpec, Matchers.is(firstRequestedSpec));

                    // Let the second request succeed and register the resulting worker.
                    runInMainThread(() -> {
                        return resourceIdFutures.get(1).complete(tmResourceId);
                    });
                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(tmResourceId);
                    Assert.assertThat(registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));
                });
            }
        };
    }

    /**
     * When a started worker is reported as terminated before it registered, a replacement with
     * the same process spec must be requested from the driver, and that replacement must be
     * able to register.
     */
    @Test
    public void testWorkerTerminatedBeforeRegister() throws Exception {
        new Context() {
            {
                final AtomicInteger requestCount = new AtomicInteger(0);

                // Ids handed out by the driver for the 1st and 2nd request.
                final List<ResourceID> tmResourceIds = new ArrayList<>();
                tmResourceIds.add(ResourceID.generate());
                tmResourceIds.add(ResourceID.generate());

                // Completed with the spec of the 1st and 2nd driver request respectively.
                final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    final int idx = requestCount.getAndIncrement();
                    // At most two requests are expected over the whole test.
                    Assert.assertThat(idx, Matchers.lessThan(2));
                    requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(tmResourceIds.get(idx));
                });

                // The slot manager stub reports one required worker of the tested spec.
                slotManagerBuilder.setGetRequiredResourcesSupplier(() -> {
                    return Collections.singletonMap(WORKER_RESOURCE_SPEC, 1);
                });

                runTest(() -> {
                    final CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    });
                    final TaskExecutorProcessSpec firstRequestedSpec =
                            requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.is(true));
                    Assert.assertThat(firstRequestedSpec, Matchers.is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));

                    // Terminate the first worker without ever registering it.
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated(tmResourceIds.get(0), "terminate for testing");
                    });
                    final TaskExecutorProcessSpec secondRequestedSpec =
                            requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(secondRequestedSpec, Matchers.is(firstRequestedSpec));

                    // The replacement worker registers successfully.
                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(tmResourceIds.get(1));
                    Assert.assertThat(registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));
                });
            }
        };
    }

    /**
     * When a worker is reported as terminated after it registered, a replacement with the same
     * process spec must be requested from the driver, and that replacement must be able to
     * register.
     */
    @Test
    public void testWorkerTerminatedAfterRegister() throws Exception {
        new Context() {
            {
                final AtomicInteger requestCount = new AtomicInteger(0);

                // Ids handed out by the driver for the 1st and 2nd request.
                final List<ResourceID> tmResourceIds = new ArrayList<>();
                tmResourceIds.add(ResourceID.generate());
                tmResourceIds.add(ResourceID.generate());

                // Completed with the spec of the 1st and 2nd driver request respectively.
                final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    final int idx = requestCount.getAndIncrement();
                    // At most two requests are expected over the whole test.
                    Assert.assertThat(idx, Matchers.lessThan(2));
                    requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(tmResourceIds.get(idx));
                });

                // The slot manager stub reports one required worker of the tested spec.
                slotManagerBuilder.setGetRequiredResourcesSupplier(() -> {
                    return Collections.singletonMap(WORKER_RESOURCE_SPEC, 1);
                });

                runTest(() -> {
                    final CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    });
                    final TaskExecutorProcessSpec firstRequestedSpec =
                            requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.is(true));
                    Assert.assertThat(firstRequestedSpec, Matchers.is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));

                    // Register the first worker, then report it as terminated.
                    final CompletableFuture<RegistrationResponse> firstRegistration = registerTaskExecutor(tmResourceIds.get(0));
                    Assert.assertThat(firstRegistration.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated(tmResourceIds.get(0), "terminate for testing");
                    });

                    final TaskExecutorProcessSpec secondRequestedSpec =
                            requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(secondRequestedSpec, Matchers.is(firstRequestedSpec));

                    // The replacement worker registers successfully.
                    final CompletableFuture<RegistrationResponse> secondRegistration = registerTaskExecutor(tmResourceIds.get(1));
                    Assert.assertThat(secondRegistration.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));
                });
            }
        };
    }

    /**
     * When a registered worker is reported as terminated and no required-resources supplier is
     * configured on the slot manager builder, the driver must not receive a second request.
     */
    @Test
    public void testWorkerTerminatedNoLongerRequired() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId = ResourceID.generate();
                final AtomicInteger requestCount = new AtomicInteger(0);

                // Completed with the spec of the 1st and (unexpected) 2nd driver request.
                final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    final int idx = requestCount.getAndIncrement();
                    Assert.assertThat(idx, Matchers.lessThan(2));
                    requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(tmResourceId);
                });

                runTest(() -> {
                    final CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    });
                    final TaskExecutorProcessSpec requestedSpec =
                            requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.is(true));
                    Assert.assertThat(requestedSpec, Matchers.is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));

                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(tmResourceId);
                    Assert.assertThat(registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), Matchers.instanceOf(RegistrationResponse.Success.class));

                    // Wait until the termination callback has run before checking for a 2nd request.
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated(tmResourceId, "terminate for testing");
                        return null;
                    }).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    Assert.assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
                });
            }
        };
    }

    /**
     * When a registered worker is reported as terminated, the task executor gateway must be
     * told to disconnect from the resource manager.
     */
    @Test
    public void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId = ResourceID.generate();
                // Completed once the gateway's disconnect callback is invoked.
                final CompletableFuture<Void> disconnectResourceManagerFuture = new CompletableFuture<>();

                final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                        .setDisconnectResourceManagerConsumer(ignoredCause -> {
                            disconnectResourceManagerFuture.complete(null);
                        })
                        .createTestingTaskExecutorGateway();

                driverBuilder.setRequestResourceFunction(ignoredSpec -> CompletableFuture.completedFuture(tmResourceId));

                runTest(() -> {
                    // start worker -> register it -> report it as terminated
                    final CompletableFuture<Void> terminationTriggeredFuture = runInMainThread(() -> {
                        return getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC);
                    }).thenCompose(ignoredStarted -> {
                        return registerTaskExecutor(tmResourceId, taskExecutorGateway);
                    }).thenRun(() -> {
                        runInMainThread(() -> {
                            getResourceManager().onWorkerTerminated(tmResourceId, "terminate for testing");
                        });
                    });

                    // Await the chain first so that a failure while starting or registering the
                    // worker is reported with its real cause instead of as a bare timeout below.
                    terminationTriggeredFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);

                    disconnectResourceManagerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                });
            }
        };
    }

    /**
     * A worker announced through {@code onPreviousAttemptWorkersRecovered} must be able to
     * register with the resource manager.
     */
    @Test
    public void testRecoverWorkerFromPreviousAttempt() throws Exception {
        new Context() {
            {
                final ResourceID recoveredWorkerId = ResourceID.generate();

                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(recoveredWorkerId));
                    });

                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(recoveredWorkerId);
                    final RegistrationResponse response = registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(response, Matchers.instanceOf(RegistrationResponse.Success.class));
                });
            }
        };
    }

    /**
     * A task executor whose resource id was never announced to the resource manager must have
     * its registration rejected.
     */
    @Test
    public void testRegisterUnknownWorker() throws Exception {
        new Context() {
            {
                runTest(() -> {
                    final ResourceID unknownWorkerId = ResourceID.generate();
                    final CompletableFuture<RegistrationResponse> registrationFuture = registerTaskExecutor(unknownWorkerId);
                    final RegistrationResponse response = registrationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(response, Matchers.instanceOf(RegistrationResponse.Rejection.class));
                });
            }
        };
    }

    /**
     * An error passed to {@code onError} must show up, as the very same instance, on the fatal
     * error handler that was given to the resource manager.
     */
    @Test
    public void testOnError() throws Exception {
        new Context() {
            {
                final Throwable fatalError = new Throwable("Testing fatal error");

                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onError(fatalError);
                    });

                    final Throwable reportedError =
                            getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assert.assertThat(reportedError, Matchers.is(fatalError));
                });
            }
        };
    }
}
