package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.class */
public class ExecutionGraphBuilder {
    public static ExecutionGraph buildGraph(@Nullable ExecutionGraph executionGraph, JobGraph jobGraph, Configuration configuration, Executor executor, Executor executor2, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time time, RestartStrategy restartStrategy, MetricGroup metricGroup, int i, Logger logger) throws JobExecutionException, JobException {
        ExecutionGraph executionGraph2;
        Preconditions.checkNotNull(jobGraph, "job graph cannot be null");
        String name = jobGraph.getName();
        JobID jobID = jobGraph.getJobID();
        if (executionGraph != null) {
            executionGraph2 = executionGraph;
        } else {
            try {
                executionGraph2 = new ExecutionGraph(executor, executor2, jobID, name, jobGraph.getJobConfiguration(), jobGraph.getSerializedExecutionConfig(), time, restartStrategy, jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths(), classLoader, metricGroup);
            } catch (IOException e) {
                throw new JobException("Could not create the execution graph.", e);
            }
        }
        ExecutionGraph executionGraph3 = executionGraph2;
        executionGraph3.setScheduleMode(jobGraph.getScheduleMode());
        executionGraph3.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
        try {
            executionGraph3.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        } catch (Throwable th) {
            logger.warn("Cannot create JSON plan for job", th);
            executionGraph3.setJsonPlan("{}");
        }
        long nanoTime = System.nanoTime();
        logger.info("Running initialization on master for job {} ({}).", name, jobID);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            String invokableClassName = jobVertex.getInvokableClassName();
            if (invokableClassName == null || invokableClassName.isEmpty()) {
                throw new JobSubmissionException(jobID, "The vertex " + jobVertex.getID() + " (" + jobVertex.getName() + ") has no invokable class.");
            }
            if (jobVertex.getParallelism() == Integer.MAX_VALUE) {
                jobVertex.setParallelism(i);
            }
            try {
                jobVertex.initializeOnMaster(classLoader);
            } catch (Throwable th2) {
                throw new JobExecutionException(jobID, "Cannot initialize task '" + jobVertex.getName() + "': " + th2.getMessage(), th2);
            }
        }
        logger.info("Successfully ran initialization on master in {} ms.", Long.valueOf((System.nanoTime() - nanoTime) / DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
        List<JobVertex> verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (logger.isDebugEnabled()) {
            logger.debug("Adding {} vertices from job graph {} ({}).", new Object[]{Integer.valueOf(verticesSortedTopologicallyFromSources.size()), name, jobID});
        }
        executionGraph3.attachJobGraph(verticesSortedTopologicallyFromSources);
        if (logger.isDebugEnabled()) {
            logger.debug("Successfully created execution graph from job graph {} ({}).", name, jobID);
        }
        JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
        if (snapshotSettings != null) {
            List<ExecutionJobVertex> idToVertex = idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph3);
            List<ExecutionJobVertex> idToVertex2 = idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph3);
            List<ExecutionJobVertex> idToVertex3 = idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph3);
            try {
                CompletedCheckpointStore createCheckpointStore = checkpointRecoveryFactory.createCheckpointStore(jobID, classLoader);
                executionGraph3.enableSnapshotCheckpointing(snapshotSettings.getCheckpointInterval(), snapshotSettings.getCheckpointTimeout(), snapshotSettings.getMinPauseBetweenCheckpoints(), snapshotSettings.getMaxConcurrentCheckpoints(), snapshotSettings.getExternalizedCheckpointSettings(), idToVertex, idToVertex2, idToVertex3, checkpointRecoveryFactory.createCheckpointIDCounter(jobID), createCheckpointStore, configuration.getString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, (String) null), new CheckpointStatsTracker(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, 10), idToVertex2, snapshotSettings, metricGroup));
            } catch (Exception e2) {
                throw new JobExecutionException(jobID, "Failed to initialize high-availability checkpoint handler", e2);
            }
        }
        return executionGraph3;
    }

    private static List<ExecutionJobVertex> idToVertex(List<JobVertexID> list, ExecutionGraph executionGraph) throws IllegalArgumentException {
        ArrayList arrayList = new ArrayList(list.size());
        for (JobVertexID jobVertexID : list) {
            ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
            if (jobVertex == null) {
                throw new IllegalArgumentException("The snapshot checkpointing settings refer to non-existent vertex " + jobVertexID);
            }
            arrayList.add(jobVertex);
        }
        return arrayList;
    }

    private ExecutionGraphBuilder() {
    }
}
