package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.net.URI;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsStateBackend.class */
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final int MAX_FILE_STATE_THRESHOLD = 1048576;
    private final int fileStateThreshold;
    private final TernaryBoolean asynchronousSnapshots;

    public FsStateBackend(String str) {
        this(new Path(str));
    }

    public FsStateBackend(String str, boolean z) {
        this(new Path(str), z);
    }

    public FsStateBackend(Path path) {
        this(path.toUri());
    }

    public FsStateBackend(Path path, boolean z) {
        this(path.toUri(), z);
    }

    public FsStateBackend(URI uri) {
        this(uri, null, -1, TernaryBoolean.UNDEFINED);
    }

    public FsStateBackend(URI uri, @Nullable URI uri2) {
        this(uri, uri2, -1, TernaryBoolean.UNDEFINED);
    }

    public FsStateBackend(URI uri, boolean z) {
        this(uri, null, -1, TernaryBoolean.fromBoolean(z));
    }

    public FsStateBackend(URI uri, int i) {
        this(uri, null, i, TernaryBoolean.UNDEFINED);
    }

    public FsStateBackend(URI uri, int i, boolean z) {
        this(uri, null, i, TernaryBoolean.fromBoolean(z));
    }

    public FsStateBackend(URI uri, @Nullable URI uri2, int i, TernaryBoolean ternaryBoolean) {
        super((URI) Preconditions.checkNotNull(uri, "checkpoint directory is null"), uri2);
        Preconditions.checkNotNull(ternaryBoolean, "asynchronousSnapshots");
        Preconditions.checkArgument(i >= -1 && i <= 1048576, "The threshold for file state size must be in [-1, %s], where '-1' means to use the value from the deployment's configuration.", 1048576);
        this.fileStateThreshold = i;
        this.asynchronousSnapshots = ternaryBoolean;
    }

    private FsStateBackend(FsStateBackend fsStateBackend, Configuration configuration) {
        super(fsStateBackend.getCheckpointPath(), fsStateBackend.getSavepointPath(), configuration);
        this.asynchronousSnapshots = fsStateBackend.asynchronousSnapshots.resolveUndefined(configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
        int integer = fsStateBackend.fileStateThreshold >= 0 ? fsStateBackend.fileStateThreshold : configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
        if (integer >= 0 && integer <= 1048576) {
            this.fileStateThreshold = integer;
        } else {
            this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().intValue();
            LoggerFactory.getLogger(AbstractFileStateBackend.class).warn("Ignoring invalid file size threshold value ({}): {} - using default value {} instead.", new Object[]{CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), Integer.valueOf(integer), CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()});
        }
    }

    @Deprecated
    public Path getBasePath() {
        return getCheckpointPath();
    }

    @Override // org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend
    @Nonnull
    public Path getCheckpointPath() {
        return super.getCheckpointPath();
    }

    public int getMinFileSizeThreshold() {
        return this.fileStateThreshold >= 0 ? this.fileStateThreshold : CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().intValue();
    }

    public boolean isUsingAsynchronousSnapshots() {
        return this.asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue().booleanValue());
    }

    @Override // org.apache.flink.runtime.state.ConfigurableStateBackend
    public FsStateBackend configure(Configuration configuration) {
        return new FsStateBackend(this, configuration);
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public CheckpointStorage createCheckpointStorage(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID, "jobId");
        return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobID, getMinFileSizeThreshold());
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider) {
        return new HeapKeyedStateBackend(taskKvStateRegistry, typeSerializer, environment.getUserClassLoader(), i, keyGroupRange, isUsingAsynchronousSnapshots(), environment.getExecutionConfig(), environment.getTaskStateManager().createLocalRecoveryConfig(), new HeapPriorityQueueSetFactory(keyGroupRange, i, 128), ttlTimeProvider);
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public OperatorStateBackend createOperatorStateBackend(Environment environment, String str) {
        return new DefaultOperatorStateBackend(environment.getUserClassLoader(), environment.getExecutionConfig(), isUsingAsynchronousSnapshots());
    }

    public String toString() {
        return "File State Backend (checkpoints: '" + getCheckpointPath() + "', savepoints: '" + getSavepointPath() + "', asynchronous: " + this.asynchronousSnapshots + ", fileStateThreshold: " + this.fileStateThreshold + ")";
    }
}
