package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import one.profiler.Events;
import org.apache.commons.cli.HelpFormatter;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.class */
public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager {
    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    private final String id;
    protected final Executor ioExecutor;
    protected FileSystem fs;
    protected Path checkpointDir;
    protected Path sharedStateDir;
    protected Path taskOwnedStateDir;
    protected int writeBufferSize;
    protected boolean shouldSyncAfterClosingLogicalFile;
    protected long maxPhysicalFileSize;
    protected PhysicalFilePool.Type filePoolType;
    protected final float maxSpaceAmplification;
    protected Path managedExclusiveStateDir;
    protected DirectoryHandleWithReferenceTrack managedExclusiveStateDirHandle;
    protected FileMergingSnapshotManager.SpaceStat spaceStat;
    protected FileMergingMetricGroup metricGroup;
    protected final Object lock = new Object();

    @GuardedBy(Events.LOCK)
    protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();
    private final Map<LogicalFile.LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap();
    private boolean fileSystemInitiated = false;
    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
    private final Object notifyLock = new Object();

    @GuardedBy("notifyLock")
    private final TreeMap<Long, Set<FileMergingSnapshotManager.SubtaskKey>> notifiedSubtaskCheckpoint = new TreeMap<>();

    @GuardedBy("notifyLock")
    private final TreeSet<Long> notifiedCheckpoint = new TreeSet<>();
    private final Map<FileMergingSnapshotManager.SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap();
    private final Map<FileMergingSnapshotManager.SubtaskKey, DirectoryHandleWithReferenceTrack> managedSharedStateDirHandles = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase$DirectoryHandleWithReferenceTrack.class */
    public static class DirectoryHandleWithReferenceTrack {
        private final DirectoryStreamStateHandle directoryHandle;
        private final AtomicLong ongoingRefCount = new AtomicLong(0);
        private boolean tracking;

        DirectoryHandleWithReferenceTrack(DirectoryStreamStateHandle directoryStreamStateHandle, boolean z) {
            this.directoryHandle = directoryStreamStateHandle;
            this.tracking = z;
        }

        static DirectoryHandleWithReferenceTrack wrap(DirectoryStreamStateHandle directoryStreamStateHandle, boolean z) {
            return new DirectoryHandleWithReferenceTrack(directoryStreamStateHandle, z);
        }

        DirectoryStreamStateHandle getHandle() {
            return this.directoryHandle;
        }

        void increaseRefCountWhenCheckpointStart(long j) {
            if (this.tracking) {
                FileMergingSnapshotManagerBase.LOG.debug("checkpoint:{} start, increase ref-count to file-merging managed shared dir : {}", Long.valueOf(j), this.directoryHandle.getDirectory());
                this.ongoingRefCount.incrementAndGet();
            }
        }

        void decreaseRefCountWhenCheckpointAbort(long j) {
            if (this.tracking) {
                FileMergingSnapshotManagerBase.LOG.debug("checkpoint:{} aborted, decrease ref-count to file-merging managed shared dir : {}", Long.valueOf(j), this.directoryHandle.getDirectory());
                this.ongoingRefCount.decrementAndGet();
            }
        }

        void handoverOwnershipWhenCheckpointComplete(long j) {
            if (this.tracking) {
                FileMergingSnapshotManagerBase.LOG.debug("checkpoint:{} complete, handover ownership of file-merging managed shared dir to JobManager : {}", Long.valueOf(j), this.directoryHandle.getDirectory());
                this.tracking = false;
            }
        }

        void handoverOwnershipWhenCheckpointSubsumed(long j) {
            if (this.tracking) {
                FileMergingSnapshotManagerBase.LOG.debug("checkpoint:{} subsumed, handover ownership of file-merging managed shared dir to JobManager : {}", Long.valueOf(j), this.directoryHandle.getDirectory());
                this.tracking = false;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void tryCleanupQuietly() {
            if (this.tracking && this.ongoingRefCount.get() == 0 && this.directoryHandle != null) {
                try {
                    this.directoryHandle.discardState();
                } catch (Exception e) {
                }
            }
        }
    }

    public FileMergingSnapshotManagerBase(String str, long j, PhysicalFilePool.Type type, float f, Executor executor, MetricGroup metricGroup) {
        this.id = str;
        this.maxPhysicalFileSize = j;
        this.filePoolType = type;
        this.maxSpaceAmplification = f < 1.0f ? Float.MAX_VALUE : f;
        this.ioExecutor = executor;
        this.spaceStat = new FileMergingSnapshotManager.SpaceStat();
        this.metricGroup = new FileMergingMetricGroup(metricGroup, this.spaceStat);
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void initFileSystem(FileSystem fileSystem, Path path, Path path2, Path path3, int i) throws IllegalArgumentException {
        synchronized (this.lock) {
            if (this.fileSystemInitiated) {
                Preconditions.checkArgument(path.equals(this.checkpointDir), "The checkpoint base dir is not deterministic across subtasks.");
                Preconditions.checkArgument(path2.equals(this.sharedStateDir), "The shared checkpoint dir is not deterministic across subtasks.");
                Preconditions.checkArgument(path3.equals(this.taskOwnedStateDir), "The task-owned checkpoint dir is not deterministic across subtasks.");
                return;
            }
            this.fs = fileSystem;
            this.checkpointDir = (Path) Preconditions.checkNotNull(path);
            this.sharedStateDir = (Path) Preconditions.checkNotNull(path2);
            this.taskOwnedStateDir = (Path) Preconditions.checkNotNull(path3);
            this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem);
            Path path4 = new Path(path3, uriEscape(this.id));
            boolean createManagedDirectory = createManagedDirectory(path4);
            this.managedExclusiveStateDir = path4;
            this.managedExclusiveStateDirHandle = DirectoryHandleWithReferenceTrack.wrap(DirectoryStreamStateHandle.of(path4), createManagedDirectory);
            this.writeBufferSize = i;
            this.fileSystemInitiated = true;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        Path path = new Path(this.sharedStateDir, uriEscape(subtaskKey.getManagedDirName()));
        if (this.managedSharedStateDir.containsKey(subtaskKey)) {
            return;
        }
        boolean createManagedDirectory = createManagedDirectory(path);
        this.managedSharedStateDir.put(subtaskKey, path);
        this.managedSharedStateDirHandles.put(subtaskKey, DirectoryHandleWithReferenceTrack.wrap(DirectoryStreamStateHandle.of(path), createManagedDirectory));
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void unregisterSubtask(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        if (this.managedSharedStateDir.containsKey(subtaskKey)) {
            this.managedSharedStateDir.remove(subtaskKey);
            this.managedSharedStateDirHandles.get(subtaskKey).tryCleanupQuietly();
            this.managedSharedStateDirHandles.remove(subtaskKey);
        }
    }

    protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, long j, long j2, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        LogicalFile.LogicalFileId generateRandomId = LogicalFile.LogicalFileId.generateRandomId();
        LogicalFile logicalFile = new LogicalFile(generateRandomId, physicalFile, j, j2, subtaskKey);
        this.knownLogicalFiles.put(generateRandomId, logicalFile);
        if (physicalFile.isOwned()) {
            this.spaceStat.onLogicalFileCreate(j2);
            this.spaceStat.onPhysicalFileUpdate(j2);
        }
        return logicalFile;
    }

    @Nonnull
    protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) throws IOException {
        Exception exc = null;
        Path managedDir = getManagedDir(subtaskKey, checkpointedStateScope);
        if (managedDir == null) {
            throw new IOException("Could not get " + checkpointedStateScope + " path for subtask " + subtaskKey + ", the directory may have not been created.");
        }
        for (int i = 0; i < 10; i++) {
            try {
                OutputStreamAndPath createEntropyAware = EntropyInjector.createEntropyAware(this.fs, generatePhysicalFilePath(managedDir), FileSystem.WriteMode.NO_OVERWRITE);
                FSDataOutputStream stream = createEntropyAware.stream();
                Path path = createEntropyAware.path();
                PhysicalFile physicalFile = new PhysicalFile(stream, path, this.physicalFileDeleter, checkpointedStateScope);
                updateFileCreationMetrics(path);
                return physicalFile;
            } catch (Exception e) {
                exc = e;
            }
        }
        throw new IOException("Could not open output stream for state file merging.", exc);
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(final FileMergingSnapshotManager.SubtaskKey subtaskKey, final long j, final CheckpointedStateScope checkpointedStateScope) {
        return new FileMergingCheckpointStateOutputStream(this.writeBufferSize, new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy() { // from class: org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.1
            PhysicalFile physicalFile;
            LogicalFile logicalFile;

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException {
                this.physicalFile = FileMergingSnapshotManagerBase.this.getOrCreatePhysicalFileForCheckpoint(subtaskKey, j, checkpointedStateScope);
                return new Tuple2<>(this.physicalFile.getOutputStream(), this.physicalFile.getFilePath());
            }

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public SegmentFileStateHandle closeStreamAndCreateStateHandle(Path path, long j2, long j3) throws IOException {
                if (this.physicalFile == null) {
                    return null;
                }
                this.logicalFile = FileMergingSnapshotManagerBase.this.createLogicalFile(this.physicalFile, j2, j3, subtaskKey);
                this.logicalFile.advanceLastCheckpointId(j);
                synchronized (FileMergingSnapshotManagerBase.this.lock) {
                    ((Set) FileMergingSnapshotManagerBase.this.uploadedStates.computeIfAbsent(Long.valueOf(j), l -> {
                        return new HashSet();
                    })).add(this.logicalFile);
                }
                FileMergingSnapshotManagerBase.this.returnPhysicalFileForNextReuse(subtaskKey, j, this.physicalFile);
                return new SegmentFileStateHandle(this.physicalFile.getFilePath(), j2, j3, checkpointedStateScope, this.logicalFile.getFileId());
            }

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public void closeStreamExceptionally() throws IOException {
                if (this.physicalFile != null) {
                    if (this.logicalFile != null) {
                        FileMergingSnapshotManagerBase.this.discardSingleLogicalFile(this.logicalFile, j);
                    } else {
                        this.physicalFile.close();
                        this.physicalFile.deleteIfNecessary();
                    }
                }
            }
        });
    }

    private void updateFileCreationMetrics(Path path) {
        this.spaceStat.onPhysicalFileCreate();
        LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
    }

    protected Path generatePhysicalFilePath(Path path) {
        return new Path(path, UUID.randomUUID().toString());
    }

    @VisibleForTesting
    boolean isResponsibleForFile(Path path) {
        Path parent = path.getParent();
        return parent.equals(this.managedExclusiveStateDir) || this.managedSharedStateDir.containsValue(parent);
    }

    protected final void deletePhysicalFile(Path path, long j) {
        this.ioExecutor.execute(() -> {
            try {
                this.fs.delete(path, false);
                this.spaceStat.onPhysicalFileDelete(j);
                LOG.debug("Physical file deleted: {}.", path);
            } catch (IOException e) {
                LOG.warn("Fail to delete file: {}", path);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final PhysicalFilePool createPhysicalPool() {
        switch (this.filePoolType) {
            case NON_BLOCKING:
                return new NonBlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
            case BLOCKING:
                return new BlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
            default:
                throw new UnsupportedOperationException("Unsupported type of physical file pool: " + this.filePoolType);
        }
    }

    @Nonnull
    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, CheckpointedStateScope checkpointedStateScope) throws IOException;

    protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, PhysicalFile physicalFile) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public void discardCheckpoint(long j) throws IOException {
        controlSpace();
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointStart(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (subtaskKey2, directoryHandleWithReferenceTrack) -> {
                directoryHandleWithReferenceTrack.increaseRefCountWhenCheckpointStart(j);
                return directoryHandleWithReferenceTrack;
            });
            this.managedExclusiveStateDirHandle.increaseRefCountWhenCheckpointStart(j);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (subtaskKey2, directoryHandleWithReferenceTrack) -> {
                directoryHandleWithReferenceTrack.handoverOwnershipWhenCheckpointComplete(j);
                return directoryHandleWithReferenceTrack;
            });
            this.managedExclusiveStateDirHandle.handoverOwnershipWhenCheckpointComplete(j);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (subtaskKey2, directoryHandleWithReferenceTrack) -> {
                directoryHandleWithReferenceTrack.decreaseRefCountWhenCheckpointAbort(j);
                return directoryHandleWithReferenceTrack;
            });
            this.managedExclusiveStateDirHandle.decreaseRefCountWhenCheckpointAbort(j);
        }
        synchronized (this.lock) {
            Set<LogicalFile> set = this.uploadedStates.get(Long.valueOf(j));
            if (set == null) {
                return;
            }
            if (discardLogicalFiles(subtaskKey, j, set)) {
                this.uploadedStates.remove(Long.valueOf(j));
            }
            notifyReleaseCheckpoint(subtaskKey, j);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        if (this.fileSystemInitiated) {
            this.managedSharedStateDirHandles.computeIfPresent(subtaskKey, (subtaskKey2, directoryHandleWithReferenceTrack) -> {
                directoryHandleWithReferenceTrack.handoverOwnershipWhenCheckpointSubsumed(j);
                return directoryHandleWithReferenceTrack;
            });
            this.managedExclusiveStateDirHandle.handoverOwnershipWhenCheckpointSubsumed(j);
        }
        synchronized (this.lock) {
            Iterator<Map.Entry<Long, Set<LogicalFile>>> it = this.uploadedStates.headMap(Long.valueOf(j), true).entrySet().iterator();
            while (it.hasNext()) {
                if (discardLogicalFiles(subtaskKey, j, it.next().getValue())) {
                    it.remove();
                }
            }
        }
        notifyReleaseCheckpoint(subtaskKey, j);
    }

    private void notifyReleaseCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws IOException {
        synchronized (this.notifyLock) {
            if (this.notifiedCheckpoint.contains(Long.valueOf(j))) {
                return;
            }
            Set set = (Set) this.notifiedSubtaskCheckpoint.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashSet();
            });
            set.add(subtaskKey);
            if (set.containsAll(this.managedSharedStateDir.keySet())) {
                tryDiscardCheckpoint(j);
            }
            if (this.notifiedSubtaskCheckpoint.size() > 16) {
                this.notifiedSubtaskCheckpoint.pollFirstEntry();
            }
        }
    }

    private void tryDiscardCheckpoint(long j) throws IOException {
        synchronized (this.notifyLock) {
            if (!this.notifiedCheckpoint.contains(Long.valueOf(j))) {
                this.notifiedCheckpoint.add(Long.valueOf(j));
                this.notifiedSubtaskCheckpoint.remove(Long.valueOf(j));
                discardCheckpoint(j);
                if (this.notifiedCheckpoint.size() > 16) {
                    this.notifiedCheckpoint.pollFirst();
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void reusePreviousStateHandle(long j, Collection<? extends StreamStateHandle> collection) {
        LogicalFile logicalFile;
        for (StreamStateHandle streamStateHandle : collection) {
            if (streamStateHandle instanceof SegmentFileStateHandle) {
                LogicalFile logicalFile2 = this.knownLogicalFiles.get(((SegmentFileStateHandle) streamStateHandle).getLogicalFileId());
                if (logicalFile2 != null) {
                    logicalFile2.advanceLastCheckpointId(j);
                }
            } else if ((streamStateHandle instanceof PlaceholderStreamStateHandle) && ((PlaceholderStreamStateHandle) streamStateHandle).isFileMerged() && (logicalFile = this.knownLogicalFiles.get(new LogicalFile.LogicalFileId(streamStateHandle.getStreamStateHandleID().getKeyString()))) != null) {
                logicalFile.advanceLastCheckpointId(j);
            }
        }
    }

    private void controlSpace() {
        if (this.maxSpaceAmplification == Float.MAX_VALUE || ((float) this.spaceStat.logicalFileSize.get()) * this.maxSpaceAmplification >= ((float) this.spaceStat.physicalFileSize.get())) {
            return;
        }
        long round = Math.round(((float) this.spaceStat.logicalFileSize.get()) * this.maxSpaceAmplification);
        AtomicLong atomicLong = new AtomicLong(0L);
        HashSet hashSet = new HashSet();
        this.knownLogicalFiles.values().stream().map((v0) -> {
            return v0.getPhysicalFile();
        }).forEach(physicalFile -> {
            if (physicalFile.isCouldReuse() && hashSet.add(physicalFile)) {
                atomicLong.addAndGet(physicalFile.getSize());
            }
        });
        if (atomicLong.get() > round) {
            TreeSet<PhysicalFile> treeSet = new TreeSet((physicalFile2, physicalFile3) -> {
                return Long.compare(physicalFile3.wastedSize(), physicalFile2.wastedSize());
            });
            Stream filter = hashSet.stream().filter((v0) -> {
                return v0.closed();
            });
            treeSet.getClass();
            filter.forEach((v1) -> {
                r1.add(v1);
            });
            for (PhysicalFile physicalFile4 : treeSet) {
                if (!physicalFile4.checkReuseOnSpaceAmplification(this.maxSpaceAmplification) && atomicLong.addAndGet(-physicalFile4.wastedSize()) <= round) {
                    return;
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public boolean couldReusePreviousStateHandle(StreamStateHandle streamStateHandle) {
        LogicalFile logicalFile;
        if (streamStateHandle instanceof SegmentFileStateHandle) {
            LogicalFile logicalFile2 = this.knownLogicalFiles.get(((SegmentFileStateHandle) streamStateHandle).getLogicalFileId());
            if (logicalFile2 != null) {
                return logicalFile2.getPhysicalFile().isCouldReuse();
            }
            return false;
        }
        if ((streamStateHandle instanceof PlaceholderStreamStateHandle) && ((PlaceholderStreamStateHandle) streamStateHandle).isFileMerged() && (logicalFile = this.knownLogicalFiles.get(new LogicalFile.LogicalFileId(streamStateHandle.getStreamStateHandleID().getKeyString()))) != null) {
            return logicalFile.getPhysicalFile().isCouldReuse();
        }
        return false;
    }

    public void discardSingleLogicalFile(LogicalFile logicalFile, long j) throws IOException {
        logicalFile.discardWithCheckpointId(j);
        if (logicalFile.getPhysicalFile().isOwned()) {
            this.spaceStat.onLogicalFileDelete(logicalFile.getLength());
        }
    }

    private boolean discardLogicalFiles(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, Set<LogicalFile> set) throws Exception {
        Iterator<LogicalFile> it = set.iterator();
        while (it.hasNext()) {
            LogicalFile next = it.next();
            if (next.getSubtaskKey().equals(subtaskKey) && next.getLastUsedCheckpointID() <= j) {
                discardSingleLogicalFile(next, j);
                it.remove();
                this.knownLogicalFiles.remove(next.getFileId());
            }
        }
        if (!set.isEmpty()) {
            return false;
        }
        tryDiscardCheckpoint(j);
        return true;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) {
        return checkpointedStateScope.equals(CheckpointedStateScope.SHARED) ? this.managedSharedStateDir.get(subtaskKey) : this.managedExclusiveStateDir;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public DirectoryStreamStateHandle getManagedDirStateHandle(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) {
        if (!checkpointedStateScope.equals(CheckpointedStateScope.SHARED)) {
            return this.managedExclusiveStateDirHandle.getHandle();
        }
        DirectoryHandleWithReferenceTrack directoryHandleWithReferenceTrack = this.managedSharedStateDirHandles.get(subtaskKey);
        if (directoryHandleWithReferenceTrack != null) {
            return directoryHandleWithReferenceTrack.getHandle();
        }
        return null;
    }

    static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
        return true;
    }

    private static String uriEscape(String str) {
        return str.replaceAll("[;/?:@&=+$,\\[\\]]", HelpFormatter.DEFAULT_OPT_PREFIX);
    }

    private boolean createManagedDirectory(Path path) {
        FileStatus fileStatus = null;
        try {
            try {
                fileStatus = this.fs.getFileStatus(path);
            } catch (FileNotFoundException e) {
            }
            if (fileStatus == null) {
                this.fs.mkdirs(path);
                LOG.info("Created a directory {} for checkpoint file-merging.", path);
                return true;
            }
            if (!fileStatus.isDir()) {
                throw new FlinkRuntimeException("The managed path " + path + " for file-merging is occupied by another file. Cannot create directory.");
            }
            LOG.info("Reusing previous directory {} for checkpoint file-merging.", path);
            return false;
        } catch (IOException e2) {
            throw new FlinkRuntimeException("Cannot create directory " + path + " for file-merging ", e2);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.fileSystemInitiated) {
            quietlyCleanupManagedDir();
        }
    }

    private void quietlyCleanupManagedDir() {
        this.managedSharedStateDirHandles.forEach((subtaskKey, directoryHandleWithReferenceTrack) -> {
            directoryHandleWithReferenceTrack.tryCleanupQuietly();
        });
        this.managedExclusiveStateDirHandle.tryCleanupQuietly();
    }

    @VisibleForTesting
    public String getId() {
        return this.id;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void restoreStateHandles(long j, FileMergingSnapshotManager.SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stream) {
        synchronized (this.lock) {
            Set set = (Set) this.uploadedStates.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashSet();
            });
            HashMap hashMap = new HashMap();
            this.knownLogicalFiles.values().stream().map((v0) -> {
                return v0.getPhysicalFile();
            }).forEach(physicalFile -> {
            });
            stream.forEach(segmentFileStateHandle -> {
                PhysicalFile physicalFile2 = (PhysicalFile) hashMap.computeIfAbsent(segmentFileStateHandle.getFilePath(), path -> {
                    boolean z = this.fileSystemInitiated && isManagedByFileMergingManager(path, subtaskKey, segmentFileStateHandle.getScope());
                    PhysicalFile physicalFile3 = new PhysicalFile(null, path, this.physicalFileDeleter, segmentFileStateHandle.getScope(), z);
                    try {
                        physicalFile3.updateSize(getFileSize(physicalFile3));
                        if (z) {
                            this.spaceStat.onPhysicalFileCreate();
                            this.spaceStat.onPhysicalFileUpdate(physicalFile3.getSize());
                        }
                        return physicalFile3;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                LogicalFile.LogicalFileId logicalFileId = segmentFileStateHandle.getLogicalFileId();
                LogicalFile logicalFile = new LogicalFile(logicalFileId, physicalFile2, segmentFileStateHandle.getStartPos(), segmentFileStateHandle.getStateSize(), subtaskKey);
                if (physicalFile2.isOwned()) {
                    this.spaceStat.onLogicalFileCreate(logicalFile.getLength());
                }
                this.knownLogicalFiles.put(logicalFileId, logicalFile);
                logicalFile.advanceLastCheckpointId(j);
                set.add(logicalFile);
            });
        }
    }

    private long getFileSize(PhysicalFile physicalFile) throws IOException {
        FileStatus fileStatus = physicalFile.getFilePath().getFileSystem().getFileStatus(physicalFile.getFilePath());
        if (fileStatus == null || fileStatus.isDir()) {
            throw new FileNotFoundException("File " + physicalFile.getFilePath() + " does not exist.");
        }
        return fileStatus.getLen();
    }

    private boolean isManagedByFileMergingManager(Path path, FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) {
        if (checkpointedStateScope == CheckpointedStateScope.SHARED) {
            return path.toString().startsWith(this.managedSharedStateDir.get(subtaskKey).toString());
        }
        if (checkpointedStateScope == CheckpointedStateScope.EXCLUSIVE) {
            return path.toString().startsWith(this.managedExclusiveStateDir.toString());
        }
        throw new UnsupportedOperationException("Unsupported CheckpointStateScope " + checkpointedStateScope);
    }

    @VisibleForTesting
    public LogicalFile getLogicalFile(LogicalFile.LogicalFileId logicalFileId) {
        return this.knownLogicalFiles.get(logicalFileId);
    }

    @VisibleForTesting
    TreeMap<Long, Set<LogicalFile>> getUploadedStates() {
        return this.uploadedStates;
    }

    @VisibleForTesting
    boolean isCheckpointDiscard(long j) {
        return this.notifiedCheckpoint.contains(Long.valueOf(j));
    }
}
