package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.class */
public class AdaptiveBatchScheduler extends DefaultScheduler {
    private final DefaultLogicalTopology logicalTopology;
    private final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider;
    private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
    private final Map<IntermediateDataSetID, BlockingResultInfo> blockingResultInfos;
    private final JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint;
    private final Map<JobVertexID, CompletableFuture<Integer>> sourceParallelismFuturesByJobVertexId;
    private final SpeculativeExecutionHandler speculativeExecutionHandler;
    private Set<JobVertexID> jobVerticesWithUnRecoveredCoordinators;
    private final BatchJobRecoveryHandler jobRecoveryHandler;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler$DefaultBatchJobRecoveryContext.class */
    private class DefaultBatchJobRecoveryContext implements BatchJobRecoveryContext {
        private final FailoverStrategy restartStrategyOnResultConsumable;
        private final FailoverStrategy restartStrategyNotOnResultConsumable;

        private DefaultBatchJobRecoveryContext() {
            this.restartStrategyOnResultConsumable = new RestartPipelinedRegionFailoverStrategy.Factory().create(AdaptiveBatchScheduler.this.getSchedulingTopology(), AdaptiveBatchScheduler.this.getResultPartitionAvailabilityChecker());
            this.restartStrategyNotOnResultConsumable = new RestartPipelinedRegionFailoverStrategy.Factory().create(AdaptiveBatchScheduler.this.getSchedulingTopology(), intermediateResultPartitionID -> {
                return true;
            });
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public ExecutionGraph getExecutionGraph() {
            return AdaptiveBatchScheduler.this.getExecutionGraph();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public ShuffleMaster<?> getShuffleMaster() {
            return AdaptiveBatchScheduler.this.shuffleMaster;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexID, boolean z) {
            return z ? this.restartStrategyOnResultConsumable.getTasksNeedingRestart(executionVertexID, null) : this.restartStrategyNotOnResultConsumable.getTasksNeedingRestart(executionVertexID, null);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public ComponentMainThreadExecutor getMainThreadExecutor() {
            return AdaptiveBatchScheduler.this.getMainThreadExecutor();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void resetVerticesInRecovering(Set<ExecutionVertexID> set) throws Exception {
            Iterator<ExecutionVertexID> it = set.iterator();
            while (it.hasNext()) {
                AdaptiveBatchScheduler.this.notifyCoordinatorsAboutTaskFailure(AdaptiveBatchScheduler.this.getExecutionVertex(it.next()).getCurrentExecutionAttempt(), null);
            }
            AdaptiveBatchScheduler.this.resetForNewExecutions(set);
            AdaptiveBatchScheduler.this.restoreState(set, false);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void updateResultPartitionBytesMetrics(Map<IntermediateResultPartitionID, ResultPartitionBytes> map) {
            AdaptiveBatchScheduler.this.updateResultPartitionBytesMetrics(map);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void initializeJobVertex(ExecutionJobVertex executionJobVertex, int i, Map<IntermediateDataSetID, JobVertexInputInfo> map, long j) throws JobException {
            AdaptiveBatchScheduler.this.initializeJobVertex(executionJobVertex, i, map, j);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void updateTopology(List<ExecutionJobVertex> list) {
            AdaptiveBatchScheduler.this.updateTopology(list);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void onRecoveringFinished(Set<JobVertexID> set) {
            AdaptiveBatchScheduler.this.jobVerticesWithUnRecoveredCoordinators = new HashSet(set);
            AdaptiveBatchScheduler.this.tryComputeSourceParallelismThenRunAsync((r3, th) -> {
                AdaptiveBatchScheduler.this.schedulingStrategy.scheduleAllVerticesIfPossible();
            });
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void onRecoveringFailed() {
            AdaptiveBatchScheduler.this.initializeVerticesIfPossible();
            AdaptiveBatchScheduler.this.handleGlobalFailure(new FlinkRuntimeException("Recover failed from JM failover, fail global."));
        }

        @Override // org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext
        public void failJob(Throwable th, long j, CompletableFuture<Map<String, String>> completableFuture) {
            AdaptiveBatchScheduler.this.failJob(th, j, completableFuture);
        }
    }

    public AdaptiveBatchScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> collection, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time time, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, int i, BlocklistOperations blocklistOperations, JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, Map<JobVertexID, ForwardGroup> map, BatchJobRecoveryHandler batchJobRecoveryHandler) throws Exception {
        super(logger, jobGraph, executor, configuration, consumer, scheduledExecutor, classLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, factory, restartBackoffTimeStrategy, executionOperations, executionVertexVersioner, executionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, collection, executionGraphFactory, shuffleMaster, time, computeVertexParallelismStoreForDynamicGraph(jobGraph.getVertices(), i), new DefaultExecutionDeployer.Factory());
        this.jobVerticesWithUnRecoveredCoordinators = new HashSet();
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
        this.vertexParallelismAndInputInfosDecider = (VertexParallelismAndInputInfosDecider) Preconditions.checkNotNull(vertexParallelismAndInputInfosDecider);
        this.forwardGroupsByJobVertexId = (Map) Preconditions.checkNotNull(map);
        this.blockingResultInfos = new HashMap();
        this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint;
        this.sourceParallelismFuturesByJobVertexId = new HashMap();
        this.speculativeExecutionHandler = createSpeculativeExecutionHandler(logger, configuration, executionVertexVersioner, blocklistOperations);
        this.jobRecoveryHandler = batchJobRecoveryHandler;
    }

    private SpeculativeExecutionHandler createSpeculativeExecutionHandler(Logger logger, Configuration configuration, ExecutionVertexVersioner executionVertexVersioner, BlocklistOperations blocklistOperations) {
        return ((Boolean) configuration.get(BatchExecutionOptions.SPECULATIVE_ENABLED)).booleanValue() ? new DefaultSpeculativeExecutionHandler(configuration, blocklistOperations, this::getExecutionVertex, () -> {
            return getExecutionGraph().getRegisteredExecutions();
        }, (list, collection) -> {
            this.executionDeployer.allocateSlotsAndDeploy(list, executionVertexVersioner.getExecutionVertexVersions(collection));
        }, logger) : new DummySpeculativeExecutionHandler();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void startSchedulingInternal() {
        this.speculativeExecutionHandler.init(getExecutionGraph(), getMainThreadExecutor(), this.jobManagerJobMetricGroup);
        this.jobRecoveryHandler.initialize(new DefaultBatchJobRecoveryContext());
        if (this.jobRecoveryHandler.needRecover()) {
            this.jobRecoveryHandler.startRecovering();
        } else {
            tryComputeSourceParallelismThenRunAsync((r4, th) -> {
                if (getExecutionGraph().getState() == JobStatus.CREATED) {
                    initializeVerticesIfPossible();
                    super.startSchedulingInternal();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler
    public void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        FailureHandlingResult failureHandlingResult2 = failureHandlingResult;
        if (failureHandlingResult.canRestart()) {
            Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
            Stream<R> map = verticesToRestart.stream().map((v0) -> {
                return v0.getJobVertexId();
            });
            Set<JobVertexID> set = this.jobVerticesWithUnRecoveredCoordinators;
            set.getClass();
            Set set2 = (Set) map.filter((v1) -> {
                return r1.contains(v1);
            }).collect(Collectors.toSet());
            this.jobVerticesWithUnRecoveredCoordinators.removeAll(set2);
            Set set3 = (Set) set2.stream().flatMap(jobVertexID -> {
                return Arrays.stream(getExecutionJobVertex(jobVertexID).getTaskVertices()).map((v0) -> {
                    return v0.getID();
                });
            }).collect(Collectors.toSet());
            set3.addAll(verticesToRestart);
            failureHandlingResult2 = FailureHandlingResult.restartable(failureHandlingResult.getFailedExecution().orElse(null), failureHandlingResult.getError(), failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels(), set3, failureHandlingResult.getRestartDelayMS(), failureHandlingResult.isGlobalFailure(), failureHandlingResult.isRootCause());
        }
        super.maybeRestartTasks(failureHandlingResult2);
    }

    @VisibleForTesting
    boolean isRecovering() {
        return this.jobRecoveryHandler.isRecovering();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public void resetForNewExecutions(Collection<ExecutionVertexID> collection) {
        super.resetForNewExecutions(collection);
        if (isRecovering()) {
            return;
        }
        this.jobRecoveryHandler.onExecutionVertexReset(collection);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeJobVertex(ExecutionJobVertex executionJobVertex, int i, Map<IntermediateDataSetID, JobVertexInputInfo> map, long j) throws JobException {
        if (executionJobVertex.isParallelismDecided()) {
            Preconditions.checkState(i == executionJobVertex.getParallelism());
        } else {
            changeJobVertexParallelism(executionJobVertex, i);
        }
        Preconditions.checkState(canInitialize(executionJobVertex));
        getExecutionGraph().initializeJobVertex(executionJobVertex, j, map);
        if (isRecovering()) {
            return;
        }
        this.jobRecoveryHandler.onExecutionJobVertexInitialization(executionJobVertex.getJobVertex().getID(), i, map);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerBase, org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        this.jobRecoveryHandler.stop(requestJobStatus().isGloballyTerminalState());
        this.speculativeExecutionHandler.stopSlowTaskDetector();
        return super.closeAsync();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void onTaskFinished(Execution execution, IOMetrics iOMetrics) {
        this.speculativeExecutionHandler.notifyTaskFinished(execution, this::cancelPendingExecutions);
        if (!isRecovering()) {
            this.jobRecoveryHandler.onExecutionFinished(execution.getVertex().getID());
        }
        Preconditions.checkNotNull(iOMetrics);
        updateResultPartitionBytesMetrics(iOMetrics.getResultPartitionBytes());
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID());
        tryComputeSourceParallelismThenRunAsync((r8, th) -> {
            if (this.executionVertexVersioner.isModified(executionVertexVersion)) {
                this.log.debug("Initialization of vertices will be skipped, because the execution vertex version has been modified.");
            } else {
                initializeVerticesIfPossible();
                super.onTaskFinished(execution, iOMetrics);
            }
        });
    }

    private CompletableFuture<?> cancelPendingExecutions(ExecutionVertexID executionVertexID) {
        List list = (List) getExecutionVertex(executionVertexID).getCurrentExecutions().stream().filter(execution -> {
            return (execution.getState().isTerminal() || execution.getState() == ExecutionState.CANCELING) ? false : true;
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.log.info("Canceling {} un-finished executions of {} because one of its executions has finished.", Integer.valueOf(list.size()), executionVertexID);
        FutureUtils.ConjunctFuture combineAll = FutureUtils.combineAll((Collection) list.stream().map(this::cancelExecution).collect(Collectors.toList()));
        cancelAllPendingSlotRequestsForVertex(executionVertexID);
        return combineAll;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void onTaskFailed(Execution execution) {
        this.speculativeExecutionHandler.notifyTaskFailed(execution);
        super.onTaskFailed(execution);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler
    public void handleTaskFailure(Execution execution, @Nullable Throwable th) {
        if (this.speculativeExecutionHandler.handleTaskFailure(execution, th, this::handleLocalExecutionAttemptFailure)) {
            return;
        }
        super.handleTaskFailure(execution, th);
    }

    private void handleLocalExecutionAttemptFailure(Execution execution, @Nullable Throwable th) {
        this.executionSlotAllocator.cancel(execution.getAttemptId());
        FailureHandlingResult recordTaskFailure = recordTaskFailure(execution, th);
        if (recordTaskFailure.canRestart()) {
            archiveFromFailureHandlingResult(createFailureHandlingResultSnapshot(recordTaskFailure));
        } else {
            failJob(th, recordTaskFailure.getTimestamp(), recordTaskFailure.getFailureLabels());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateResultPartitionBytesMetrics(Map<IntermediateResultPartitionID, ResultPartitionBytes> map) {
        Preconditions.checkNotNull(map);
        map.forEach((intermediateResultPartitionID, resultPartitionBytes) -> {
            IntermediateResult intermediateResult = getExecutionGraph().getAllIntermediateResults().get(intermediateResultPartitionID.getIntermediateDataSetID());
            Preconditions.checkNotNull(intermediateResult);
            this.blockingResultInfos.compute(intermediateResult.getId(), (intermediateDataSetID, blockingResultInfo) -> {
                if (blockingResultInfo == null) {
                    blockingResultInfo = createFromIntermediateResult(intermediateResult);
                }
                blockingResultInfo.recordPartitionInfo(intermediateResultPartitionID.getPartitionNumber(), resultPartitionBytes);
                return blockingResultInfo;
            });
        });
    }

    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerOperations
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> list) {
        enrichInputBytesForExecutionVertices((List) list.stream().map(this::getExecutionVertex).collect(Collectors.toList()));
        super.allocateSlotsAndDeploy(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public void resetForNewExecution(ExecutionVertexID executionVertexID) {
        this.speculativeExecutionHandler.resetForNewExecution(executionVertexID);
        ExecutionVertex executionVertex = getExecutionVertex(executionVertexID);
        if (executionVertex.getExecutionState() == ExecutionState.FINISHED) {
            executionVertex.getProducedPartitions().values().forEach(intermediateResultPartition -> {
                this.blockingResultInfos.computeIfPresent(intermediateResultPartition.getIntermediateResult().getId(), (intermediateDataSetID, blockingResultInfo) -> {
                    blockingResultInfo.resetPartitionInfo(intermediateResultPartition.getPartitionNumber());
                    return blockingResultInfo;
                });
            });
        }
        super.resetForNewExecution(executionVertexID);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
        return resultPartitionType -> {
            return resultPartitionType.isBlockingOrBlockingPersistentResultPartition() || this.hybridPartitionDataConsumeConstraint.isOnlyConsumeFinishedPartition();
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void tryComputeSourceParallelismThenRunAsync(BiConsumer<Void, Throwable> biConsumer) {
        FutureUtils.waitForAll(computeDynamicSourceParallelism()).whenCompleteAsync((BiConsumer<? super Void, ? super Throwable>) biConsumer, (Executor) getMainThreadExecutor()).exceptionally(th -> {
            this.log.error("An unexpected error occurred while scheduling.", th);
            handleGlobalFailure(new SuppressRestartsException(th));
            return null;
        });
    }

    public List<CompletableFuture<Integer>> computeDynamicSourceParallelism() {
        ArrayList arrayList = new ArrayList();
        for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) {
            List<SourceCoordinator<?, ?>> sourceCoordinators = executionJobVertex.getSourceCoordinators();
            if (!sourceCoordinators.isEmpty() && !executionJobVertex.isParallelismDecided()) {
                if (this.sourceParallelismFuturesByJobVertexId.containsKey(executionJobVertex.getJobVertexId())) {
                    arrayList.add(this.sourceParallelismFuturesByJobVertexId.get(executionJobVertex.getJobVertexId()));
                } else if (tryGetConsumedResultsInfo(executionJobVertex).isPresent()) {
                    CompletableFuture<Integer> mergeDynamicParallelismFutures = mergeDynamicParallelismFutures((List) sourceCoordinators.stream().map(sourceCoordinator -> {
                        return sourceCoordinator.inferSourceParallelismAsync(this.vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(executionJobVertex.getJobVertexId(), executionJobVertex.getMaxParallelism()), this.vertexParallelismAndInputInfosDecider.getDataVolumePerTask());
                    }).collect(Collectors.toList()));
                    this.sourceParallelismFuturesByJobVertexId.put(executionJobVertex.getJobVertexId(), mergeDynamicParallelismFutures);
                    arrayList.add(mergeDynamicParallelismFutures);
                }
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    static CompletableFuture<Integer> mergeDynamicParallelismFutures(List<CompletableFuture<Integer>> list) {
        return list.stream().reduce(CompletableFuture.completedFuture(-1), (completableFuture, completableFuture2) -> {
            return completableFuture.thenCombine((CompletionStage) completableFuture2, (v0, v1) -> {
                return Math.max(v0, v1);
            });
        });
    }

    @VisibleForTesting
    public void initializeVerticesIfPossible() {
        ArrayList arrayList = new ArrayList();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) {
                if (!executionJobVertex.isInitialized()) {
                    if (canInitialize(executionJobVertex)) {
                        int parallelism = executionJobVertex.getParallelism();
                        Map<IntermediateDataSetID, IntermediateResult> allIntermediateResults = getExecutionGraph().getAllIntermediateResults();
                        allIntermediateResults.getClass();
                        initializeJobVertex(executionJobVertex, parallelism, VertexInputInfoComputationUtils.computeVertexInputInfos(executionJobVertex, (v1) -> {
                            return r4.get(v1);
                        }), currentTimeMillis);
                        arrayList.add(executionJobVertex);
                    } else {
                        Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo = tryGetConsumedResultsInfo(executionJobVertex);
                        if (tryGetConsumedResultsInfo.isPresent()) {
                            ParallelismAndInputInfos tryDecideParallelismAndInputInfos = tryDecideParallelismAndInputInfos(executionJobVertex, tryGetConsumedResultsInfo.get());
                            initializeJobVertex(executionJobVertex, tryDecideParallelismAndInputInfos.getParallelism(), tryDecideParallelismAndInputInfos.getJobVertexInputInfos(), currentTimeMillis);
                            arrayList.add(executionJobVertex);
                        }
                    }
                }
            }
        } catch (JobException e) {
            this.log.error("Unexpected error occurred when initializing ExecutionJobVertex", e);
            handleGlobalFailure(new SuppressRestartsException(e));
        }
        if (arrayList.size() > 0) {
            updateTopology(arrayList);
        }
    }

    private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(ExecutionJobVertex executionJobVertex, List<BlockingResultInfo> list) {
        int parallelism = executionJobVertex.getParallelism();
        ForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(executionJobVertex.getJobVertexId());
        if (!executionJobVertex.isParallelismDecided() && forwardGroup != null) {
            Preconditions.checkState(!forwardGroup.isParallelismDecided());
        }
        int i = -1;
        if (this.sourceParallelismFuturesByJobVertexId.containsKey(executionJobVertex.getJobVertexId())) {
            int dynamicSourceParallelism = getDynamicSourceParallelism(executionJobVertex);
            if (list.isEmpty()) {
                parallelism = dynamicSourceParallelism;
            } else {
                i = dynamicSourceParallelism;
            }
        }
        ParallelismAndInputInfos decideParallelismAndInputInfosForVertex = this.vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(executionJobVertex.getJobVertexId(), list, parallelism, i, executionJobVertex.getMaxParallelism());
        if (parallelism == -1) {
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {}.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(decideParallelismAndInputInfosForVertex.getParallelism())});
        } else {
            Preconditions.checkState(decideParallelismAndInputInfosForVertex.getParallelism() == parallelism);
        }
        if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
            forwardGroup.setParallelism(decideParallelismAndInputInfosForVertex.getParallelism());
            Iterator<JobVertexID> it = forwardGroup.getJobVertexIds().iterator();
            while (it.hasNext()) {
                ExecutionJobVertex executionJobVertex2 = getExecutionJobVertex(it.next());
                if (executionJobVertex2.isParallelismDecided()) {
                    Preconditions.checkState(decideParallelismAndInputInfosForVertex.getParallelism() == executionJobVertex2.getParallelism());
                } else {
                    this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", new Object[]{executionJobVertex2.getName(), executionJobVertex2.getJobVertexId(), Integer.valueOf(decideParallelismAndInputInfosForVertex.getParallelism())});
                    changeJobVertexParallelism(executionJobVertex2, decideParallelismAndInputInfosForVertex.getParallelism());
                }
            }
        }
        return decideParallelismAndInputInfosForVertex;
    }

    private int getDynamicSourceParallelism(ExecutionJobVertex executionJobVertex) {
        CompletableFuture<Integer> completableFuture = this.sourceParallelismFuturesByJobVertexId.get(executionJobVertex.getJobVertexId());
        int i = -1;
        if (completableFuture != null) {
            i = completableFuture.join().intValue();
            int maxParallelism = executionJobVertex.getMaxParallelism();
            if (i > maxParallelism) {
                this.log.info("The dynamic inferred source parallelism {} is larger than the maximum parallelism {}. Use {} as the upper bound parallelism of source job vertex {}.", new Object[]{Integer.valueOf(i), Integer.valueOf(maxParallelism), Integer.valueOf(maxParallelism), executionJobVertex.getJobVertexId()});
                i = maxParallelism;
            } else if (i > 0) {
                this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to dynamic source parallelism inference.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(i)});
            } else {
                i = -1;
            }
        }
        return i;
    }

    private void enrichInputBytesForExecutionVertices(List<ExecutionVertex> list) {
        for (ExecutionVertex executionVertex : list) {
            List<IntermediateResult> inputs = executionVertex.getJobVertex().getInputs();
            boolean anyMatch = inputs.stream().anyMatch(intermediateResult -> {
                return intermediateResult.getResultType() == ResultPartitionType.HYBRID_FULL || intermediateResult.getResultType() == ResultPartitionType.HYBRID_SELECTIVE;
            });
            if (!inputs.isEmpty() && !anyMatch) {
                long j = 0;
                for (IntermediateResult intermediateResult2 : inputs) {
                    ExecutionVertexInputInfo executionVertexInputInfo = executionVertex.getExecutionVertexInputInfo(intermediateResult2.getId());
                    j += ((BlockingResultInfo) Preconditions.checkNotNull(getBlockingResultInfo(intermediateResult2.getId()))).getNumBytesProduced(executionVertexInputInfo.getPartitionIndexRange(), executionVertexInputInfo.getSubpartitionIndexRange());
                }
                executionVertex.setInputBytes(j);
            }
        }
    }

    private void changeJobVertexParallelism(ExecutionJobVertex executionJobVertex, int i) {
        if (executionJobVertex.isParallelismDecided()) {
            return;
        }
        executionJobVertex.getJobVertex().setDynamicParallelism(i);
        try {
            getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
        } catch (Throwable th) {
            this.log.warn("Cannot create JSON plan for job", th);
            getExecutionGraph().setJsonPlan("{}");
        }
        executionJobVertex.setParallelism(i);
    }

    private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(ExecutionJobVertex executionJobVertex) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends LogicalResult> it = this.logicalTopology.getVertex(executionJobVertex.getJobVertexId()).getConsumedResults().iterator();
        while (it.hasNext()) {
            DefaultLogicalResult defaultLogicalResult = (DefaultLogicalResult) it.next();
            if (!getExecutionJobVertex(defaultLogicalResult.getProducer2().getId()).isFinished()) {
                return Optional.empty();
            }
            arrayList.add((BlockingResultInfo) Preconditions.checkNotNull(this.blockingResultInfos.get(defaultLogicalResult.getId())));
        }
        return Optional.of(arrayList);
    }

    private boolean canInitialize(ExecutionJobVertex executionJobVertex) {
        if (executionJobVertex.isInitialized() || !executionJobVertex.isParallelismDecided()) {
            return false;
        }
        Iterator<JobEdge> it = executionJobVertex.getJobVertex().getInputs().iterator();
        while (it.hasNext()) {
            ExecutionJobVertex jobVertex = getExecutionGraph().getJobVertex(it.next().getSource().getProducer().getID());
            Preconditions.checkNotNull(jobVertex);
            if (!jobVertex.isInitialized()) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateTopology(List<ExecutionJobVertex> list) {
        Iterator<ExecutionJobVertex> it = list.iterator();
        while (it.hasNext()) {
            initializeOperatorCoordinatorsFor(it.next());
        }
        getExecutionGraph().notifyNewlyInitializedJobVertices(list);
    }

    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex executionJobVertex) {
        this.operatorCoordinatorHandler.registerAndStartNewCoordinators(executionJobVertex.getOperatorCoordinators(), getMainThreadExecutor(), executionJobVertex.getParallelism());
    }

    @VisibleForTesting
    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(Iterable<JobVertex> iterable, int i) {
        resetDynamicParallelism(iterable);
        return computeVertexParallelismStore(iterable, jobVertex -> {
            return jobVertex.getParallelism() > 0 ? Integer.valueOf(getDefaultMaxParallelism(jobVertex)) : Integer.valueOf(i);
        }, Function.identity());
    }

    private static void resetDynamicParallelism(Iterable<JobVertex> iterable) {
        for (JobVertex jobVertex : iterable) {
            if (jobVertex.isDynamicParallelism()) {
                jobVertex.setParallelism(-1);
            }
        }
    }

    private static BlockingResultInfo createFromIntermediateResult(IntermediateResult intermediateResult) {
        Preconditions.checkArgument(intermediateResult != null);
        return intermediateResult.getConsumingDistributionPattern() == DistributionPattern.POINTWISE ? new PointwiseBlockingResultInfo(intermediateResult.getId(), intermediateResult.getNumberOfAssignedPartitions(), intermediateResult.getPartitions()[0].getNumberOfSubpartitions()) : new AllToAllBlockingResultInfo(intermediateResult.getId(), intermediateResult.getNumberOfAssignedPartitions(), intermediateResult.getPartitions()[0].getNumberOfSubpartitions(), intermediateResult.isBroadcast());
    }

    @VisibleForTesting
    BlockingResultInfo getBlockingResultInfo(IntermediateDataSetID intermediateDataSetID) {
        return this.blockingResultInfos.get(intermediateDataSetID);
    }

    @VisibleForTesting
    SpeculativeExecutionHandler getSpeculativeExecutionHandler() {
        return this.speculativeExecutionHandler;
    }
}
