package org.apache.flink.client.deployment.application.executors;

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.class */
public class EmbeddedExecutor implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedExecutor.class);
    public static final String NAME = "embedded";
    private final Collection<JobID> submittedJobIds;
    private final DispatcherGateway dispatcherGateway;
    private final EmbeddedJobClientCreator jobClientCreator;

    public EmbeddedExecutor(Collection<JobID> collection, DispatcherGateway dispatcherGateway, EmbeddedJobClientCreator embeddedJobClientCreator) {
        this.submittedJobIds = (Collection) Preconditions.checkNotNull(collection);
        this.dispatcherGateway = (DispatcherGateway) Preconditions.checkNotNull(dispatcherGateway);
        this.jobClientCreator = (EmbeddedJobClientCreator) Preconditions.checkNotNull(embeddedJobClientCreator);
    }

    @Override // org.apache.flink.core.execution.PipelineExecutor
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader classLoader) throws MalformedURLException {
        Preconditions.checkNotNull(pipeline);
        Preconditions.checkNotNull(configuration);
        Optional map = configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID).map(JobID::fromHexString);
        return (map.isPresent() && this.submittedJobIds.contains(map.get())) ? getJobClientFuture((JobID) map.get(), classLoader) : submitAndGetJobClientFuture(pipeline, configuration, classLoader);
    }

    private CompletableFuture<JobClient> getJobClientFuture(JobID jobID, ClassLoader classLoader) {
        LOG.info("Job {} was recovered successfully.", jobID);
        return CompletableFuture.completedFuture(this.jobClientCreator.getJobClient(jobID, classLoader));
    }

    private CompletableFuture<JobClient> submitAndGetJobClientFuture(Pipeline pipeline, Configuration configuration, ClassLoader classLoader) throws MalformedURLException {
        Time milliseconds = Time.milliseconds(((Duration) configuration.get(ClientOptions.CLIENT_TIMEOUT)).toMillis());
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration, classLoader);
        JobID jobID = jobGraph.getJobID();
        this.submittedJobIds.add(jobID);
        LOG.info("Job {} is submitted.", jobID);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Effective Configuration: {}", configuration);
        }
        return submitJob(configuration, this.dispatcherGateway, jobGraph, milliseconds).thenApplyAsync(FunctionUtils.uncheckedFunction(jobID2 -> {
            ClientUtils.waitUntilJobInitializationFinished(() -> {
                return this.dispatcherGateway.requestJobStatus(jobID2, milliseconds).get();
            }, () -> {
                return this.dispatcherGateway.requestJobResult(jobID2, milliseconds).get();
            }, classLoader);
            return jobID2;
        })).thenApplyAsync((Function<? super U, ? extends U>) jobID3 -> {
            return this.jobClientCreator.getJobClient(jobID, classLoader);
        });
    }

    private static CompletableFuture<JobID> submitJob(Configuration configuration, DispatcherGateway dispatcherGateway, JobGraph jobGraph, Time time) {
        Preconditions.checkNotNull(jobGraph);
        LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());
        return dispatcherGateway.getBlobServerPort(time).thenApply(num -> {
            return new InetSocketAddress(dispatcherGateway.getHostname(), num.intValue());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) inetSocketAddress -> {
            try {
                org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> {
                    return new BlobClient(inetSocketAddress, configuration);
                });
                return dispatcherGateway.submitJob(jobGraph, time);
            } catch (FlinkException e) {
                throw new CompletionException(e);
            }
        }).thenApply(acknowledge -> {
            return jobGraph.getJobID();
        });
    }
}
