package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.class */
/**
 * {@link JobMasterPartitionTracker} implementation that records which result partitions were
 * produced on which task executor and releases or promotes them on request.
 *
 * <p>Two inherited structures are maintained: {@code partitionInfos} receives an entry for every
 * tracked partition, while {@code partitionTable} only receives partitions whose shuffle
 * descriptor reports local resources (see {@link #isPartitionWithLocalResources}).
 *
 * <p>NOTE(review): this class performs no synchronization of its own; presumably all calls are
 * made from a single thread (e.g. the JobMaster main thread) — confirm against callers.
 */
public class JobMasterPartitionTrackerImpl extends AbstractPartitionTracker<ResourceID, ResultPartitionDeploymentDescriptor> implements JobMasterPartitionTracker {
    private final JobID jobId;
    private final ShuffleMaster<?> shuffleMaster;
    private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;

    /** Set by {@link #connectToResourceManager}; {@code null} until then. */
    private ResourceManagerGateway resourceManagerGateway;

    /** Cache of shuffle descriptors fetched from the ResourceManager, keyed by data set id. */
    private final Map<IntermediateDataSetID, List<ShuffleDescriptor>> clusterPartitionShuffleDescriptors = new HashMap<>();

    /**
     * Creates a tracker for the given job.
     *
     * @param jobID id of the job whose partitions are tracked; must not be {@code null}
     * @param shuffleMaster shuffle master used for external partition release; must not be
     *     {@code null}
     * @param taskExecutorGatewayLookup resolves a {@link ResourceID} to a task executor gateway
     */
    public JobMasterPartitionTrackerImpl(JobID jobID, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) {
        this.jobId = Preconditions.checkNotNull(jobID);
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
    }

    /**
     * Starts tracking the described partition if its partition type is released by the
     * scheduler; partitions of any other type are ignored.
     *
     * @param resourceID id of the task executor producing the partition; must not be {@code null}
     * @param resultPartitionDeploymentDescriptor descriptor of the partition; must not be
     *     {@code null}
     */
    @Override
    public void startTrackingPartition(ResourceID resourceID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);

        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReleaseByScheduler()) {
            return;
        }
        startTrackingPartition(resourceID, getResultPartitionId(resultPartitionDeploymentDescriptor), resultPartitionDeploymentDescriptor);
    }

    /**
     * Registers the partition in {@code partitionInfos}; additionally registers it in
     * {@code partitionTable} when the partition stores local resources.
     *
     * @param resourceID id of the task executor producing the partition
     * @param resultPartitionID id of the partition
     * @param resultPartitionDeploymentDescriptor descriptor of the partition
     */
    @Override
    public void startTrackingPartition(ResourceID resourceID, ResultPartitionID resultPartitionID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        if (isPartitionWithLocalResources(resultPartitionDeploymentDescriptor)) {
            this.partitionTable.startTrackingPartitions(resourceID, Collections.singletonList(resultPartitionID));
        }
        this.partitionInfos.put(resultPartitionID, new AbstractPartitionTracker.PartitionInfo<>(resourceID, resultPartitionDeploymentDescriptor));
    }

    /**
     * Stops tracking the given partitions and issues release calls for them.
     *
     * @param collection ids of the partitions to stop tracking; must not be {@code null}
     * @param z when {@code true}, the partitions are also released on the shuffle master
     */
    @Override
    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> collection, boolean z) {
        stopTrackingAndHandlePartitions(
                collection,
                (taskExecutorId, descriptors) -> internalReleasePartitions(taskExecutorId, descriptors, z));
    }

    /**
     * Stops tracking the given partitions and issues promote calls for them.
     *
     * @param collection ids of the partitions to stop tracking; must not be {@code null}
     * @return the result of {@code FutureUtils.completeAll} over the per-task-executor promote
     *     futures
     */
    @Override
    public CompletableFuture<Void> stopTrackingAndPromotePartitions(Collection<ResultPartitionID> collection) {
        final List<CompletableFuture<Acknowledge>> promoteFutures = new ArrayList<>();
        stopTrackingAndHandlePartitions(
                collection,
                (taskExecutorId, descriptors) ->
                        promoteFutures.add(internalPromotePartitionsOnTaskExecutor(taskExecutorId, descriptors)));
        return FutureUtils.completeAll(promoteFutures);
    }

    /**
     * Returns the deployment descriptors of all currently tracked partitions.
     *
     * @return a newly collected list; never {@code null}
     */
    @Override
    public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions() {
        return this.partitionInfos.values().stream()
                .map(info -> info.getMetaInfo())
                .collect(Collectors.toList());
    }

    /**
     * Stores the gateway used by {@link #getClusterPartitionShuffleDescriptors}.
     *
     * @param resourceManagerGateway gateway to the ResourceManager
     */
    @Override
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = resourceManagerGateway;
    }

    /**
     * Returns the shuffle descriptors of the given data set, requesting them from the
     * ResourceManager on the first call for that id and serving later calls from the cache.
     *
     * <p>The first call for an id blocks until the ResourceManager has answered.
     *
     * @param intermediateDataSetID id of the intermediate data set
     * @return the cached or freshly fetched shuffle descriptors
     * @throws NullPointerException if no ResourceManager gateway has been connected yet
     * @throws RuntimeException if the request to the ResourceManager fails
     */
    @Override
    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID intermediateDataSetID) {
        return this.clusterPartitionShuffleDescriptors.computeIfAbsent(intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager);
    }

    /** Fetches the shuffle descriptors of a data set from the ResourceManager, blocking on the reply. */
    private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager(IntermediateDataSetID intermediateDataSetID) {
        Preconditions.checkNotNull(this.resourceManagerGateway, "JobMaster is not connected to ResourceManager");
        try {
            return this.resourceManagerGateway.getClusterPartitionsShuffleDescriptors(intermediateDataSetID).get();
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The broad catch would otherwise consume the interruption; restore the flag so
                // that code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(String.format("Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager", intermediateDataSetID), th);
        }
    }

    /**
     * Stops tracking the given partitions, groups the stopped entries by their key (the producing
     * task executor) and hands each group to {@code partitionHandler}.
     */
    private void stopTrackingAndHandlePartitions(Collection<ResultPartitionID> resultPartitionIds, BiConsumer<ResourceID, Collection<ResultPartitionDeploymentDescriptor>> partitionHandler) {
        Preconditions.checkNotNull(resultPartitionIds);

        final Map<ResourceID, List<ResultPartitionDeploymentDescriptor>> descriptorsByTaskExecutor =
                stopTrackingPartitions(resultPartitionIds).stream()
                        .collect(
                                Collectors.groupingBy(
                                        entry -> entry.getKey(),
                                        Collectors.mapping(entry -> entry.getMetaInfo(), Collectors.toList())));

        descriptorsByTaskExecutor.forEach(partitionHandler);
    }

    /** Releases the partitions on the task executor and, if requested, on the shuffle master. */
    private void internalReleasePartitions(ResourceID taskExecutorId, Collection<ResultPartitionDeploymentDescriptor> descriptors, boolean releaseOnShuffleMaster) {
        internalReleasePartitionsOnTaskExecutor(taskExecutorId, descriptors);
        if (releaseOnShuffleMaster) {
            internalReleasePartitionsOnShuffleMaster(descriptors.stream());
        }
    }

    /**
     * Asks the task executor to promote those partitions that store local resources.
     *
     * @return the gateway's promote future, or an already completed future (with {@code null}
     *     value) when there is nothing to promote or no gateway is found for the task executor
     */
    private CompletableFuture<Acknowledge> internalPromotePartitionsOnTaskExecutor(ResourceID taskExecutorId, Collection<ResultPartitionDeploymentDescriptor> descriptors) {
        final Set<ResultPartitionID> localPartitionIds = collectLocalPartitionIds(descriptors);

        if (!localPartitionIds.isEmpty()) {
            final TaskExecutorGateway gateway = this.taskExecutorGatewayLookup.lookup(taskExecutorId).orElse(null);
            if (gateway != null) {
                return gateway.promotePartitions(this.jobId, localPartitionIds);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Asks the task executor to release those partitions that store local resources; does
     * nothing when there are none or when no gateway is found for the task executor.
     */
    private void internalReleasePartitionsOnTaskExecutor(ResourceID taskExecutorId, Collection<ResultPartitionDeploymentDescriptor> descriptors) {
        final Set<ResultPartitionID> localPartitionIds = collectLocalPartitionIds(descriptors);
        if (localPartitionIds.isEmpty()) {
            return;
        }
        this.taskExecutorGatewayLookup
                .lookup(taskExecutorId)
                .ifPresent(gateway -> gateway.releasePartitions(this.jobId, localPartitionIds));
    }

    /** Passes the shuffle descriptor of every given partition to the shuffle master for external release. */
    private void internalReleasePartitionsOnShuffleMaster(Stream<ResultPartitionDeploymentDescriptor> descriptors) {
        descriptors
                .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                .forEach(this.shuffleMaster::releasePartitionExternally);
    }

    /** Returns the ids of those partitions among {@code descriptors} that store local resources. */
    private static Set<ResultPartitionID> collectLocalPartitionIds(Collection<ResultPartitionDeploymentDescriptor> descriptors) {
        return descriptors.stream()
                .filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources)
                .map(JobMasterPartitionTrackerImpl::getResultPartitionId)
                .collect(Collectors.toSet());
    }

    /** Returns whether the partition's shuffle descriptor reports a location for local resources. */
    private static boolean isPartitionWithLocalResources(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent();
    }

    /** Returns the result partition id carried by the partition's shuffle descriptor. */
    private static ResultPartitionID getResultPartitionId(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
    }
}
