package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/Executing.class */
public class Executing extends StateWithExecutionGraph implements ResourceConsumer {
    private final Context context;
    private final ClassLoader userCodeClassLoader;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/Executing$Context.class */
    public interface Context extends StateWithExecutionGraph.Context {
        void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler);

        FailureResult howToHandleFailure(Throwable th);

        boolean canScaleUp(ExecutionGraph executionGraph);

        void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration duration);

        void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable th);

        CompletableFuture<String> goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> completableFuture);

        ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration);
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/Executing$Factory.class */
    static class Factory implements StateFactory<Executing> {
        private final Context context;
        private final Logger log;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final ClassLoader userCodeClassLoader;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Factory(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Context context, ClassLoader classLoader) {
            this.context = context;
            this.log = logger;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.userCodeClassLoader = classLoader;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public Class<Executing> getStateClass() {
            return Executing.class;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public Executing getState() {
            return new Executing(this.executionGraph, this.executionGraphHandler, this.operatorCoordinatorHandler, this.log, this.context, this.userCodeClassLoader);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/Executing$FailureResult.class */
    public static final class FailureResult {

        @Nullable
        private final Duration backoffTime;
        private final Throwable failureCause;

        private FailureResult(Throwable th, @Nullable Duration duration) {
            this.backoffTime = duration;
            this.failureCause = th;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean canRestart() {
            return this.backoffTime != null;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Duration getBackoffTime() {
            Preconditions.checkState(canRestart(), "Failure result must be restartable to return a backoff time.");
            return this.backoffTime;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Throwable getFailureCause() {
            return this.failureCause;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static FailureResult canRestart(Throwable th, Duration duration) {
            return new FailureResult(th, duration);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static FailureResult canNotRestart(Throwable th) {
            return new FailureResult(th, null);
        }
    }

    Executing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Context context, ClassLoader classLoader) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
        this.context = context;
        this.userCodeClassLoader = classLoader;
        Preconditions.checkState(executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
        deploy();
        context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void cancel() {
        this.context.goToCanceling(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler());
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void handleGlobalFailure(Throwable th) {
        handleAnyFailure(th);
    }

    private void handleAnyFailure(Throwable th) {
        FailureResult howToHandleFailure = this.context.howToHandleFailure(th);
        if (howToHandleFailure.canRestart()) {
            getLogger().info("Restarting job.", howToHandleFailure.getFailureCause());
            this.context.goToRestarting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), howToHandleFailure.getBackoffTime());
        } else {
            getLogger().info("Failing job.", howToHandleFailure.getFailureCause());
            this.context.goToFailing(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), howToHandleFailure.getFailureCause());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
        boolean updateState = getExecutionGraph().updateState(taskExecutionStateTransition);
        if (updateState && taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) {
            Throwable error = taskExecutionStateTransition.getError(this.userCodeClassLoader);
            handleAnyFailure(error == null ? new FlinkException("Unknown failure cause. Probably related to FLINK-21376.") : error);
        }
        return updateState;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    void onGloballyTerminalState(JobStatus jobStatus) {
        this.context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    }

    private void deploy() {
        Iterator<ExecutionJobVertex> it = getExecutionGraph().getVerticesTopologically().iterator();
        while (it.hasNext()) {
            for (ExecutionVertex executionVertex : it.next().getTaskVertices()) {
                if (executionVertex.getExecutionState() == ExecutionState.CREATED || executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
                    deploySafely(executionVertex);
                }
            }
        }
    }

    private void deploySafely(ExecutionVertex executionVertex) {
        try {
            executionVertex.deploy();
        } catch (JobException e) {
            handleDeploymentFailure(executionVertex, e);
        }
    }

    private void handleDeploymentFailure(ExecutionVertex executionVertex, JobException jobException) {
        executionVertex.markFailed(jobException);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.ResourceConsumer
    public void notifyNewResourcesAvailable() {
        if (this.context.canScaleUp(getExecutionGraph())) {
            getLogger().info("New resources are available. Restarting job to scale up.");
            this.context.goToRestarting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), Duration.ofMillis(0L));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<String> stopWithSavepoint(@Nullable String str, boolean z) {
        ExecutionGraph executionGraph = getExecutionGraph();
        StopWithSavepointTerminationManager.checkStopWithSavepointPreconditions(executionGraph.getCheckpointCoordinator(), str, executionGraph.getJobID(), getLogger());
        getLogger().info("Triggering stop-with-savepoint for job {}.", executionGraph.getJobID());
        CheckpointScheduling checkpointSchedulingProvider = new CheckpointSchedulingProvider(executionGraph);
        checkpointSchedulingProvider.stopCheckpointScheduler();
        return this.context.goToStopWithSavepoint(executionGraph, getExecutionGraphHandler(), getOperatorCoordinatorHandler(), checkpointSchedulingProvider, executionGraph.getCheckpointCoordinator().triggerSynchronousSavepoint(z, str).thenApply((v0) -> {
            return v0.getExternalPointer();
        }));
    }
}
