package org.apache.flink.runtime.state;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import one.profiler.Events;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskLocalStateStoreImpl.class */
/**
 * {@link OwnedTaskLocalStateStore} implementation that keeps the task-local state snapshots of one
 * subtask in an in-memory map keyed by checkpoint id and additionally serializes every stored
 * snapshot to a {@value #TASK_STATE_SNAPSHOT_FILENAME} file inside the checkpoint's local
 * directory, from which it is lazily re-loaded on retrieval.
 *
 * <p>All access to the in-memory map and the {@code disposed} flag is serialized through {@link
 * #lock}; discarding of state objects and deletion of directories is pushed to the {@link
 * #discardExecutor}.
 */
public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
    private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);

    /** Placeholder stored in the map when {@code null} is passed as the state of a checkpoint. */
    @VisibleForTesting
    static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0, false);

    /** Name of the file (inside the checkpoint directory) holding the serialized snapshot. */
    public static final String TASK_STATE_SNAPSHOT_FILENAME = "_task_state_snapshot";

    /** Job id; used in log messages and {@link #toString()}. */
    @Nonnull
    protected final JobID jobID;

    /** Allocation id; used in {@link #toString()}. */
    @Nonnull
    protected final AllocationID allocationID;

    /** Job vertex id; used in log messages and {@link #toString()}. */
    @Nonnull
    protected final JobVertexID jobVertexID;

    /** Subtask index; used in log messages and {@link #toString()}. */
    @Nonnegative
    protected final int subtaskIndex;

    /** Local recovery configuration, including the (optional) local directory provider. */
    @Nonnull
    protected final LocalRecoveryConfig localRecoveryConfig;

    /** Executor on which state discarding and directory deletion are run. */
    @Nonnull
    protected final Executor discardExecutor;

    /** Lock guarding {@link #storedTaskStateByCheckpointID} and {@link #disposed}. */
    @Nonnull
    protected final Object lock = new Object();

    /** Stored local state snapshots, ordered by checkpoint id. */
    @Nonnull
    @GuardedBy("lock")
    protected final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID =
            new TreeMap<>();

    /** Set once {@link #dispose()} was called; later stores are discarded right away. */
    @GuardedBy("lock")
    protected boolean disposed = false;

    /**
     * Creates a store for a single subtask.
     *
     * @param jobID id of the job
     * @param allocationID id of the allocation
     * @param jobVertexID id of the job vertex
     * @param subtaskIndex index of the subtask
     * @param localRecoveryConfig local recovery configuration
     * @param discardExecutor executor used for asynchronous discarding of local state
     */
    public TaskLocalStateStoreImpl(
            @Nonnull JobID jobID,
            @Nonnull AllocationID allocationID,
            @Nonnull JobVertexID jobVertexID,
            @Nonnegative int subtaskIndex,
            @Nonnull LocalRecoveryConfig localRecoveryConfig,
            @Nonnull Executor discardExecutor) {
        this.jobID = jobID;
        this.allocationID = allocationID;
        this.jobVertexID = jobVertexID;
        this.subtaskIndex = subtaskIndex;
        this.discardExecutor = discardExecutor;
        this.localRecoveryConfig = localRecoveryConfig;
    }

    /**
     * Registers the given local state for the checkpoint and writes it to the snapshot file. A
     * {@code null} state is replaced by {@link #NULL_DUMMY}. State that was previously registered
     * under the same checkpoint id, or the given state itself if the store is already disposed,
     * is handed to the discard executor.
     *
     * @param checkpointId id of the checkpoint the state belongs to
     * @param localState the local state to store, may be {@code null}
     */
    @Override
    public void storeLocalState(
            @Nonnegative long checkpointId, @Nullable TaskStateSnapshot localState) {
        if (localState == null) {
            localState = NULL_DUMMY;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace(
                    "Stored local state for checkpoint {} in subtask ({} - {} - {}) : {}.",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex,
                    localState);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Stored local state for checkpoint {} in subtask ({} - {} - {})",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex);
        }

        Tuple2<Long, TaskStateSnapshot> toDiscard = null;
        synchronized (lock) {
            if (disposed) {
                // Late store after dispose: do not register, just discard the new state.
                toDiscard = Tuple2.of(checkpointId, localState);
            } else {
                TaskStateSnapshot previous =
                        storedTaskStateByCheckpointID.put(checkpointId, localState);
                persistLocalStateMetadata(checkpointId, localState);
                if (previous != null) {
                    toDiscard = Tuple2.of(checkpointId, previous);
                }
            }
        }

        // The actual discard is scheduled outside of the lock.
        if (toDiscard != null) {
            asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
        }
    }

    /**
     * Serializes the snapshot into the checkpoint's snapshot file, creating the checkpoint
     * directory first if necessary. An {@link IOException} is passed to {@link
     * ExceptionUtils#rethrow(Throwable, String)}.
     */
    private void persistLocalStateMetadata(long checkpointId, TaskStateSnapshot localState) {
        createFolderOrFail(getCheckpointDirectory(checkpointId));
        final File taskStateSnapshotFile = getTaskStateSnapshotFile(checkpointId);

        // Both streams are declared as resources so that they are closed on every path,
        // including a failing ObjectOutputStream constructor or a failing writeObject call.
        try (FileOutputStream fileOut = new FileOutputStream(taskStateSnapshotFile);
                ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(localState);
            LOG.debug(
                    "Successfully written local task state snapshot file {} for checkpoint {}.",
                    taskStateSnapshotFile,
                    checkpointId);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Could not write the local task state snapshot file.");
        }
    }

    /** Returns the snapshot file located in the directory of the given checkpoint. */
    @VisibleForTesting
    File getTaskStateSnapshotFile(long checkpointId) {
        return new File(getCheckpointDirectory(checkpointId), TASK_STATE_SNAPSHOT_FILENAME);
    }

    /** Returns the subtask-specific local directory of the given checkpoint. */
    public File getCheckpointDirectory(long checkpointId) {
        return getLocalRecoveryDirectoryProvider().subtaskSpecificCheckpointDirectory(checkpointId);
    }

    /**
     * Makes sure the given directory exists.
     *
     * @throws FlinkRuntimeException if the directory does not exist and cannot be created
     */
    private void createFolderOrFail(File checkpointDirectory) {
        if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Could not create the checkpoint directory '%s'",
                            checkpointDirectory));
        }
    }

    /**
     * Returns the directory provider of the local recovery configuration.
     *
     * @throws IllegalStateException if the configuration does not contain a directory provider
     */
    public LocalSnapshotDirectoryProvider getLocalRecoveryDirectoryProvider() {
        return localRecoveryConfig
                .getLocalStateDirectoryProvider()
                .orElseThrow(() -> new IllegalStateException("Local recovery must be enabled."));
    }

    /**
     * Returns the local state registered for the checkpoint, loading it from the snapshot file if
     * it is not in memory.
     *
     * @param checkpointId id of the checkpoint
     * @return the local state, or {@code null} if local recovery is disabled, nothing is
     *     registered, or only the {@link #NULL_DUMMY} placeholder is registered
     */
    @Override
    @Nullable
    public TaskStateSnapshot retrieveLocalState(long checkpointId) {
        // NOTE(review): the snapshot is looked up (and possibly loaded from disk into the map)
        // before the "recovery enabled" check. That order is kept as found; confirm whether the
        // lookup is meant to happen when local recovery is disabled.
        final TaskStateSnapshot snapshot;
        synchronized (lock) {
            snapshot = loadTaskStateSnapshot(checkpointId);
        }

        if (!localRecoveryConfig.isLocalRecoveryEnabled()) {
            LOG.debug(
                    "Local recovery is disabled for checkpoint {} in subtask ({} - {} - {})",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex);
            return null;
        }

        if (snapshot != null) {
            LOG.info(
                    "Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex,
                    snapshot);
        } else {
            LOG.info(
                    "Did not find registered local state for checkpoint {} in subtask ({} - {} - {})",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex);
        }

        // The placeholder is an internal marker and is never handed out.
        return snapshot != NULL_DUMMY ? snapshot : null;
    }

    /**
     * Returns the snapshot from the in-memory map; if absent, tries to load it from disk and, when
     * that yields a snapshot, registers it in the map.
     */
    @GuardedBy("lock")
    @Nullable
    private TaskStateSnapshot loadTaskStateSnapshot(long checkpointId) {
        return storedTaskStateByCheckpointID.computeIfAbsent(
                checkpointId, this::tryLoadTaskStateSnapshotFromDisk);
    }

    /**
     * Deserializes the snapshot file of the given checkpoint.
     *
     * @return the deserialized snapshot, or {@code null} if the file does not exist or could not
     *     be read; in the latter case the checkpoint's local state directory is deleted
     */
    @GuardedBy("lock")
    @Nullable
    private TaskStateSnapshot tryLoadTaskStateSnapshotFromDisk(long checkpointId) {
        final File taskStateSnapshotFile = getTaskStateSnapshotFile(checkpointId);
        if (!taskStateSnapshotFile.exists()) {
            return null;
        }

        // Opening, reading and closing are covered by a single handler, so every failure ends
        // up in the cleanup path below and the streams are always closed.
        try (FileInputStream fileIn = new FileInputStream(taskStateSnapshotFile);
                ObjectInputStream objectIn = new ObjectInputStream(fileIn)) {
            TaskStateSnapshot snapshot = (TaskStateSnapshot) objectIn.readObject();
            LOG.debug(
                    "Loaded task state snapshot for checkpoint {} successfully from disk.",
                    checkpointId);
            return snapshot;
        } catch (IOException | ClassNotFoundException e) {
            LOG.debug(
                    "Could not read task state snapshot file {} for checkpoint {}. Deleting the corresponding local state.",
                    taskStateSnapshotFile,
                    checkpointId,
                    e);
            discardLocalStateForCheckpoint(checkpointId, Optional.empty());
            // The local state was just deleted, so nothing usable can be returned.
            return null;
        }
    }

    /** Returns the local recovery configuration this store was created with. */
    @Override
    @Nonnull
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return localRecoveryConfig;
    }

    /**
     * Prunes the local state of checkpoints with an id smaller than the confirmed one. The scan
     * stops at the first stored checkpoint id that is not smaller.
     */
    @Override
    public void confirmCheckpoint(long confirmedCheckpointId) {
        LOG.debug(
                "Received confirmation for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.",
                confirmedCheckpointId,
                jobID,
                jobVertexID,
                subtaskIndex);
        pruneCheckpoints(storedId -> storedId < confirmedCheckpointId, true);
    }

    /** Prunes the local state registered under exactly the aborted checkpoint id. */
    @Override
    public void abortCheckpoint(long abortedCheckpointId) {
        LOG.debug(
                "Received abort information for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.",
                abortedCheckpointId,
                jobID,
                jobVertexID,
                subtaskIndex);
        pruneCheckpoints(storedId -> storedId == abortedCheckpointId, false);
    }

    /** Prunes the local state of all stored checkpoints whose id matches the predicate. */
    @Override
    public void pruneMatchingCheckpoints(@Nonnull LongPredicate matcher) {
        pruneCheckpoints(matcher, false);
    }

    /**
     * Marks the store as disposed, removes all registered state and, on the discard executor,
     * discards that state and deletes the subtask base directories.
     *
     * @return future that completes when the asynchronous cleanup has run
     */
    @Override
    public CompletableFuture<Void> dispose() {
        final Collection<Tuple2<Long, TaskStateSnapshot>> statesCopy;
        synchronized (lock) {
            disposed = true;
            statesCopy =
                    storedTaskStateByCheckpointID.entrySet().stream()
                            .map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
                            .collect(Collectors.toList());
            storedTaskStateByCheckpointID.clear();
        }

        return CompletableFuture.runAsync(
                () -> {
                    // First discard the remaining state objects ...
                    syncDiscardLocalStateForCollection(statesCopy);

                    // ... then remove the subtask base directories themselves.
                    final LocalSnapshotDirectoryProvider directoryProvider =
                            getLocalRecoveryDirectoryProvider();
                    for (int i = 0; i < directoryProvider.allocationBaseDirsCount(); i++) {
                        File subtaskBaseDirectory =
                                directoryProvider.selectSubtaskBaseDirectory(i);
                        try {
                            deleteDirectory(subtaskBaseDirectory);
                        } catch (IOException e) {
                            LOG.warn(
                                    "Exception when deleting local recovery subtask base directory {} in subtask ({} - {} - {})",
                                    subtaskBaseDirectory,
                                    jobID,
                                    jobVertexID,
                                    subtaskIndex,
                                    e);
                        }
                    }
                },
                discardExecutor);
    }

    /** Schedules discarding of the given entries on the discard executor; no-op if empty. */
    private void asyncDiscardLocalStateForCollection(
            Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) {
        if (toDiscard.isEmpty()) {
            return;
        }
        discardExecutor.execute(() -> syncDiscardLocalStateForCollection(toDiscard));
    }

    /** Discards every (checkpoint id, snapshot) entry of the collection in the calling thread. */
    private void syncDiscardLocalStateForCollection(
            Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) {
        for (Tuple2<Long, TaskStateSnapshot> entry : toDiscard) {
            discardLocalStateForCheckpoint(entry.f0, Optional.of(entry.f1));
        }
    }

    /**
     * Discards the given snapshot (if present) and deletes the checkpoint's local directory.
     * Failures of either step are logged as warnings and not propagated.
     */
    private void discardLocalStateForCheckpoint(
            long checkpointId, Optional<TaskStateSnapshot> snapshot) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(
                    "Discarding local task state snapshot of checkpoint {} for subtask ({} - {} - {}).",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex);
        } else {
            LOG.debug(
                    "Discarding local task state snapshot {} of checkpoint {} for subtask ({} - {} - {}).",
                    snapshot,
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex);
        }

        snapshot.ifPresent(
                state -> {
                    try {
                        state.discardState();
                    } catch (Exception e) {
                        LOG.warn(
                                "Exception while discarding local task state snapshot of checkpoint {} in subtask ({} - {} - {}).",
                                checkpointId,
                                jobID,
                                jobVertexID,
                                subtaskIndex,
                                e);
                    }
                });

        final File checkpointDirectory = getCheckpointDirectory(checkpointId);
        LOG.debug(
                "Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).",
                checkpointDirectory,
                checkpointId,
                jobID,
                jobVertexID,
                subtaskIndex);
        try {
            deleteDirectory(checkpointDirectory);
        } catch (IOException e) {
            LOG.warn(
                    "Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).",
                    checkpointId,
                    jobID,
                    jobVertexID,
                    subtaskIndex,
                    e);
        }
    }

    /**
     * Recursively deletes the given directory if it exists.
     *
     * @throws IOException if the file system lookup, the existence check or the deletion fails
     */
    public void deleteDirectory(File directory) throws IOException {
        Path path = new Path(directory.toURI());
        FileSystem fileSystem = path.getFileSystem();
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }

    /**
     * Removes all stored checkpoints whose id matches the predicate and schedules their state for
     * asynchronous discarding.
     *
     * @param pruningChecker decides, per checkpoint id, whether the entry is pruned
     * @param breakOnceCheckerFalse if {@code true}, the scan (in ascending checkpoint id order)
     *     stops at the first id the predicate rejects
     */
    public void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) {
        final Collection<Tuple2<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();

        synchronized (lock) {
            Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
                    storedTaskStateByCheckpointID.entrySet().iterator();
            while (entryIterator.hasNext()) {
                Map.Entry<Long, TaskStateSnapshot> entry = entryIterator.next();
                long entryCheckpointId = entry.getKey();
                if (pruningChecker.test(entryCheckpointId)) {
                    toRemove.add(Tuple2.of(entryCheckpointId, entry.getValue()));
                    entryIterator.remove();
                } else if (breakOnceCheckerFalse) {
                    break;
                }
            }
        }

        asyncDiscardLocalStateForCollection(toRemove);
    }

    @Override
    public String toString() {
        return "TaskLocalStateStore{jobID="
                + jobID
                + ", jobVertexID="
                + jobVertexID
                + ", allocationID="
                + allocationID.toHexString()
                + ", subtaskIndex="
                + subtaskIndex
                + ", localRecoveryConfig="
                + localRecoveryConfig
                + ", storedCheckpointIDs="
                + storedTaskStateByCheckpointID.keySet()
                + '}';
    }
}
