package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/PermanentBlobCache.class */
public class PermanentBlobCache extends AbstractBlobCache implements JobPermanentBlobService {
    /**
     * Default cache size limit in mebibytes; equal to the 100 MiB limit that the constructor
     * without an explicit tracker hands to {@link BlobCacheSizeTracker}.
     */
    private static final int DEFAULT_SIZE_LIMIT_MB = 100;

    /**
     * Per-job reference counters. Lookups, insertions and iteration performed by this class are
     * done while holding this map's monitor.
     */
    private final Map<JobID, RefCount> jobRefCounters;

    /**
     * Cleanup interval in milliseconds (the configured {@code BlobServerOptions.CLEANUP_INTERVAL}
     * value multiplied by 1000). Used both as the timer period and as the retention time added to
     * {@link RefCount#keepUntil}.
     */
    private final long cleanupInterval;

    /** Daemon timer that periodically runs {@link PermanentBlobCleanupTask}. */
    private final Timer cleanupTimer;

    /**
     * Size bookkeeping for locally cached BLOBs; consulted before a new file is moved into the
     * cache and updated whenever files are read, added or removed by this class.
     */
    private final BlobCacheSizeTracker blobCacheSizeTracker;

    /* loaded from: input_file:org/apache/flink/runtime/blob/PermanentBlobCache$PermanentBlobCleanupTask.class */
    /**
     * Timer task that removes the local storage directory of every job that is no longer
     * referenced and whose retention deadline has passed.
     *
     * <p>A job qualifies when {@code references <= 0}, {@code keepUntil > 0} and the current time
     * is at or past {@code keepUntil}. The whole sweep runs while holding the monitor of
     * {@code jobRefCounters}.
     */
    class PermanentBlobCleanupTask extends TimerTask {
        PermanentBlobCleanupTask() {
        }

        /**
         * Sweeps all registered jobs once. A job's counter entry is only dropped after its
         * directory was deleted successfully, so a failed deletion is attempted again on the next
         * run.
         */
        @Override
        public void run() {
            synchronized (jobRefCounters) {
                final Iterator<Map.Entry<JobID, RefCount>> entries = jobRefCounters.entrySet().iterator();
                final long now = System.currentTimeMillis();

                while (entries.hasNext()) {
                    final Map.Entry<JobID, RefCount> entry = entries.next();
                    final RefCount refCount = entry.getValue();
                    final boolean expired =
                            refCount.references <= 0 && refCount.keepUntil > 0 && now >= refCount.keepUntil;
                    if (!expired) {
                        continue;
                    }

                    final JobID jobID = entry.getKey();
                    final File jobDirectory =
                            new File(BlobUtils.getStorageLocationPath(storageDir.deref().getAbsolutePath(), jobID));

                    // Exclude readers and writers that go through the same lock (see readFile)
                    // while the directory is being removed.
                    readWriteLock.writeLock().lock();
                    boolean deleted = false;
                    try {
                        blobCacheSizeTracker.untrackAll(jobID);
                        FileUtils.deleteDirectory(jobDirectory);
                        deleted = true;
                    } catch (Throwable t) {
                        // Best effort: keep the entry so that the next run retries the deletion.
                        log.warn("Failed to locally delete job directory " + jobDirectory.getAbsolutePath(), t);
                    } finally {
                        // Single release point; the lock is unlocked exactly once on every path.
                        readWriteLock.writeLock().unlock();
                    }

                    // Removing the map entry does not need the write lock.
                    if (deleted) {
                        entries.remove();
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Mutable per-job bookkeeping record: the number of current reference holders and the deadline
     * after which the job's local files may be cleaned up.
     */
    @VisibleForTesting
    public static class RefCount {
        /** Number of outstanding registrations for the job. */
        public int references;

        /**
         * Epoch-millisecond deadline for cleanup. The cleanup task only acts on positive values,
         * so the initial {@code -1} means "do not clean up".
         */
        public long keepUntil;

        RefCount() {
            this.references = 0;
            this.keepUntil = -1L;
        }
    }

    /**
     * Test convenience constructor: wraps the storage directory with {@code Reference.owned} and
     * delegates to the {@code Reference}-based constructor that uses the default size tracker.
     *
     * @param configuration configuration forwarded unchanged
     * @param file storage directory to wrap
     * @param blobView blob view forwarded unchanged
     * @param inetSocketAddress BLOB server address, may be {@code null}
     * @throws IOException if the delegated constructor fails
     */
    @VisibleForTesting
    public PermanentBlobCache(
            Configuration configuration,
            File file,
            BlobView blobView,
            @Nullable InetSocketAddress inetSocketAddress)
            throws IOException {
        this(configuration, Reference.owned(file), blobView, inetSocketAddress);
    }

    /**
     * Test convenience constructor: wraps the storage directory with {@code Reference.owned} and
     * delegates to the {@code Reference}-based constructor that takes an explicit size tracker.
     *
     * @param configuration configuration forwarded unchanged
     * @param file storage directory to wrap
     * @param blobView blob view forwarded unchanged
     * @param inetSocketAddress BLOB server address, may be {@code null}
     * @param blobCacheSizeTracker size tracker forwarded unchanged
     * @throws IOException if the delegated constructor fails
     */
    @VisibleForTesting
    public PermanentBlobCache(
            Configuration configuration,
            File file,
            BlobView blobView,
            @Nullable InetSocketAddress inetSocketAddress,
            BlobCacheSizeTracker blobCacheSizeTracker)
            throws IOException {
        this(configuration, Reference.owned(file), blobView, inetSocketAddress, blobCacheSizeTracker);
    }

    /**
     * Creates a cache whose size tracker is limited to {@link #DEFAULT_SIZE_LIMIT_MB} mebibytes
     * and delegates everything else to the constructor that takes an explicit tracker.
     *
     * @param configuration configuration forwarded unchanged
     * @param reference reference to the storage directory
     * @param blobView blob view forwarded unchanged
     * @param inetSocketAddress BLOB server address, may be {@code null}
     * @throws IOException if the delegated constructor fails
     */
    public PermanentBlobCache(Configuration configuration, Reference<File> reference, BlobView blobView, @Nullable InetSocketAddress inetSocketAddress) throws IOException {
        // Use the named constant instead of repeating the literal 100.
        this(configuration, reference, blobView, inetSocketAddress, new BlobCacheSizeTracker(MemorySize.ofMebiBytes(DEFAULT_SIZE_LIMIT_MB).getBytes()));
    }

    /**
     * Primary constructor. Initializes all state, registers jobs whose directories already exist
     * in the storage directory, and only then starts the periodic cleanup task.
     *
     * <p>The cleanup task is an inner class and therefore sees this instance from the timer
     * thread; it is scheduled last so that it can never observe a partially initialized cache. If
     * initialization fails after the timer was created, the timer is cancelled so that no timer
     * thread or scheduled task outlives the failed construction.
     *
     * @param configuration source of {@code BlobServerOptions.CLEANUP_INTERVAL} (multiplied by
     *     1000 to obtain milliseconds); also forwarded to the super constructor
     * @param reference reference to the storage directory
     * @param blobView blob view forwarded to the super constructor
     * @param inetSocketAddress BLOB server address, may be {@code null}
     * @param blobCacheSizeTracker tracker used for cache size accounting
     * @throws IOException if the super constructor or the detection of existing jobs fails
     */
    @VisibleForTesting
    public PermanentBlobCache(Configuration configuration, Reference<File> reference, BlobView blobView, @Nullable InetSocketAddress inetSocketAddress, BlobCacheSizeTracker blobCacheSizeTracker) throws IOException {
        super(configuration, reference, blobView, LoggerFactory.getLogger(PermanentBlobCache.class), inetSocketAddress);
        this.jobRefCounters = new HashMap<>();
        this.blobCacheSizeTracker = blobCacheSizeTracker;
        this.cleanupInterval = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        this.cleanupTimer = new Timer(true);
        try {
            registerDetectedJobs();
            // Scheduled last: from here on the timer thread may access every field of this object.
            this.cleanupTimer.schedule(new PermanentBlobCleanupTask(), this.cleanupInterval, this.cleanupInterval);
        } catch (IOException | RuntimeException e) {
            // Do not leave a timer thread behind for an instance that was never fully constructed.
            this.cleanupTimer.cancel();
            throw e;
        }
    }

    /**
     * Registers every job found by {@code BlobUtils.listExistingJobs} in the storage directory
     * with an expiry of "now plus one cleanup interval". Does nothing when the storage directory
     * does not exist.
     *
     * @throws IOException propagated from the job listing
     */
    private void registerDetectedJobs() throws IOException {
        if (!storageDir.deref().exists()) {
            return;
        }

        final Set<JobID> detectedJobs = BlobUtils.listExistingJobs(storageDir.deref().toPath());
        // All detected jobs share the same deadline.
        final long expiry = System.currentTimeMillis() + cleanupInterval;
        for (JobID detectedJob : detectedJobs) {
            registerJobWithExpiry(detectedJob, expiry);
        }
    }

    /**
     * Ensures a counter exists for the job and sets its cleanup deadline, leaving the reference
     * count untouched.
     *
     * @param jobID job to register, must not be {@code null}
     * @param j value stored in {@link RefCount#keepUntil}
     */
    private void registerJobWithExpiry(JobID jobID, long j) {
        Preconditions.checkNotNull(jobID);
        synchronized (jobRefCounters) {
            final RefCount counter = jobRefCounters.computeIfAbsent(jobID, unused -> new RefCount());
            counter.keepUntil = j;
        }
    }

    /**
     * Adds one reference holder for the given job. An already known job additionally has its
     * cleanup deadline reset to {@code -1}; an unknown job gets a fresh counter.
     *
     * @param jobID job to register, must not be {@code null}
     */
    @Override
    public void registerJob(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (jobRefCounters) {
            RefCount counter = jobRefCounters.get(jobID);
            if (counter != null) {
                // Known job: cancel any pending cleanup deadline.
                counter.keepUntil = -1L;
            } else {
                counter = new RefCount();
                jobRefCounters.put(jobID, counter);
            }
            counter.references += 1;
        }
    }

    /**
     * Removes one reference holder from the given job. When the last holder is released the
     * cleanup deadline is set to "now plus one cleanup interval". A release without a matching
     * registration only logs a warning.
     *
     * @param jobID job to release, must not be {@code null}
     */
    @Override
    public void releaseJob(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (jobRefCounters) {
            final RefCount counter = jobRefCounters.get(jobID);
            final boolean hasReferences = counter != null && counter.references != 0;
            if (hasReferences) {
                counter.references -= 1;
                if (counter.references == 0) {
                    // Last holder gone: start the retention period.
                    counter.keepUntil = System.currentTimeMillis() + cleanupInterval;
                }
            } else {
                log.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobID);
            }
        }
    }

    /**
     * Returns the current reference count of the given job.
     *
     * @param jobID job to look up, must not be {@code null}
     * @return the job's reference count, or {@code 0} if the job is unknown
     */
    public int getNumberOfReferenceHolders(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (jobRefCounters) {
            final RefCount counter = jobRefCounters.get(jobID);
            return counter != null ? counter.references : 0;
        }
    }

    /**
     * Returns the file for the given job and key by delegating to {@code getFileInternal} after
     * rejecting a {@code null} job ID.
     *
     * @param jobID job the BLOB belongs to, must not be {@code null}
     * @param permanentBlobKey key of the BLOB
     * @return the file returned by {@code getFileInternal}
     * @throws IOException propagated from {@code getFileInternal}
     */
    @Override
    public File getFile(JobID jobID, PermanentBlobKey permanentBlobKey) throws IOException {
        Preconditions.checkNotNull(jobID);
        final File resolved = getFileInternal(jobID, permanentBlobKey);
        return resolved;
    }

    /**
     * Returns the content of the BLOB identified by the given job and key.
     *
     * <p>The lookup proceeds in three stages:
     *
     * <ol>
     *   <li>If the file already exists in the local cache it is read under the read lock.
     *   <li>Otherwise the BLOB is fetched through {@code blobView} into a staging file, which is
     *       then moved into the cache under the write lock. Any exception in this stage is only
     *       logged and the next stage is tried.
     *   <li>Finally the BLOB is downloaded from the BLOB server, provided its address is known.
     * </ol>
     *
     * <p>Each lock is released exactly once, and the staging file is removed on every exit path.
     *
     * @param jobID job the BLOB belongs to, must not be {@code null}
     * @param permanentBlobKey key of the BLOB, must not be {@code null}
     * @return the bytes of the BLOB
     * @throws IOException if the BLOB cannot be read or downloaded, or if the server address is
     *     unknown when a download is required
     */
    @Override
    public byte[] readFile(JobID jobID, PermanentBlobKey permanentBlobKey) throws IOException {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(permanentBlobKey);
        final File localFile = BlobUtils.getStorageLocation(this.storageDir.deref(), jobID, permanentBlobKey);

        // Stage 1: serve from the local cache.
        this.readWriteLock.readLock().lock();
        try {
            if (localFile.exists()) {
                this.blobCacheSizeTracker.update(jobID, permanentBlobKey);
                return FileUtils.readAllBytes(localFile.toPath());
            }
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        // Downloads go to a staging file first, so no lock is needed while transferring.
        final File incomingFile = createTemporaryFilename();
        try {
            // Stage 2: try the blob store behind blobView.
            try {
                if (this.blobView.get(jobID, permanentBlobKey, incomingFile)) {
                    return moveToCacheAndRead(incomingFile, jobID, permanentBlobKey, localFile);
                }
            } catch (Exception e) {
                this.log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
            }

            // Stage 3: fall back to the BLOB server. Read the address once into a local.
            final InetSocketAddress currentServerAddress = this.serverAddress;
            if (currentServerAddress == null) {
                throw new IOException("Cannot download from BlobServer, because the server address is unknown.");
            }
            BlobClient.downloadFromBlobServer(jobID, permanentBlobKey, incomingFile, currentServerAddress, this.blobClientConfig, this.numFetchRetries);
            return moveToCacheAndRead(incomingFile, jobID, permanentBlobKey, localFile);
        } finally {
            // After a successful move the staging file no longer exists; after a failure it is
            // removed here.
            if (!incomingFile.delete() && incomingFile.exists()) {
                this.log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, permanentBlobKey, jobID);
            }
        }
    }

    /**
     * Moves a fully downloaded staging file into the cache and returns the cached content, holding
     * the write lock for the duration and releasing it on every exit path.
     */
    private byte[] moveToCacheAndRead(File incomingFile, JobID jobID, PermanentBlobKey permanentBlobKey, File localFile) throws IOException {
        this.readWriteLock.writeLock().lock();
        try {
            checkLimitAndMoveFile(incomingFile, jobID, permanentBlobKey, localFile, this.log, null);
            return FileUtils.readAllBytes(localFile.toPath());
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Makes room for an incoming file, moves it into the cache and starts tracking it.
     *
     * <p>The size tracker is asked which cached BLOBs to remove for a file of the incoming size
     * (presumably those exceeding the size limit — confirm against {@code BlobCacheSizeTracker});
     * each one that is deleted successfully is untracked. Callers in this class invoke this
     * method while holding the write lock.
     *
     * @param file staging file to move
     * @param jobID job the BLOB belongs to
     * @param blobKey key of the BLOB
     * @param file2 destination inside the cache
     * @param logger logger forwarded to {@code BlobUtils.moveTempFileToStore}
     * @param blobStore optional blob store forwarded to {@code BlobUtils.moveTempFileToStore}
     * @throws IOException propagated from the move
     */
    private void checkLimitAndMoveFile(File file, JobID jobID, BlobKey blobKey, File file2, Logger logger, @Nullable BlobStore blobStore) throws IOException {
        final long incomingSize = file.length();
        for (Tuple2<JobID, BlobKey> evicted : blobCacheSizeTracker.checkLimit(incomingSize)) {
            if (!deleteFile(evicted.f0, evicted.f1)) {
                // Keep tracking files that could not be removed from disk.
                continue;
            }
            blobCacheSizeTracker.untrack(evicted);
        }

        BlobUtils.moveTempFileToStore(file, jobID, blobKey, file2, logger, blobStore);
        final long storedSize = file2.length();
        blobCacheSizeTracker.track(jobID, blobKey, storedSize);
    }

    /**
     * Deletes the locally cached file of the given BLOB.
     *
     * @param jobID job the BLOB belongs to
     * @param blobKey key of the BLOB
     * @return {@code true} if the file was deleted or did not exist; {@code false} (after logging
     *     a warning) if it still exists
     */
    private boolean deleteFile(JobID jobID, BlobKey blobKey) {
        final String storageRoot = storageDir.deref().getAbsolutePath();
        final File cached = new File(BlobUtils.getStorageLocationPath(storageRoot, jobID, blobKey));
        final boolean gone = cached.delete() || !cached.exists();
        if (!gone) {
            log.warn("Failed to delete locally cached BLOB {} at {}", blobKey, cached.getAbsolutePath());
        }
        return gone;
    }

    /**
     * Resolves the storage location of a BLOB inside this cache's storage directory via
     * {@code BlobUtils.getStorageLocation}.
     *
     * @param jobID job the BLOB belongs to, must not be {@code null}
     * @param blobKey key of the BLOB
     * @return the location computed by {@code BlobUtils.getStorageLocation}
     * @throws IOException propagated from {@code BlobUtils.getStorageLocation}
     */
    @VisibleForTesting
    public File getStorageLocation(JobID jobID, BlobKey blobKey) throws IOException {
        Preconditions.checkNotNull(jobID);
        final File storageRoot = storageDir.deref();
        return BlobUtils.getStorageLocation(storageRoot, jobID, blobKey);
    }

    /**
     * Exposes the live (not copied) map of per-job reference counters for tests.
     *
     * @return the internal counter map
     */
    @VisibleForTesting
    Map<JobID, RefCount> getJobRefCounters() {
        return jobRefCounters;
    }

    /**
     * Cancels the cleanup timer so that no further {@link PermanentBlobCleanupTask} runs are
     * started. NOTE(review): presumably invoked by {@code AbstractBlobCache} on shutdown — confirm
     * against the base class.
     */
    @Override
    protected void cancelCleanupTask() {
        cleanupTimer.cancel();
    }
}
