package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.class */
/**
 * Base REST handler that serves a file belonging to a TaskExecutor to an HTTP client.
 *
 * <p>For every request the handler looks up (or loads) a {@link TransientBlobKey} future in a
 * time-bounded cache keyed by {@code (ResourceID, fileName)}, resolves the key to a local file via
 * the {@link TransientBlobService} and streams that file to the client. Subclasses decide how the
 * upload is requested from the ResourceManager ({@link #requestFileUpload}) and, optionally, which
 * file name is part of the cache key ({@link #getFileName}).
 *
 * @param <M> type of the message parameters, which must expose the TaskManager id path parameter
 */
public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessageParameters> extends AbstractHandler<RestfulGateway, EmptyRequestBody, M> {
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
    private final TransientBlobService transientBlobService;

    /** Cache of pending/completed blob key requests, keyed by (TaskExecutor id, file name). */
    private final LoadingCache<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>> fileBlobKeys;

    /**
     * Creates the handler and its blob key cache.
     *
     * @param gatewayRetriever retriever for the {@link RestfulGateway}, passed to the super class
     * @param time timeout passed to the super class
     * @param map response headers passed to the super class
     * @param untypedResponseMessageHeaders message headers passed to the super class
     * @param gatewayRetriever2 retriever for the {@link ResourceManagerGateway} used to request
     *     file uploads; must not be {@code null}
     * @param transientBlobService blob service the uploaded files are read from and deleted from;
     *     must not be {@code null}
     * @param time2 duration after which a cache entry expires (counted from the time it was
     *     written)
     */
    public AbstractTaskManagerFileHandler(@Nonnull GatewayRetriever<? extends RestfulGateway> gatewayRetriever, @Nonnull Time time, @Nonnull Map<String, String> map, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, M> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> gatewayRetriever2, @Nonnull TransientBlobService transientBlobService, @Nonnull Time time2) {
        super(gatewayRetriever, time, map, untypedResponseMessageHeaders);
        this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(gatewayRetriever2);
        this.transientBlobService = Preconditions.checkNotNull(transientBlobService);
        this.fileBlobKeys = CacheBuilder.newBuilder()
                .expireAfterWrite(time2.toMilliseconds(), TimeUnit.MILLISECONDS)
                .removalListener(this::removeBlob)
                .build(new CacheLoader<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>>() {
                    @Override
                    public CompletableFuture<TransientBlobKey> load(Tuple2<ResourceID, String> taskManagerIdAndFileName) throws Exception {
                        return AbstractTaskManagerFileHandler.this.loadTaskManagerFile(taskManagerIdAndFileName);
                    }
                });
    }

    /**
     * Resolves the requested TaskExecutor file through the cache and transfers it to the client.
     *
     * @param channelHandlerContext channel context used to write the response; its executor runs
     *     the file transfer
     * @param httpRequest the original HTTP request
     * @param handlerRequest typed request carrying the TaskManager id path parameter
     * @param restfulGateway gateway supplied by the super class (not used here)
     * @return future that completes once the transfer was handed off, or completes exceptionally
     *     with a {@link NotFoundException} (unknown TaskExecutor) or a {@link FlinkException}
     *     (any other failure), each wrapped in a {@link CompletionException}
     * @throws RestHandlerException if the blob key future could not be obtained from the cache
     */
    @Override // org.apache.flink.runtime.rest.handler.AbstractHandler
    protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway restfulGateway) throws RestHandlerException {
        final ResourceID resourceID = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
        // The very same key object is used for the lookup and for the invalidation below.
        final Tuple2<ResourceID, String> cacheKey = new Tuple2<>(resourceID, getFileName(handlerRequest));

        final CompletableFuture<TransientBlobKey> blobKeyFuture;
        try {
            blobKeyFuture = this.fileBlobKeys.get(cacheKey);
        } catch (ExecutionException e) {
            throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, ExceptionUtils.stripExecutionException(e));
        }

        return blobKeyFuture
                .thenAcceptAsync(
                        transientBlobKey -> {
                            try {
                                HandlerUtils.transferFile(channelHandlerContext, this.transientBlobService.getFile(transientBlobKey), httpRequest);
                            } catch (IOException e) {
                                throw new CompletionException(new FlinkException("Could not retrieve file from transient blob store.", e));
                            } catch (FlinkException e) {
                                throw new CompletionException(new FlinkException("Could not transfer file to client.", e));
                            }
                        },
                        channelHandlerContext.executor())
                .whenComplete(
                        (ignored, throwable) -> {
                            if (throwable == null) {
                                return;
                            }
                            this.log.error("Failed to transfer file from TaskExecutor {}.", resourceID, throwable);
                            // Invalidate with the full (ResourceID, fileName) key. Passing only the
                            // ResourceID never matches an entry, which would leave the failed
                            // future cached until it expires.
                            this.fileBlobKeys.invalidate(cacheKey);

                            final Throwable cause = ExceptionUtils.stripCompletionException(throwable);
                            if (cause instanceof UnknownTaskExecutorException) {
                                throw new CompletionException(new NotFoundException(String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", resourceID), cause));
                            }
                            throw new CompletionException(new FlinkException(String.format("Failed to transfer file from TaskExecutor %s.", resourceID), cause));
                        });
    }

    /**
     * Cache loader body: asks the currently known ResourceManager to upload the requested file.
     *
     * @param tuple2 cache key consisting of the TaskExecutor id ({@code f0}) and the file name
     * @return future with the blob key of the uploaded file, as returned by
     *     {@link #requestFileUpload}
     * @throws RestHandlerException with status {@code NOT_FOUND} if no ResourceManager gateway is
     *     available right now
     */
    public CompletableFuture<TransientBlobKey> loadTaskManagerFile(Tuple2<ResourceID, String> tuple2) throws RestHandlerException {
        this.log.debug("Load file from TaskManager {}.", tuple2.f0);
        final ResourceManagerGateway resourceManagerGateway = this.resourceManagerGatewayRetriever.getNow().orElseThrow(() -> {
            this.log.debug("Could not connect to ResourceManager right now.");
            return new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND);
        });
        return requestFileUpload(resourceManagerGateway, tuple2);
    }

    /**
     * Requests the upload of the file identified by the given key via the ResourceManager.
     *
     * @param resourceManagerGateway gateway to the ResourceManager
     * @param tuple2 TaskExecutor id and file name identifying the requested file
     * @return future with the blob key under which the file can be fetched from the blob service
     */
    protected abstract CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> tuple2);

    /**
     * Removal listener of the cache: once the evicted future completes normally, deletes the
     * corresponding file from the transient blob service.
     */
    private void removeBlob(RemovalNotification<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>> removalNotification) {
        this.log.debug("Remove cached file for TaskExecutor {}.", removalNotification.getKey());
        final CompletableFuture<TransientBlobKey> blobKeyFuture = removalNotification.getValue();
        if (blobKeyFuture != null) {
            blobKeyFuture.thenAccept(this.transientBlobService::deleteFromCache);
        }
    }

    /**
     * Returns the file name that forms the second half of the cache key for the given request.
     *
     * <p>This default implementation returns {@code null}; subclasses may override it.
     *
     * @param handlerRequest the incoming request
     * @return the file name, or {@code null} in this default implementation
     */
    protected String getFileName(HandlerRequest<EmptyRequestBody, M> handlerRequest) {
        return null;
    }
}
