package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexInitializedEvent;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexResetEvent;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.event.JobEventReplayHandler;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.DefaultShuffleMasterSnapshotContext;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/DefaultBatchJobRecoveryHandler.class */
public class DefaultBatchJobRecoveryHandler implements BatchJobRecoveryHandler, JobEventReplayHandler {
    /** Event log that is started in {@code initialize}, replayed during recovery and appended to while the job runs. */
    private final JobEventManager jobEventManager;
    /** Recovery context; assigned in {@code initialize}, so it is unset before that call. */
    private BatchJobRecoveryContext context;
    /** Value of {@code clock.relativeTimeMillis()} recorded when {@code onExecutionFinished} last took snapshots. */
    private long lastSnapshotRelativeTime;
    /** Placeholder resource id used when tracking a recovered partition whose shuffle descriptor reports no local resource. */
    private static final ResourceID UNKNOWN_PRODUCER = ResourceID.generate();
    /** {@code BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE} in milliseconds; compared against the time elapsed since the last snapshot. */
    private final long snapshotMinPauseMills;
    /** Time source for snapshot pacing; set to the system clock in {@code initialize}. */
    private Clock clock;
    /** {@code BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT}; passed to the shuffle master when querying partitions. */
    private final Duration previousWorkerRecoveryTimeout;
    private final Logger log = LoggerFactory.getLogger(getClass());
    /** Job vertices that had an execution finish since the set was last cleared by a snapshot. */
    private final Set<JobVertexID> needToSnapshotJobVertices = new HashSet();
    /** Replayed finished events keyed by execution vertex, in insertion order; replayed reset events remove entries. */
    private final Map<ExecutionVertexID, ExecutionVertexFinishedEvent> executionVertexFinishedEventMap = new LinkedHashMap();
    /** Replayed job-vertex-initialized events, in replay order. */
    private final List<ExecutionJobVertexInitializedEvent> jobVertexInitializedEvents = new ArrayList();
    /** Fully finished job vertices whose operator coordinators do not all support batch snapshots. */
    private final Set<JobVertexID> jobVerticesWithUnRecoveredCoordinators = new HashSet();

    /**
     * Classification of a result partition of the execution graph, computed by
     * {@code getPartitionReservationStatus} and consumed by {@code reconcilePartitions}.
     */
    public enum PartitionReservationStatus {

        /** The producer vertex is not finished; the partition becomes a release candidate. */
        RELEASE,

        /**
         * The producer is finished, and either a consumer job vertex is not yet initialized or a
         * consumer is not finished; the partition is requested from the shuffle master.
         */
        RESERVE,

        /**
         * The producer and all consumers are finished; the partition is neither requested nor
         * released during reconciliation.
         */
        OPTIONAL
    }

    /**
     * Immutable holder for the three partition sets produced by {@code reconcilePartitions}.
     * None of the sets may be {@code null}.
     */
    public static class ReconcileResult {

        /** Release candidates that the shuffle master reported. */
        private final Set<ResultPartitionID> partitionsToRelease;

        /** Requested partitions that the shuffle master did not report. */
        private final Set<ResultPartitionID> partitionsMissing;

        /** Requested partitions that the shuffle master reported. */
        private final Set<ResultPartitionID> partitionsToReserve;

        /**
         * @param set partitions to release
         * @param set2 partitions that are missing
         * @param set3 partitions to reserve
         * @throws NullPointerException if any argument is {@code null}
         */
        ReconcileResult(Set<ResultPartitionID> set, Set<ResultPartitionID> set2, Set<ResultPartitionID> set3) {
            partitionsToRelease = Preconditions.checkNotNull(set);
            partitionsMissing = Preconditions.checkNotNull(set2);
            partitionsToReserve = Preconditions.checkNotNull(set3);
        }
    }

    public DefaultBatchJobRecoveryHandler(JobEventManager jobEventManager, Configuration configuration) {
        this.jobEventManager = jobEventManager;
        this.previousWorkerRecoveryTimeout = (Duration) configuration.get(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
        this.snapshotMinPauseMills = ((Duration) configuration.get(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE)).toMillis();
    }

    /**
     * Stores the recovery context, installs the system clock and starts the job event manager.
     * Any failure while starting the event manager fails the job through the context.
     *
     * @param batchJobRecoveryContext recovery context; must not be {@code null}
     */
    @Override
    public void initialize(BatchJobRecoveryContext batchJobRecoveryContext) {
        context = Preconditions.checkNotNull(batchJobRecoveryContext);
        clock = SystemClock.getInstance();
        try {
            jobEventManager.start();
        } catch (Throwable startFailure) {
            long failureTimestamp = System.currentTimeMillis();
            batchJobRecoveryContext.failJob(startFailure, failureTimestamp, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    /**
     * Stops the job event manager.
     *
     * @param z forwarded unchanged to {@code JobEventManager#stop}; presumably selects whether
     *     persisted events are cleaned up (confirm against the event manager)
     */
    @Override
    public void stop(boolean z) {
        jobEventManager.stop(z);
    }

    /**
     * Starts recovery after a job master failover. Must be called from the main thread.
     *
     * <p>The job is moved to {@code RECONCILING}, the shuffle master is notified, and the
     * persisted job events are replayed. If replay fails, recovery is abandoned via
     * {@link #recoverFailed()}. Otherwise partitions are recovered asynchronously and recovery
     * ends with exactly one of {@link #recoverFinished()} or {@link #recoverFailed()}, except
     * that a failure thrown by {@code recoverFinished()} itself still falls back to
     * {@code recoverFailed()}.
     */
    @Override
    public void startRecovering() {
        context.getMainThreadExecutor().assertRunningInMainThread();
        startRecoveringInternal();
        context.getShuffleMaster().notifyPartitionRecoveryStarted(context.getExecutionGraph().getJobID());

        if (!jobEventManager.replay(this)) {
            log.warn("Fail to replay log for {}, will start the job as a new one.", context.getExecutionGraph().getJobID());
            recoverFailed();
            return;
        }
        log.info("Replay all job events successfully.");

        recoverPartitions().whenComplete((ignored, partitionRecoveryFailure) -> {
            if (partitionRecoveryFailure != null) {
                // Stop here: falling through to recoverFinished() would report both failure
                // and success to the context for the same recovery.
                log.warn("Recovering partitions failed.", partitionRecoveryFailure);
                recoverFailed();
                return;
            }
            try {
                recoverFinished();
            } catch (Exception finishFailure) {
                log.warn("Finishing the recovery failed.", finishFailure);
                recoverFailed();
            }
        });
    }

    /**
     * Reports whether the job event manager holds events to recover from.
     *
     * @return {@code true} if job events exist; {@code false} if there are none or the lookup
     *     failed, in which case the job is also failed through the context
     */
    @Override
    public boolean needRecover() {
        boolean hasEvents;
        try {
            hasEvents = jobEventManager.hasJobEvents();
        } catch (Throwable lookupFailure) {
            long failureTimestamp = System.currentTimeMillis();
            context.failJob(lookupFailure, failureTimestamp, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            hasEvents = false;
        }
        return hasEvents;
    }

    /**
     * @return {@code true} while the execution graph is in the {@code RECONCILING} state
     */
    @Override
    public boolean isRecovering() {
        JobStatus currentStatus = context.getExecutionGraph().getState();
        return JobStatus.RECONCILING == currentStatus;
    }

    /**
     * Hands the given snapshots to the shuffle master for restoration.
     *
     * @param list shuffle master snapshots to restore from
     * @throws IllegalStateException if the shuffle master does not support batch snapshots
     */
    private void restoreShuffleMaster(List<ShuffleMasterSnapshot> list) {
        boolean snapshotSupported = context.getShuffleMaster().supportsBatchSnapshot();
        Preconditions.checkState(snapshotSupported);
        context.getShuffleMaster().restoreState(list);
    }

    /** Logs the start of recovery and requests the {@code CREATED} to {@code RECONCILING} transition. */
    private void startRecoveringInternal() {
        log.info("Try to recover status from previously failed job master.");
        JobStatus expected = JobStatus.CREATED;
        JobStatus target = JobStatus.RECONCILING;
        context.getExecutionGraph().transitionState(expected, target);
    }

    /**
     * Resets operator coordinators from their snapshots, then decides which vertices must be
     * reset as a consequence.
     *
     * @param map snapshot bytes keyed by operator id
     * @param map2 owning job vertex for every operator id in {@code map}
     * @throws NullPointerException if an operator id in {@code map} has no entry in {@code map2}
     * @throws Exception if resetting a coordinator or the follow-up vertex reset fails
     */
    private void restoreOperatorCoordinators(Map<OperatorID, byte[]> map, Map<OperatorID, JobVertexID> map2) throws Exception {
        for (OperatorID snapshottedOperatorId : map.keySet()) {
            JobVertexID owningVertexId = Preconditions.checkNotNull(map2.get(snapshottedOperatorId));
            ExecutionJobVertex owningVertex = getExecutionJobVertex(owningVertexId);
            log.info("Restore operator coordinators of {} from job event.", owningVertex.getName());

            // NOTE(review): every snapshot-capable coordinator of the vertex is reset on each
            // pass, so a vertex owning several snapshotted operators is reset once per entry.
            for (OperatorCoordinatorHolder holder : owningVertex.getOperatorCoordinators()) {
                if (!holder.coordinator().supportsBatchSnapshot()) {
                    continue;
                }
                byte[] snapshot = map.get(holder.operatorId());
                holder.resetToCheckpoint(-1L, snapshot);
            }
        }
        determineVerticesForResetAfterRestoreOpCoordinator();
    }

    /** Replay start hook; this handler needs no preparation before events are replayed. */
    @Override
    public void startReplay() {
        // Intentionally empty.
    }

    /**
     * Buffers one replayed event for {@code finalizeReplay}.
     *
     * <p>A finished event is stored under its execution vertex, a reset event removes the
     * stored finished events of the vertices it names, and an initialized event is appended to
     * the list of job vertex initializations.
     *
     * @param jobEvent the replayed event
     * @throws IllegalStateException if the event is of none of the three supported types
     */
    @Override
    public void replayOneEvent(JobEvent jobEvent) {
        if (jobEvent instanceof ExecutionVertexFinishedEvent) {
            ExecutionVertexFinishedEvent finishedEvent = (ExecutionVertexFinishedEvent) jobEvent;
            executionVertexFinishedEventMap.put(finishedEvent.getExecutionVertexId(), finishedEvent);
            return;
        }
        if (jobEvent instanceof ExecutionVertexResetEvent) {
            ExecutionVertexResetEvent resetEvent = (ExecutionVertexResetEvent) jobEvent;
            for (ExecutionVertexID resetVertexId : resetEvent.getExecutionVertexIds()) {
                executionVertexFinishedEventMap.remove(resetVertexId);
            }
            return;
        }
        if (jobEvent instanceof ExecutionJobVertexInitializedEvent) {
            jobVertexInitializedEvents.add((ExecutionJobVertexInitializedEvent) jobEvent);
            return;
        }
        throw new IllegalStateException("Unsupported job event " + jobEvent);
    }

    /**
     * Applies the events buffered by {@link #replayOneEvent(JobEvent)} to the execution graph.
     *
     * <ol>
     *   <li>Re-initializes every job vertex named by an initialized event and updates the
     *       topology with them.
     *   <li>Drops the trailing finished events that carry no operator coordinator and shuffle
     *       master snapshots; if nothing remains, returns.
     *   <li>Recovers the execution of each remaining finished event and collects its
     *       coordinator snapshots and shuffle master snapshot.
     *   <li>Restores the operator coordinators and the shuffle master from what was collected.
     * </ol>
     *
     * @throws IllegalStateException if a finished event refers to an uninitialized job vertex
     *     or holds a snapshot future that is not done
     * @throws Exception if restoring the coordinators or the shuffle master fails
     */
    @Override
    public void finalizeReplay() throws Exception {
        long initializationTimestamp = System.currentTimeMillis();
        List<ExecutionJobVertex> initializedJobVertices = new ArrayList<>();
        for (ExecutionJobVertexInitializedEvent initializedEvent : jobVertexInitializedEvents) {
            ExecutionJobVertex jobVertex = getExecutionJobVertex(initializedEvent.getJobVertexId());
            context.initializeJobVertex(jobVertex, initializedEvent.getParallelism(), initializedEvent.getJobVertexInputInfos(), initializationTimestamp);
            initializedJobVertices.add(jobVertex);
        }
        context.updateTopology(initializedJobVertices);

        // Only events up to the last one that carries snapshots are recovered.
        List<ExecutionVertexFinishedEvent> finishedEvents = new ArrayList<>(executionVertexFinishedEventMap.values());
        int recoverableCount = finishedEvents.size();
        while (recoverableCount > 0
                && !finishedEvents.get(recoverableCount - 1).hasOperatorCoordinatorAndShuffleMasterSnapshots()) {
            recoverableCount--;
        }
        if (recoverableCount == 0) {
            return;
        }

        Map<OperatorID, byte[]> coordinatorSnapshots = new HashMap<>();
        List<ShuffleMasterSnapshot> shuffleMasterSnapshots = new ArrayList<>();
        for (ExecutionVertexFinishedEvent finishedEvent : finishedEvents.subList(0, recoverableCount)) {
            ExecutionJobVertex jobVertex = context.getExecutionGraph().getJobVertex(finishedEvent.getExecutionVertexId().getJobVertexId());
            Preconditions.checkState(jobVertex.isInitialized());
            jobVertex.getTaskVertices()[finishedEvent.getExecutionVertexId().getSubtaskIndex()]
                    .getCurrentExecutionAttempt()
                    .recoverExecution(finishedEvent.getExecutionAttemptId(), finishedEvent.getTaskManagerLocation(), finishedEvent.getUserAccumulators(), finishedEvent.getIOMetrics());

            // Later events overwrite earlier snapshots of the same operator.
            for (Map.Entry<OperatorID, CompletableFuture<byte[]>> entry : finishedEvent.getOperatorCoordinatorSnapshotFutures().entrySet()) {
                Preconditions.checkState(entry.getValue().isDone());
                coordinatorSnapshots.put(entry.getKey(), entry.getValue().get());
            }

            if (finishedEvent.getShuffleMasterSnapshotFuture() != null) {
                Preconditions.checkState(finishedEvent.getShuffleMasterSnapshotFuture().isDone());
                ShuffleMasterSnapshot shuffleMasterSnapshot = finishedEvent.getShuffleMasterSnapshotFuture().get();
                if (!shuffleMasterSnapshot.isIncremental()) {
                    // A full snapshot supersedes everything collected so far. The list is
                    // cleared rather than replaced by a fixed-size one so that later
                    // incremental snapshots can still be appended.
                    shuffleMasterSnapshots.clear();
                }
                shuffleMasterSnapshots.add(shuffleMasterSnapshot);
            }
        }

        Map<OperatorID, JobVertexID> operatorToJobVertex = new HashMap<>();
        for (ExecutionJobVertex jobVertex : context.getExecutionGraph().getAllVertices().values()) {
            if (!jobVertex.isInitialized()) {
                continue;
            }
            for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                operatorToJobVertex.put(holder.operatorId(), jobVertex.getJobVertexId());
            }
        }

        try {
            restoreOperatorCoordinators(coordinatorSnapshots, operatorToJobVertex);
            restoreShuffleMaster(shuffleMasterSnapshots);
        } catch (Exception e) {
            log.warn("Restore coordinator operator failed.", e);
            throw e;
        }
    }

    /**
     * Persists a reset event for the given execution vertices.
     *
     * @param collection ids of the vertices that were reset; copied before being written
     * @throws IllegalStateException if called while the job is recovering
     */
    @Override
    public void onExecutionVertexReset(Collection<ExecutionVertexID> collection) {
        boolean recovering = isRecovering();
        Preconditions.checkState(!recovering);
        ExecutionVertexResetEvent resetEvent = new ExecutionVertexResetEvent(new ArrayList<>(collection));
        jobEventManager.writeEvent(resetEvent, false);
    }

    /**
     * Persists an event recording that a job vertex was initialized.
     *
     * @param jobVertexID id of the initialized job vertex
     * @param i parallelism stored in the event
     * @param map input infos of the job vertex, keyed by intermediate data set
     * @throws IllegalStateException if called while the job is recovering
     */
    @Override
    public void onExecutionJobVertexInitialization(JobVertexID jobVertexID, int i, Map<IntermediateDataSetID, JobVertexInputInfo> map) {
        boolean recovering = isRecovering();
        Preconditions.checkState(!recovering);
        ExecutionJobVertexInitializedEvent initializedEvent = new ExecutionJobVertexInitializedEvent(jobVertexID, i, map);
        jobEventManager.writeEvent(initializedEvent, false);
    }

    /**
     * Persists a finished event for the current execution attempt of the given vertex.
     *
     * <p>Operator coordinator and shuffle master snapshots are attached when the owning job
     * vertex has fully finished or when at least the configured minimum pause has elapsed
     * since the previous snapshot. Otherwise the event carries an empty snapshot map and no
     * shuffle master snapshot.
     *
     * @param executionVertexID id of the vertex whose execution finished
     * @throws IllegalStateException if called while the job is recovering
     */
    @Override
    public void onExecutionFinished(ExecutionVertexID executionVertexID) {
        Preconditions.checkState(!isRecovering());

        Execution finishedAttempt = getExecutionVertex(executionVertexID).getCurrentExecutionAttempt();
        ExecutionState aggregateState = finishedAttempt.getVertex().getJobVertex().getAggregateState();
        boolean jobVertexFinished = aggregateState == ExecutionState.FINISHED;
        needToSnapshotJobVertices.add(executionVertexID.getJobVertexId());

        Map<OperatorID, CompletableFuture<byte[]>> coordinatorSnapshots = new HashMap<>();
        CompletableFuture<ShuffleMasterSnapshot> shuffleMasterSnapshot = null;
        long now = clock.relativeTimeMillis();
        boolean pauseElapsed = now - lastSnapshotRelativeTime >= snapshotMinPauseMills;
        if (jobVertexFinished || pauseElapsed) {
            coordinatorSnapshots.putAll(snapshotOperatorCoordinators());
            lastSnapshotRelativeTime = now;
            needToSnapshotJobVertices.clear();
            shuffleMasterSnapshot = snapshotShuffleMaster();
        }

        // The second argument of writeEvent is true only when the whole job vertex finished.
        jobEventManager.writeEvent(
                new ExecutionVertexFinishedEvent(
                        finishedAttempt.getAttemptId(),
                        finishedAttempt.getAssignedResourceLocation(),
                        coordinatorSnapshots,
                        shuffleMasterSnapshot,
                        finishedAttempt.getIOMetrics(),
                        finishedAttempt.getUserAccumulators()),
                jobVertexFinished);
    }

    /**
     * Triggers a snapshot of every batch-snapshot-capable operator coordinator belonging to
     * the job vertices currently queued in {@code needToSnapshotJobVertices}.
     *
     * @return the pending snapshot futures keyed by operator id
     * @throws NullPointerException if a queued job vertex is not part of the execution graph
     */
    private Map<OperatorID, CompletableFuture<byte[]>> snapshotOperatorCoordinators() {
        Map<OperatorID, CompletableFuture<byte[]>> snapshotFutures = new HashMap<>();
        for (JobVertexID queuedVertexId : needToSnapshotJobVertices) {
            ExecutionJobVertex queuedVertex = Preconditions.checkNotNull(getExecutionJobVertex(queuedVertexId));
            log.info("Snapshot operator coordinators of {} to job event, checkpointId {}.", queuedVertex.getName(), -1L);
            for (OperatorCoordinatorHolder holder : queuedVertex.getOperatorCoordinators()) {
                if (!holder.coordinator().supportsBatchSnapshot()) {
                    continue;
                }
                CompletableFuture<byte[]> snapshotFuture = new CompletableFuture<>();
                holder.checkpointCoordinator(-1L, snapshotFuture);
                snapshotFutures.put(holder.operatorId(), snapshotFuture);
            }
        }
        return snapshotFutures;
    }

    /**
     * Asks the shuffle master to snapshot its state.
     *
     * @return the future handed to the shuffle master to complete with the snapshot
     * @throws IllegalStateException if the shuffle master does not support batch snapshots
     */
    private CompletableFuture<ShuffleMasterSnapshot> snapshotShuffleMaster() {
        boolean snapshotSupported = context.getShuffleMaster().supportsBatchSnapshot();
        Preconditions.checkState(snapshotSupported);

        CompletableFuture<ShuffleMasterSnapshot> snapshotFuture = new CompletableFuture<>();
        DefaultShuffleMasterSnapshotContext snapshotContext = new DefaultShuffleMasterSnapshotContext();
        context.getShuffleMaster().snapshotState(snapshotFuture, snapshotContext);
        return snapshotFuture;
    }

    /**
     * Decides, for every initialized job vertex that has operator coordinators, which of its
     * tasks must be reset after the coordinators were restored, and resets them.
     *
     * <p>Every unfinished task is first transitioned to {@code CANCELED}. Then:
     *
     * <ul>
     *   <li>if all coordinators support batch snapshots, only the unfinished tasks are reset;
     *   <li>if not all do and the vertex is completely finished, the vertex is remembered in
     *       {@code jobVerticesWithUnRecoveredCoordinators};
     *   <li>otherwise all tasks of the vertex are reset.
     * </ul>
     *
     * @throws Exception if resetting the collected vertices fails
     */
    private void determineVerticesForResetAfterRestoreOpCoordinator() throws Exception {
        Set<ExecutionVertexID> verticesToReset = new HashSet<>();

        for (ExecutionJobVertex jobVertex : context.getExecutionGraph().getAllVertices().values()) {
            if (!jobVertex.isInitialized() || jobVertex.getOperatorCoordinators().isEmpty()) {
                continue;
            }

            boolean allCoordinatorsSnapshotCapable = true;
            for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                if (!holder.coordinator().supportsBatchSnapshot()) {
                    allCoordinatorsSnapshotCapable = false;
                    break;
                }
            }

            Set<ExecutionVertexID> unfinishedTasks = new HashSet<>();
            for (ExecutionVertex taskVertex : jobVertex.getTaskVertices()) {
                if (taskVertex.getExecutionState() != ExecutionState.FINISHED) {
                    taskVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.CANCELED);
                    unfinishedTasks.add(taskVertex.getID());
                }
            }

            if (allCoordinatorsSnapshotCapable) {
                log.info("All operator coordinators of jobVertex {} support batch snapshot, add {} unfinished tasks to revise.", jobVertex.getName(), unfinishedTasks.size());
                verticesToReset.addAll(unfinishedTasks);
                continue;
            }

            if (unfinishedTasks.isEmpty()) {
                log.info("JobVertex {} is finished, but not all of its operator coordinators support batch snapshot. Therefore, if any single task within it requires a restart in the future, all tasks associated with this JobVertex need to be restarted as well.", jobVertex.getName());
                jobVerticesWithUnRecoveredCoordinators.add(jobVertex.getJobVertexId());
                continue;
            }

            log.info("Restart all tasks of jobVertex {} because it has not been finished and not all of its operator coordinators support batch snapshot.", jobVertex.getName());
            for (ExecutionVertex taskVertex : jobVertex.getTaskVertices()) {
                verticesToReset.add(taskVertex.getID());
            }
        }

        resetVerticesInRecovering(verticesToReset, false);
    }

    /**
     * Resets the given vertices, plus everything that has to restart with them, while the job
     * is still recovering.
     *
     * <p>The restart set is expanded iteratively. Whenever it touches a job vertex recorded in
     * {@code jobVerticesWithUnRecoveredCoordinators}, that job vertex is removed from the
     * record and all of its tasks are fed into the next expansion round. Vertices that are
     * still in state {@code CREATED} are left out of the final reset request.
     *
     * @param set the vertices that must be reset
     * @param z forwarded to {@code getTasksNeedingRestart}; the partition-recovery path passes
     *     {@code true} and the coordinator-restore path passes {@code false}
     * @throws IllegalStateException if the job is not recovering
     * @throws Exception if the context fails to reset the vertices
     */
    private void resetVerticesInRecovering(Set<ExecutionVertexID> set, boolean z) throws Exception {
        Preconditions.checkState(isRecovering());

        Set<ExecutionVertexID> verticesToRestart = new HashSet<>();
        Set<ExecutionVertexID> pending = set;
        while (!pending.isEmpty()) {
            for (ExecutionVertexID pendingVertexId : pending) {
                if (!verticesToRestart.contains(pendingVertexId)) {
                    verticesToRestart.addAll(context.getTasksNeedingRestart(pendingVertexId, z));
                }
            }

            // Job vertices whose coordinators could not be restored must restart as a whole
            // once any of their tasks restarts.
            Set<JobVertexID> newlyAffectedJobVertices =
                    verticesToRestart.stream()
                            .map(ExecutionVertexID::getJobVertexId)
                            .filter(jobVerticesWithUnRecoveredCoordinators::contains)
                            .collect(Collectors.toSet());
            jobVerticesWithUnRecoveredCoordinators.removeAll(newlyAffectedJobVertices);

            pending =
                    newlyAffectedJobVertices.stream()
                            .flatMap(jobVertexId -> Arrays.stream(getExecutionJobVertex(jobVertexId).getTaskVertices()).map(ExecutionVertex::getID))
                            .collect(Collectors.toSet());
        }

        Set<ExecutionVertexID> verticesToReset =
                verticesToRestart.stream()
                        .filter(vertexId -> getExecutionVertex(vertexId).getExecutionState() != ExecutionState.CREATED)
                        .collect(Collectors.toSet());
        context.resetVerticesInRecovering(verticesToReset);
    }

    /**
     * Abandons recovery: requests the {@code RECONCILING} to {@code RUNNING} transition,
     * restarts the job event manager (stopping it with {@code true} first) and notifies the
     * context. A failure while restarting or notifying fails the job.
     */
    private void recoverFailed() {
        String message = String.format("Job %s recover failed from JM failover, fail global.", context.getExecutionGraph().getJobID());
        log.warn(message);

        context.getExecutionGraph().transitionState(JobStatus.RECONCILING, JobStatus.RUNNING);
        jobEventManager.stop(true);
        try {
            jobEventManager.start();
            context.onRecoveringFailed();
        } catch (Throwable restartFailure) {
            long failureTimestamp = System.currentTimeMillis();
            context.failJob(restartFailure, failureTimestamp, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    /**
     * Completes a successful recovery: requests the {@code RECONCILING} to {@code RUNNING}
     * transition, verifies the vertex states and hands the job vertices with unrecovered
     * coordinators to the context.
     *
     * @throws IllegalStateException if a vertex is in an unexpected state
     */
    private void recoverFinished() {
        log.info("Job {} successfully recovered from JM failover", context.getExecutionGraph().getJobID());

        JobStatus expected = JobStatus.RECONCILING;
        JobStatus target = JobStatus.RUNNING;
        context.getExecutionGraph().transitionState(expected, target);

        checkExecutionGraphState();
        context.onRecoveringFinished(jobVerticesWithUnRecoveredCoordinators);
    }

    /**
     * Verifies that every execution vertex is either {@code CREATED} or {@code FINISHED}.
     *
     * @throws IllegalStateException on the first vertex in any other state
     */
    private void checkExecutionGraphState() {
        for (ExecutionVertex vertex : context.getExecutionGraph().getAllExecutionVertices()) {
            ExecutionState state = vertex.getExecutionState();
            boolean createdOrFinished = state == ExecutionState.CREATED || state == ExecutionState.FINISHED;
            Preconditions.checkState(createdOrFinished);
        }
    }

    /**
     * Reconciles the partitions known to the execution graph with those reported by the
     * shuffle master and applies the result. Must be called from the main thread.
     *
     * <p>Once {@code reconcilePartitions()} completes, the callback releases the partitions
     * marked for release, starts tracking the reported partitions marked for reservation,
     * hands the tracked partitions back to their producing executions, updates the partition
     * byte metrics, and resets the producers of the missing partitions.
     *
     * @return a future that completes when the steps above have run; a failure while resetting
     *     vertices is rethrown wrapped in a {@code CompletionException}
     */
    private CompletableFuture<Void> recoverPartitions() {
        this.context.getMainThreadExecutor().assertRunningInMainThread();
        return reconcilePartitions().thenAccept(tuple2 -> {
            // f0 is the reconcile result, f1 the partitions reported by the shuffle master.
            ReconcileResult reconcileResult = (ReconcileResult) tuple2.f0;
            Collection collection = (Collection) tuple2.f1;
            this.log.info("Partitions to be released: {}, missed partitions: {}, partitions to be reserved: {}.", new Object[]{reconcileResult.partitionsToRelease, reconcileResult.partitionsMissing, reconcileResult.partitionsToReserve});
            ((InternalExecutionGraphAccessor) this.context.getExecutionGraph()).getPartitionTracker().stopTrackingAndReleasePartitions(reconcileResult.partitionsToRelease);
            // Partition bytes of every reserved partition, keyed by partition id.
            HashMap hashMap = new HashMap();
            collection.stream().filter(partitionWithMetrics -> {
                return reconcileResult.partitionsToReserve.contains(partitionWithMetrics.getPartition().getResultPartitionID());
            }).forEach(partitionWithMetrics2 -> {
                ShuffleDescriptor partition = partitionWithMetrics2.getPartition();
                // Fall back to the placeholder id when the descriptor names no local resource.
                ResourceID resourceID = UNKNOWN_PRODUCER;
                if (partition.storesLocalResourcesOn().isPresent()) {
                    resourceID = partition.storesLocalResourcesOn().get();
                }
                ((InternalExecutionGraphAccessor) this.context.getExecutionGraph()).getPartitionTracker().startTrackingPartition(resourceID, Execution.createResultPartitionDeploymentDescriptor(this.context.getExecutionGraph().getResultPartitionOrThrow(partition.getResultPartitionID().getPartitionId()), partition));
                hashMap.put(partition.getResultPartitionID().getPartitionId(), partitionWithMetrics2.getPartitionMetrics().getPartitionBytes());
            });
            // Group all tracked non-cluster partitions by the execution vertex that produced them.
            HashMap hashMap2 = new HashMap();
            ((InternalExecutionGraphAccessor) this.context.getExecutionGraph()).getPartitionTracker().getAllTrackedNonClusterPartitions().forEach(resultPartitionDeploymentDescriptor -> {
                ExecutionVertexID executionVertexId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId().getExecutionVertexId();
                if (!hashMap2.containsKey(executionVertexId)) {
                    hashMap2.put(executionVertexId, new HashMap());
                }
                ((Map) hashMap2.get(executionVertexId)).put(resultPartitionDeploymentDescriptor.getPartitionId(), resultPartitionDeploymentDescriptor);
            });
            // Hand each vertex's partitions to its current execution attempt.
            hashMap2.forEach((executionVertexID, map) -> {
                getExecutionVertex(executionVertexID).getCurrentExecutionAttempt().recoverProducedPartitions(map);
            });
            this.context.updateResultPartitionBytesMetrics(hashMap);
            try {
                // Producers of missing partitions are reset, passing true as the second argument.
                resetVerticesInRecovering((Set) reconcileResult.partitionsMissing.stream().map((v0) -> {
                    return v0.getPartitionId();
                }).map(this::getProducer).map((v0) -> {
                    return v0.getID();
                }).collect(Collectors.toSet()), true);
            } catch (Exception e) {
                // The callback cannot throw checked exceptions, so the failure is wrapped.
                throw new CompletionException(e);
            }
        });
    }

    /**
     * Classifies every partition of the execution graph, asks the shuffle master which of the
     * partitions to reserve it still has, and derives the reconcile result on the main thread
     * executor.
     *
     * <p>The result contains the release candidates that were reported, the requested
     * partitions that were not reported (missing), and the requested partitions that were
     * reported (to reserve).
     *
     * @return a future holding the reconcile result together with the reported partitions
     */
    private CompletableFuture<Tuple2<ReconcileResult, Collection<PartitionWithMetrics>>> reconcilePartitions() {
        Set<ResultPartitionID> reserveCandidates = new HashSet<>();
        Set<ResultPartitionID> releaseCandidates = new HashSet<>();

        context.getExecutionGraph().getAllIntermediateResults().values().stream()
                .flatMap(result -> Arrays.stream(result.getPartitions()))
                .forEach(partition -> {
                    switch (getPartitionReservationStatus(partition)) {
                        case RESERVE:
                            reserveCandidates.add(createResultPartitionId(partition.getPartitionId()));
                            break;
                        case RELEASE:
                            releaseCandidates.add(createResultPartitionId(partition.getPartitionId()));
                            break;
                        default:
                            // OPTIONAL partitions are neither requested nor released.
                            break;
                    }
                });

        return context.getShuffleMaster()
                .getPartitionWithMetrics(context.getExecutionGraph().getJobID(), previousWorkerRecoveryTimeout, reserveCandidates)
                .thenApplyAsync(
                        reportedPartitions -> {
                            Set<ResultPartitionID> reportedIds =
                                    reportedPartitions.stream()
                                            .map(PartitionWithMetrics::getPartition)
                                            .map(ShuffleDescriptor::getResultPartitionID)
                                            .collect(Collectors.toSet());
                            ReconcileResult result =
                                    new ReconcileResult(
                                            Sets.intersection(releaseCandidates, reportedIds),
                                            Sets.difference(reserveCandidates, reportedIds),
                                            Sets.intersection(reserveCandidates, reportedIds));
                            return Tuple2.of(result, reportedPartitions);
                        },
                        (Executor) context.getMainThreadExecutor());
    }

    /**
     * Builds the result partition id from the partition id and the attempt id of the
     * execution that produces the partition.
     *
     * @param intermediateResultPartitionID id of the intermediate result partition
     * @return the corresponding result partition id
     */
    private ResultPartitionID createResultPartitionId(IntermediateResultPartitionID intermediateResultPartitionID) {
        Execution producingAttempt = getProducer(intermediateResultPartitionID).getPartitionProducer();
        return new ResultPartitionID(intermediateResultPartitionID, producingAttempt.getAttemptId());
    }

    /**
     * Looks up the execution vertex that produces the given partition.
     *
     * @param intermediateResultPartitionID id of the intermediate result partition
     * @return the producing execution vertex
     */
    private ExecutionVertex getProducer(IntermediateResultPartitionID intermediateResultPartitionID) {
        IntermediateResultPartition partition = context.getExecutionGraph().getResultPartitionOrThrow(intermediateResultPartitionID);
        return partition.getProducer();
    }

    /**
     * Classifies a partition for reconciliation.
     *
     * @param intermediateResultPartition the partition to classify
     * @return {@code RELEASE} if the producer is not finished; {@code OPTIONAL} if all consumer
     *     job vertices are initialized and all consumers are finished; {@code RESERVE} otherwise
     */
    private PartitionReservationStatus getPartitionReservationStatus(IntermediateResultPartition intermediateResultPartition) {
        ExecutionVertex producer = getProducer(intermediateResultPartition.getPartitionId());
        if (producer.getExecutionState() != ExecutionState.FINISHED) {
            return PartitionReservationStatus.RELEASE;
        }

        boolean allConsumerVerticesInitialized =
                intermediateResultPartition.getIntermediateResult().getConsumerVertices().stream()
                        .allMatch(consumerVertexId -> getExecutionJobVertex(consumerVertexId).isInitialized());
        if (!allConsumerVerticesInitialized) {
            return PartitionReservationStatus.RESERVE;
        }

        // Consumers are only inspected once all consumer job vertices are initialized.
        boolean allConsumersFinished =
                getConsumers(intermediateResultPartition.getPartitionId()).stream()
                        .allMatch(consumer -> consumer.getExecutionState() == ExecutionState.FINISHED);
        return allConsumersFinished ? PartitionReservationStatus.OPTIONAL : PartitionReservationStatus.RESERVE;
    }

    /**
     * Collects the execution vertices of all consumer vertex groups of the given partition.
     *
     * @param intermediateResultPartitionID id of the intermediate result partition
     * @return the consumer vertices in group iteration order
     */
    private List<ExecutionVertex> getConsumers(IntermediateResultPartitionID intermediateResultPartitionID) {
        List<ExecutionVertex> consumers = new ArrayList<>();
        List<ConsumerVertexGroup> consumerGroups =
                context.getExecutionGraph().getResultPartitionOrThrow(intermediateResultPartitionID).getConsumerVertexGroups();
        for (ConsumerVertexGroup consumerGroup : consumerGroups) {
            for (ExecutionVertexID consumerId : consumerGroup) {
                consumers.add(getExecutionVertex(consumerId));
            }
        }
        return consumers;
    }

    /**
     * Resolves an execution vertex id to the task vertex at its subtask index.
     *
     * @param executionVertexID id of the execution vertex
     * @return the execution vertex
     * @throws NullPointerException if the execution graph has no such job vertex
     */
    private ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexID) {
        ExecutionJobVertex owningJobVertex = context.getExecutionGraph().getAllVertices().get(executionVertexID.getJobVertexId());
        ExecutionVertex[] taskVertices = owningJobVertex.getTaskVertices();
        return taskVertices[executionVertexID.getSubtaskIndex()];
    }

    /**
     * Looks up a job vertex in the execution graph.
     *
     * @param jobVertexID id of the job vertex
     * @return the job vertex, or {@code null} if the graph's vertex map has no entry for the id
     */
    private ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexID) {
        Map<JobVertexID, ExecutionJobVertex> allJobVertices = context.getExecutionGraph().getAllVertices();
        return allJobVertices.get(jobVertexID);
    }
}
