package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.class */
/**
 * {@link PartitionTracker} implementation that keeps two views of the tracked partitions:
 * a {@link PartitionTable} keyed by the producing task executor's {@link ResourceID}, and a
 * map from {@link ResultPartitionID} to the {@link PartitionInfo} recorded when tracking started.
 *
 * <p>Only partitions whose partition type reports {@code isBlocking()} are tracked; calls to
 * {@link #startTrackingPartition} for other partition types are ignored.
 *
 * <p>NOTE(review): state is held in a plain {@link HashMap} and no synchronization is visible in
 * this class — presumably all calls come from a single thread; confirm against the callers.
 */
public class PartitionTrackerImpl implements PartitionTracker {
    private final JobID jobId;
    private final PartitionTable<ResourceID> partitionTable = new PartitionTable<>();
    private final Map<ResultPartitionID, PartitionInfo> partitionInfos = new HashMap<>();
    private final ShuffleMaster<?> shuffleMaster;
    private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;

    /**
     * Immutable pair of the producing task executor and the deployment descriptor of a tracked
     * partition.
     *
     * <p>Decompiler note: this nested class was originally declared {@code private}; the
     * decompiler widened its access modifier.
     */
    public static final class PartitionInfo {
        public final ResourceID producingTaskExecutorResourceId;
        public final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor;

        private PartitionInfo(ResourceID resourceID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
            this.producingTaskExecutorResourceId = resourceID;
            this.resultPartitionDeploymentDescriptor = resultPartitionDeploymentDescriptor;
        }
    }

    /**
     * Creates a tracker for the given job.
     *
     * @param jobID id passed to task executors when releasing partitions; must not be null
     * @param shuffleMaster shuffle master on which partitions are released externally; must not be null
     * @param taskExecutorGatewayLookup resolves a task executor gateway from a {@link ResourceID};
     *     stored as-is without a null check, but dereferenced whenever partitions with local
     *     resources are released
     */
    public PartitionTrackerImpl(JobID jobID, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) {
        this.jobId = Preconditions.checkNotNull(jobID);
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
    }

    /**
     * Starts tracking the described partition for the given task executor, provided the
     * partition type is blocking. Non-blocking partitions are silently ignored.
     *
     * @param resourceID id of the task executor producing the partition; must not be null
     * @param resultPartitionDeploymentDescriptor descriptor of the partition; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public void startTrackingPartition(ResourceID resourceID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
        if (!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
            return;
        }
        final ResultPartitionID resultPartitionId =
                resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
        this.partitionInfos.put(resultPartitionId, new PartitionInfo(resourceID, resultPartitionDeploymentDescriptor));
        this.partitionTable.startTrackingPartitions(resourceID, Collections.singletonList(resultPartitionId));
    }

    /**
     * Stops tracking every partition registered for the given task executor, without releasing them.
     *
     * @param resourceID id of the producing task executor; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public void stopTrackingPartitionsFor(ResourceID resourceID) {
        Preconditions.checkNotNull(resourceID);
        // The table entry for the executor is removed first; each returned id is then dropped
        // from partitionInfos by the shared helper.
        this.partitionTable.stopTrackingPartitions(resourceID).forEach(this::internalStopTrackingPartition);
    }

    /**
     * Stops tracking the given partitions and releases those that were actually tracked.
     * Ids that are not tracked are skipped. Releases are issued once per producing task executor.
     *
     * @param collection ids of the partitions to stop tracking and release; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> collection) {
        Preconditions.checkNotNull(collection);
        // Stop tracking first, then group the removed descriptors by their producing task executor.
        final Map<ResourceID, List<ResultPartitionDeploymentDescriptor>> descriptorsByProducer =
                collection.stream()
                        .map(this::internalStopTrackingPartition)
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .collect(Collectors.groupingBy(
                                info -> info.producingTaskExecutorResourceId,
                                Collectors.mapping(
                                        info -> info.resultPartitionDeploymentDescriptor,
                                        Collectors.toList())));
        descriptorsByProducer.forEach(this::internalReleasePartitions);
    }

    /**
     * Stops tracking the given partitions without releasing them. Untracked ids are ignored.
     *
     * @param collection ids of the partitions to stop tracking; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public void stopTrackingPartitions(Collection<ResultPartitionID> collection) {
        Preconditions.checkNotNull(collection);
        collection.forEach(this::internalStopTrackingPartition);
    }

    /**
     * Stops tracking and releases every partition registered for the given task executor.
     *
     * @param resourceID id of the producing task executor; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public void stopTrackingAndReleasePartitionsFor(ResourceID resourceID) {
        Preconditions.checkNotNull(resourceID);
        // The partition table is asked to stop tracking twice on this path: once here for the
        // whole executor, and once per partition inside internalStopTrackingPartition.
        stopTrackingAndReleasePartitions(this.partitionTable.stopTrackingPartitions(resourceID));
    }

    /**
     * @param resourceID id of the task executor to query; must not be null
     * @return whether the partition table reports tracked partitions for the task executor
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public boolean isTrackingPartitionsFor(ResourceID resourceID) {
        Preconditions.checkNotNull(resourceID);
        return this.partitionTable.hasTrackedPartitions(resourceID);
    }

    /**
     * @param resultPartitionID id of the partition to query; must not be null
     * @return whether a {@link PartitionInfo} is currently stored for the partition
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionTracker
    public boolean isPartitionTracked(ResultPartitionID resultPartitionID) {
        Preconditions.checkNotNull(resultPartitionID);
        return this.partitionInfos.containsKey(resultPartitionID);
    }

    /**
     * Removes the partition from both bookkeeping structures.
     *
     * @return the removed info, or an empty optional if the partition was not tracked
     */
    private Optional<PartitionInfo> internalStopTrackingPartition(ResultPartitionID resultPartitionID) {
        final PartitionInfo removed = this.partitionInfos.remove(resultPartitionID);
        if (removed == null) {
            return Optional.empty();
        }
        this.partitionTable.stopTrackingPartitions(
                removed.producingTaskExecutorResourceId, Collections.singletonList(resultPartitionID));
        return Optional.of(removed);
    }

    /** Releases the partitions on the producing task executor first, then on the shuffle master. */
    private void internalReleasePartitions(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        internalReleasePartitionsOnTaskExecutor(resourceID, collection);
        internalReleasePartitionsOnShuffleMaster(collection);
    }

    /**
     * Asks the task executor to release those partitions whose shuffle descriptor reports local
     * resources. Nothing is sent if no partition qualifies or if the gateway lookup is empty.
     */
    private void internalReleasePartitionsOnTaskExecutor(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        final List<ResultPartitionID> partitionsWithLocalResources =
                collection.stream()
                        .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                        .filter(descriptor -> descriptor.storesLocalResourcesOn().isPresent())
                        .map(descriptor -> descriptor.getResultPartitionID())
                        .collect(Collectors.toList());
        if (partitionsWithLocalResources.isEmpty()) {
            return;
        }
        this.taskExecutorGatewayLookup
                .lookup(resourceID)
                .ifPresent(gateway -> gateway.releasePartitions(this.jobId, partitionsWithLocalResources));
    }

    /** Calls {@code releasePartitionExternally} on the shuffle master for every descriptor. */
    private void internalReleasePartitionsOnShuffleMaster(Collection<ResultPartitionDeploymentDescriptor> collection) {
        collection.stream()
                .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                .forEach(this.shuffleMaster::releasePartitionExternally);
    }
}
