package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.class */
public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
    private final File storageDir;
    private final Cache<JobID, JobDetails> jobDetailsCache;
    private final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache;
    private final ScheduledFuture<?> cleanupFuture;
    private final Thread shutdownHook;
    private int numFinishedJobs;
    private int numFailedJobs;
    private int numCanceledJobs;

    public FileExecutionGraphInfoStore(File file, Time time, int i, long j, ScheduledExecutor scheduledExecutor, Ticker ticker) throws IOException {
        File initExecutionGraphStorageDirectory = initExecutionGraphStorageDirectory(file);
        LOG.info("Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", new Object[]{FileExecutionGraphInfoStore.class.getSimpleName(), initExecutionGraphStorageDirectory, Long.valueOf(time.toMilliseconds()), Long.valueOf(j)});
        this.storageDir = (File) Preconditions.checkNotNull(initExecutionGraphStorageDirectory);
        Preconditions.checkArgument(initExecutionGraphStorageDirectory.exists() && initExecutionGraphStorageDirectory.isDirectory(), "The storage directory must exist and be a directory.");
        this.jobDetailsCache = CacheBuilder.newBuilder().expireAfterWrite(time.toMilliseconds(), TimeUnit.MILLISECONDS).maximumSize(i).removalListener(removalNotification -> {
            deleteExecutionGraphFile((JobID) removalNotification.getKey());
        }).ticker(ticker).build();
        this.executionGraphInfoCache = CacheBuilder.newBuilder().maximumWeight(j).weigher(this::calculateSize).build(new CacheLoader<JobID, ExecutionGraphInfo>() { // from class: org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.1
            @Override // org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader
            public ExecutionGraphInfo load(JobID jobID) throws Exception {
                return FileExecutionGraphInfoStore.this.loadExecutionGraph(jobID);
            }
        });
        Cache<JobID, JobDetails> cache = this.jobDetailsCache;
        cache.getClass();
        this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(cache::cleanUp, time.toMilliseconds(), time.toMilliseconds(), TimeUnit.MILLISECONDS);
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
        this.numFinishedJobs = 0;
        this.numFailedJobs = 0;
        this.numCanceledJobs = 0;
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    public int size() {
        return Math.toIntExact(this.jobDetailsCache.size());
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    @Nullable
    public ExecutionGraphInfo get(JobID jobID) {
        try {
            return this.executionGraphInfoCache.get(jobID);
        } catch (ExecutionException e) {
            LOG.debug("Could not load archived execution graph information for job id {}.", jobID, e);
            return null;
        }
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
        JobID jobId = executionGraphInfo.getJobId();
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus state = archivedExecutionGraph.getState();
        String jobName = archivedExecutionGraph.getJobName();
        Preconditions.checkArgument(state.isTerminalState(), "The job " + jobName + '(' + jobId + ") is not in a terminal state. Instead it is in state " + state + '.');
        switch (state) {
            case FINISHED:
                this.numFinishedJobs++;
                break;
            case CANCELED:
                this.numCanceledJobs++;
                break;
            case FAILED:
                this.numFailedJobs++;
                break;
            case SUSPENDED:
                break;
            default:
                throw new IllegalStateException("The job " + jobName + '(' + jobId + ") should have been in a known terminal state. Instead it was in state " + state + '.');
        }
        storeExecutionGraphInfo(executionGraphInfo);
        this.jobDetailsCache.put(jobId, JobDetails.createDetailsForJob(archivedExecutionGraph));
        this.executionGraphInfoCache.put(jobId, executionGraphInfo);
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    public JobsOverview getStoredJobsOverview() {
        return new JobsOverview(0, this.numFinishedJobs, this.numCanceledJobs, this.numFailedJobs);
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    public Collection<JobDetails> getAvailableJobDetails() {
        return this.jobDetailsCache.asMap().values();
    }

    @Override // org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore
    @Nullable
    public JobDetails getAvailableJobDetails(JobID jobID) {
        return this.jobDetailsCache.getIfPresent(jobID);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.cleanupFuture.cancel(false);
        this.jobDetailsCache.invalidateAll();
        FileUtils.deleteFileOrDirectory(this.storageDir);
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
    }

    private int calculateSize(JobID jobID, ExecutionGraphInfo executionGraphInfo) {
        File executionGraphFile = getExecutionGraphFile(jobID);
        if (executionGraphFile.exists()) {
            return Math.toIntExact(executionGraphFile.length());
        }
        LOG.debug("Could not find execution graph information file for {}. Estimating the size instead.", jobID);
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        return (archivedExecutionGraph.getAllVertices().size() * 1000) + (archivedExecutionGraph.getAccumulatorsSerialized().size() * 1000);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ExecutionGraphInfo loadExecutionGraph(JobID jobID) throws IOException, ClassNotFoundException {
        File executionGraphFile = getExecutionGraphFile(jobID);
        if (!executionGraphFile.exists()) {
            throw new FileNotFoundException("Could not find file for archived execution graph " + jobID + ". This indicates that the file either has been deleted or never written.");
        }
        FileInputStream fileInputStream = new FileInputStream(executionGraphFile);
        Throwable th = null;
        try {
            try {
                ExecutionGraphInfo executionGraphInfo = (ExecutionGraphInfo) InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                return executionGraphInfo;
            } finally {
            }
        } catch (Throwable th3) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th3;
        }
    }

    private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(getExecutionGraphFile(executionGraphInfo.getJobId()));
        Throwable th = null;
        try {
            try {
                InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
                if (fileOutputStream != null) {
                    if (0 == 0) {
                        fileOutputStream.close();
                        return;
                    }
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fileOutputStream != null) {
                if (th != null) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th4;
        }
    }

    private File getExecutionGraphFile(JobID jobID) {
        return new File(this.storageDir, jobID.toString());
    }

    private void deleteExecutionGraphFile(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        File executionGraphFile = getExecutionGraphFile(jobID);
        try {
            FileUtils.deleteFileOrDirectory(executionGraphFile);
        } catch (IOException e) {
            LOG.debug("Could not delete file {}.", executionGraphFile, e);
        }
        this.executionGraphInfoCache.invalidate(jobID);
        this.jobDetailsCache.invalidate(jobID);
    }

    private static File initExecutionGraphStorageDirectory(File file) throws IOException {
        for (int i = 0; i < 10; i++) {
            File file2 = new File(file, "executionGraphStore-" + UUID.randomUUID());
            if (file2.mkdir()) {
                return file2;
            }
        }
        throw new IOException("Could not create executionGraphStorage directory in " + file + '.');
    }

    @VisibleForTesting
    File getStorageDir() {
        return this.storageDir;
    }

    @VisibleForTesting
    LoadingCache<JobID, ExecutionGraphInfo> getExecutionGraphInfoCache() {
        return this.executionGraphInfoCache;
    }
}
