/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.highavailability;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemJobResultStore
extends AbstractThreadsafeJobResultStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemJobResultStore.class);
    @VisibleForTesting
    static final String FILE_EXTENSION = ".json";
    @VisibleForTesting
    static final String DIRTY_FILE_EXTENSION = "_DIRTY.json";
    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
    private final FileSystem fileSystem;
    private volatile boolean basePathCreated;
    private final Path basePath;
    private final boolean deleteOnCommit;

    @VisibleForTesting
    public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) {
        return filename.endsWith(DIRTY_FILE_EXTENSION);
    }

    @VisibleForTesting
    public static boolean hasValidJobResultStoreEntryExtension(String filename) {
        return filename.endsWith(FILE_EXTENSION);
    }

    @VisibleForTesting
    FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean deleteOnCommit) {
        this.fileSystem = fileSystem;
        this.basePath = basePath;
        this.deleteOnCommit = deleteOnCommit;
    }

    public static FileSystemJobResultStore fromConfiguration(Configuration config) throws IOException {
        Path basePath;
        Preconditions.checkNotNull((Object)config);
        String jrsStoragePath = (String)config.get(JobResultStoreOptions.STORAGE_PATH);
        if (StringUtils.isNullOrWhitespaceOnly((String)jrsStoragePath)) {
            String haStoragePath = (String)config.get(HighAvailabilityOptions.HA_STORAGE_PATH);
            String haClusterId = (String)config.get(HighAvailabilityOptions.HA_CLUSTER_ID);
            basePath = new Path(FileSystemJobResultStore.createDefaultJobResultStorePath(haStoragePath, haClusterId));
        } else {
            basePath = new Path(jrsStoragePath);
        }
        boolean deleteOnCommit = (Boolean)config.get(JobResultStoreOptions.DELETE_ON_COMMIT);
        return new FileSystemJobResultStore(basePath.getFileSystem(), basePath, deleteOnCommit);
    }

    private void createBasePathIfNeeded() throws IOException {
        if (!this.basePathCreated) {
            LOG.info("Creating highly available job result storage directory at {}", (Object)this.basePath);
            this.fileSystem.mkdirs(this.basePath);
            LOG.info("Created highly available job result storage directory at {}", (Object)this.basePath);
            this.basePathCreated = true;
        }
    }

    public static String createDefaultJobResultStorePath(String baseDir, String clusterId) {
        return baseDir + "/job-result-store/" + clusterId;
    }

    private Path constructDirtyPath(JobID jobId) {
        return this.constructEntryPath(jobId.toString() + DIRTY_FILE_EXTENSION);
    }

    private Path constructCleanPath(JobID jobId) {
        return this.constructEntryPath(jobId.toString() + FILE_EXTENSION);
    }

    @VisibleForTesting
    Path constructEntryPath(String fileName) {
        return new Path(this.basePath, fileName);
    }

    @Override
    public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOException {
        this.createBasePathIfNeeded();
        Path path = this.constructDirtyPath(jobResultEntry.getJobId());
        try (FSDataOutputStream os = this.fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);){
            this.mapper.writeValue((OutputStream)new NonClosingOutputStreamDecorator((OutputStream)os), (Object)new JsonJobResultEntry(jobResultEntry));
        }
    }

    @Override
    public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchElementException {
        Path dirtyPath = this.constructDirtyPath(jobId);
        if (!this.fileSystem.exists(dirtyPath)) {
            throw new NoSuchElementException(String.format("Could not mark job %s as clean as it is not present in the job result store.", jobId));
        }
        if (this.deleteOnCommit) {
            this.fileSystem.delete(dirtyPath, false);
        } else {
            this.fileSystem.rename(dirtyPath, this.constructCleanPath(jobId));
        }
    }

    @Override
    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
        return this.fileSystem.exists(this.constructDirtyPath(jobId));
    }

    @Override
    public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
        return this.fileSystem.exists(this.constructCleanPath(jobId));
    }

    @Override
    public Set<JobResult> getDirtyResultsInternal() throws IOException {
        this.createBasePathIfNeeded();
        FileStatus[] statuses = this.fileSystem.listStatus(this.basePath);
        Preconditions.checkState((statuses != null ? 1 : 0) != 0, (Object)"The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.");
        HashSet<JobResult> dirtyResults = new HashSet<JobResult>();
        for (FileStatus s : statuses) {
            if (s.isDir() || !FileSystemJobResultStore.hasValidDirtyJobResultStoreEntryExtension(s.getPath().getName())) continue;
            JsonJobResultEntry jre = (JsonJobResultEntry)this.mapper.readValue((InputStream)this.fileSystem.open(s.getPath()), JsonJobResultEntry.class);
            dirtyResults.add(jre.getJobResult());
        }
        return dirtyResults;
    }

    @JsonIgnoreProperties(value={"version"}, allowGetters=true)
    @VisibleForTesting
    static class JsonJobResultEntry
    extends JobResultEntry {
        private static final String FIELD_NAME_RESULT = "result";
        static final String FIELD_NAME_VERSION = "version";

        private JsonJobResultEntry(JobResultEntry entry) {
            this(entry.getJobResult());
        }

        @JsonCreator
        private JsonJobResultEntry(@JsonProperty(value="result") JobResult jobResult) {
            super(jobResult);
        }

        @Override
        @JsonProperty(value="result")
        @JsonSerialize(using=JobResultSerializer.class)
        @JsonDeserialize(using=JobResultDeserializer.class)
        public JobResult getJobResult() {
            return super.getJobResult();
        }

        @Override
        @JsonIgnore
        public JobID getJobId() {
            return super.getJobId();
        }

        public int getVersion() {
            return 1;
        }
    }
}

