package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.class */
/**
 * A {@link JobManagerRunner} that does not run a job but only cleans up the checkpoint-related
 * resources of a job for which a {@link JobResult} already exists.
 *
 * <p>Calling {@link #start()} immediately completes the result future with a successful result
 * derived from the stored {@link JobResult}. The actual cleanup (shutting down the recovered
 * {@link CompletedCheckpointStore} and the {@link CheckpointIDCounter}, then closing the {@link
 * CheckpointsCleaner}) is chained onto that result future and runs on the cleanup executor. Its
 * outcome is exposed through {@link #closeAsync()}.
 *
 * <p>Operations that need a live job master ({@link #getJobMasterGateway()}, {@link
 * #cancel(Time)}) fail exceptionally.
 */
public class CheckpointResourcesCleanupRunner implements JobManagerRunner {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointResourcesCleanupRunner.class);

    /**
     * Placeholder job name used for the sparse execution graph; the {@link JobResult} is not asked
     * for a job name here.
     */
    private static final String UNKNOWN_JOB_NAME = "unknown";

    private final JobResult jobResult;
    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
    private final SharedStateRegistryFactory sharedStateRegistryFactory;
    private final Configuration jobManagerConfiguration;
    private final Executor cleanupExecutor;
    private final long initializationTimestamp;
    private final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
    private final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
    private final CompletableFuture<Void> cleanupFuture;

    /**
     * Creates the cleanup runner.
     *
     * @param jobResult result of the job whose checkpoint resources are cleaned up; must not be null
     * @param checkpointRecoveryFactory factory used to re-create the checkpoint store and the
     *     checkpoint ID counter of the job; must not be null
     * @param sharedStateRegistryFactory factory handed to the recovered checkpoint store; must not
     *     be null
     * @param jobManagerConfiguration configuration from which the maximum number of retained
     *     checkpoints is read; must not be null
     * @param cleanupExecutor executor on which the cleanup runs; must not be null
     * @param initializationTimestamp timestamp passed on to the generated execution graph
     */
    public CheckpointResourcesCleanupRunner(
            JobResult jobResult,
            CheckpointRecoveryFactory checkpointRecoveryFactory,
            SharedStateRegistryFactory sharedStateRegistryFactory,
            Configuration jobManagerConfiguration,
            Executor cleanupExecutor,
            long initializationTimestamp) {
        this.jobResult = Preconditions.checkNotNull(jobResult);
        this.checkpointRecoveryFactory = Preconditions.checkNotNull(checkpointRecoveryFactory);
        this.sharedStateRegistryFactory = Preconditions.checkNotNull(sharedStateRegistryFactory);
        this.jobManagerConfiguration = Preconditions.checkNotNull(jobManagerConfiguration);
        this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor);
        this.initializationTimestamp = initializationTimestamp;

        // The cleanup is only triggered once the result future has been completed.
        this.cleanupFuture = this.resultFuture.thenCompose(ignoredResult -> runCleanupAsync());
    }

    /**
     * Runs {@link #cleanupCheckpoints()} on the cleanup executor and closes the {@link
     * CheckpointsCleaner} afterwards.
     *
     * @return future that completes when the cleaner has been closed, or exceptionally if the
     *     checkpoint cleanup failed
     */
    private CompletableFuture<Void> runCleanupAsync() {
        return CompletableFuture.runAsync(
                        () -> {
                            try {
                                cleanupCheckpoints();
                            } catch (Exception e) {
                                // Checked exceptions cannot leave a Runnable; wrap to fail the future.
                                throw new CompletionException(e);
                            }
                        },
                        this.cleanupExecutor)
                .thenCompose(ignored -> this.checkpointsCleaner.closeAsync());
    }

    /**
     * Returns the future of the cleanup that is chained onto the result future. Within this class
     * the result future is only completed by {@link #start()}.
     */
    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        return this.cleanupFuture;
    }

    /**
     * Completes the result future with a successful result built from the stored {@link
     * JobResult}, which in turn triggers the asynchronous cleanup.
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public void start() throws Exception {
        this.resultFuture.complete(
                JobManagerRunnerResult.forSuccess(createExecutionGraphInfoFromJobResult()));
    }

    /**
     * Shuts down the recovered checkpoint store and the checkpoint ID counter of the job.
     *
     * <p>Both shutdowns are attempted even if the first one fails; failures are combined via
     * {@link ExceptionUtils#firstOrSuppressed(Throwable, Throwable)} and rethrown at the end.
     *
     * @throws Exception if creating either component fails or if any of the shutdowns fails
     */
    private void cleanupCheckpoints() throws Exception {
        final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore();
        final CheckpointIDCounter checkpointIDCounter = createCheckpointIDCounter();

        Exception collectedFailure = null;
        try {
            completedCheckpointStore.shutdown(getJobStatus(), this.checkpointsCleaner);
        } catch (Exception e) {
            collectedFailure = e;
        }

        try {
            // Blocks the cleanup thread until the counter has been shut down.
            checkpointIDCounter.shutdown(getJobStatus()).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the owning executor.
                Thread.currentThread().interrupt();
            }
            collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
        }

        if (collectedFailure != null) {
            throw collectedFailure;
        }
    }

    /**
     * Re-creates the completed checkpoint store of the job in {@link RestoreMode#CLAIM} mode,
     * using the maximum number of retained checkpoints from the job manager configuration.
     */
    private CompletedCheckpointStore createCompletedCheckpointStore() throws Exception {
        final int maxRetainedCheckpoints =
                DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(
                        this.jobManagerConfiguration, LOG);
        return this.checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore(
                getJobID(),
                maxRetainedCheckpoints,
                this.sharedStateRegistryFactory,
                this.cleanupExecutor,
                RestoreMode.CLAIM);
    }

    /** Re-creates the checkpoint ID counter of the job. */
    private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
        return this.checkpointRecoveryFactory.createCheckpointIDCounter(getJobID());
    }

    /** Always fails: there is no job master while the job is only being cleaned up. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
        return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for job in cleanup phase. The requested operation is not available in that stage."));
    }

    /** Returns the future that {@link #start()} completes with the job's result. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
        return this.resultFuture;
    }

    /** Returns the ID of the job as recorded in the stored {@link JobResult}. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public JobID getJobID() {
        return this.jobResult.getJobId();
    }

    /** Always fails: cleanup cannot be cancelled. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<Acknowledge> cancel(Time time) {
        return FutureUtils.completedExceptionally(new JobCancellationFailedException("Cleanup tasks are not meant to be cancelled."));
    }

    /** Returns the job status derived from the stored {@link JobResult}; the timeout is unused. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobStatus> requestJobStatus(Time time) {
        return CompletableFuture.completedFuture(getJobStatus());
    }

    /** Returns job details created from the execution graph of {@link #requestJob(Time)}. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobDetails> requestJobDetails(Time time) {
        return requestJob(time)
                .thenApply(
                        executionGraphInfo ->
                                JobDetails.createDetailsForJob(
                                        executionGraphInfo.getArchivedExecutionGraph()));
    }

    /**
     * Returns an already completed future with execution graph information generated from the
     * stored {@link JobResult}; the timeout is unused.
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<ExecutionGraphInfo> requestJob(Time time) {
        return CompletableFuture.completedFuture(createExecutionGraphInfoFromJobResult());
    }

    /** This runner needs no initialization phase, so it always reports {@code true}. */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public boolean isInitialized() {
        return true;
    }

    private ExecutionGraphInfo createExecutionGraphInfoFromJobResult() {
        return generateExecutionGraphInfo(this.jobResult, this.initializationTimestamp);
    }

    private JobStatus getJobStatus() {
        return getJobStatus(this.jobResult);
    }

    /** Derives the job status from the application status stored in the given result. */
    private static JobStatus getJobStatus(JobResult jobResult) {
        return jobResult.getApplicationStatus().deriveJobStatus();
    }

    /**
     * Builds a sparse archived execution graph from the job result: job ID, placeholder job name,
     * derived job status, the serialized failure cause (or {@code null} if absent) and the given
     * initialization timestamp.
     */
    private static ExecutionGraphInfo generateExecutionGraphInfo(
            JobResult jobResult, long initializationTimestamp) {
        return new ExecutionGraphInfo(
                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                        jobResult.getJobId(),
                        UNKNOWN_JOB_NAME,
                        getJobStatus(jobResult),
                        jobResult.getSerializedThrowable().orElse(null),
                        null,
                        initializationTimestamp));
    }
}
