package org.apache.flink.runtime.executiongraph.failover;

import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.class */
/**
 * Failover strategy that reacts to a task failure by restarting only the failed task.
 *
 * <p>{@link #notifyNewVertices(List)} rejects any job vertex that has inputs or produced
 * data sets, so this strategy only accepts jobs whose tasks are not connected to each other.
 * Failures caused by a {@link NoResourceAvailableException}, and any error raised while
 * restarting an individual task, are escalated via {@link ExecutionGraph#failGlobal(Throwable)}.
 */
public class RestartIndividualStrategy extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);

    /** The execution graph whose tasks this strategy restarts. */
    private final ExecutionGraph executionGraph;

    /** Executor on which the restart callback of a failed task is run. */
    private final Executor callbackExecutor;

    /** Counts the task failures handled via individual restart. */
    private final SimpleCounter numTaskFailures;

    /** Factory that creates {@link RestartIndividualStrategy} instances for an execution graph. */
    public static class Factory implements FailoverStrategy.Factory {
        @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory
        public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
            return new RestartIndividualStrategy(executionGraph);
        }
    }

    /**
     * Creates a strategy that runs its restart callbacks on the graph's future executor
     * ({@link ExecutionGraph#getFutureExecutor()}).
     *
     * @param executionGraph the execution graph to recover tasks on; must not be null
     */
    public RestartIndividualStrategy(ExecutionGraph executionGraph) {
        this(executionGraph, executionGraph.getFutureExecutor());
    }

    /**
     * Creates a strategy that runs its restart callbacks on the given executor.
     *
     * @param executionGraph the execution graph to recover tasks on; must not be null
     * @param executor the executor for the restart callbacks; must not be null
     */
    public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor executor) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.callbackExecutor = Preconditions.checkNotNull(executor);
        this.numTaskFailures = new SimpleCounter();
    }

    /**
     * Handles the failure of a single task execution.
     *
     * <p>A {@link NoResourceAvailableException} triggers a global failure of the execution
     * graph. Any other cause increments the failure counter and, once the execution's
     * termination future completes, resets the vertex for a new execution and schedules it
     * on the callback executor.
     *
     * @param execution the failed execution
     * @param th the cause of the failure
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void onTaskFailure(Execution execution, Throwable th) {
        if (th instanceof NoResourceAvailableException) {
            LOG.info("Not enough resources to schedule {} - triggering full recovery.", execution);
            this.executionGraph.failGlobal(th);
            return;
        }

        final ExecutionVertex vertex = execution.getVertex();
        LOG.info("Recovering task failure for {} (#{}) via individual restart.",
                vertex.getTaskNameWithSubtaskIndex(), execution.getAttemptNumber());
        this.numTaskFailures.inc();

        // Capture the version seen at failure time; it is handed to resetForNewExecution below.
        final long globalModVersion = execution.getGlobalModVersion();
        final Future<ExecutionState> terminationFuture = execution.getTerminationFuture();

        terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
            @Override // org.apache.flink.runtime.concurrent.AcceptFunction
            public void accept(ExecutionState executionState) {
                try {
                    final long createTimestamp = System.currentTimeMillis();
                    final Execution newExecution =
                            vertex.resetForNewExecution(createTimestamp, globalModVersion);
                    newExecution.scheduleForExecution();
                } catch (GlobalModVersionMismatch ignored) {
                    // Deliberately not escalated: the captured version no longer matches.
                    // NOTE(review): presumably a concurrent global recovery already took over,
                    // making this individual restart obsolete - confirm.
                    LOG.debug("Skipping individual restart of {} after global modification version mismatch.",
                            vertex.getTaskNameWithSubtaskIndex());
                } catch (Exception e) {
                    RestartIndividualStrategy.this.executionGraph.failGlobal(
                            new Exception("Error during fine grained recovery - triggering full recovery", e));
                }
            }
        }, this.callbackExecutor);
    }

    /**
     * Verifies that none of the newly added job vertices has inputs or produced data sets.
     *
     * @param list the newly added job vertices
     * @throws FlinkRuntimeException if any vertex has at least one input or produced data set
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void notifyNewVertices(List<ExecutionJobVertex> list) {
        for (ExecutionJobVertex jobVertex : list) {
            final List<IntermediateResult> inputs = jobVertex.getInputs();
            final IntermediateResult[] producedDataSets = jobVertex.getProducedDataSets();

            final boolean hasInputs = inputs != null && !inputs.isEmpty();
            final boolean hasOutputs = producedDataSets != null && producedDataSets.length > 0;
            if (hasInputs || hasOutputs) {
                throw new FlinkRuntimeException("Incompatible failover strategy - strategy '" + getStrategyName() + "' can only handle jobs with only disconnected tasks.");
            }
        }
    }

    /**
     * Returns the display name of this strategy.
     *
     * @return the constant name {@code "Individual Task Restart"}
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public String getStrategyName() {
        return "Individual Task Restart";
    }

    /**
     * Registers the task-failure counter under the name {@code task_failures}.
     *
     * @param metricGroup the metric group to register the counter with
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.FailoverStrategy
    public void registerMetrics(MetricGroup metricGroup) {
        // Pass the counter instance itself; casting it to String is not a legal conversion.
        metricGroup.counter("task_failures", this.numTaskFailures);
    }
}
