package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.state.CheckpointedStateScope;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.class */
public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {
    private final Map<Long, PhysicalFilePool> writablePhysicalFilePool;

    public WithinCheckpointFileMergingSnapshotManager(String str, long j, PhysicalFilePool.Type type, float f, Executor executor, MetricGroup metricGroup) {
        super(str, j, type, f, executor, metricGroup);
        this.writablePhysicalFilePool = new HashMap();
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase, org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        super.notifyCheckpointComplete(subtaskKey, j);
        removeAndCloseFiles(subtaskKey, j);
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase, org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        super.notifyCheckpointAborted(subtaskKey, j);
        removeAndCloseFiles(subtaskKey, j);
    }

    private void removeAndCloseFiles(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        PhysicalFilePool physicalFilePool;
        synchronized (this.writablePhysicalFilePool) {
            physicalFilePool = this.writablePhysicalFilePool.get(Long.valueOf(j));
        }
        if (physicalFilePool != null) {
            physicalFilePool.close(subtaskKey);
            if (physicalFilePool.isEmpty()) {
                synchronized (this.writablePhysicalFilePool) {
                    this.writablePhysicalFilePool.remove(Long.valueOf(j));
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase
    @Nonnull
    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, CheckpointedStateScope checkpointedStateScope) throws IOException {
        return getOrCreateFilePool(j).pollFile(subtaskKey, checkpointedStateScope);
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase
    protected void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, PhysicalFile physicalFile) throws IOException {
        FSDataOutputStream outputStream;
        if (this.shouldSyncAfterClosingLogicalFile && (outputStream = physicalFile.getOutputStream()) != null) {
            outputStream.sync();
        }
        if (getOrCreateFilePool(j).tryPutFile(subtaskKey, physicalFile)) {
            return;
        }
        physicalFile.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase
    public void discardCheckpoint(long j) throws IOException {
        PhysicalFilePool physicalFilePool;
        synchronized (this.writablePhysicalFilePool) {
            physicalFilePool = this.writablePhysicalFilePool.get(Long.valueOf(j));
        }
        if (physicalFilePool != null) {
            physicalFilePool.close();
        }
        super.discardCheckpoint(j);
    }

    private PhysicalFilePool getOrCreateFilePool(long j) {
        PhysicalFilePool physicalFilePool;
        synchronized (this.writablePhysicalFilePool) {
            PhysicalFilePool physicalFilePool2 = this.writablePhysicalFilePool.get(Long.valueOf(j));
            if (physicalFilePool2 == null) {
                physicalFilePool2 = createPhysicalPool();
                this.writablePhysicalFilePool.put(Long.valueOf(j), physicalFilePool2);
            }
            physicalFilePool = physicalFilePool2;
        }
        return physicalFilePool;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        super.close();
        Iterator<PhysicalFilePool> it = this.writablePhysicalFilePool.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.writablePhysicalFilePool.clear();
    }
}
