package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FileBasedStateOutputStream;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.class */
public interface CheckpointStreamWithResultProvider extends Closeable {
    /** Logger of this interface; the static factory methods use it to report a failed secondary stream. */
    Logger LOG = LoggerFactory.getLogger(CheckpointStreamWithResultProvider.class);

    /**
     * Factory that builds a {@link KeyedStateHandle} from a set of key-group range offsets and the
     * {@link StreamStateHandle} those offsets refer to.
     */
    @FunctionalInterface
    public interface KeyedStateHandleFactory {
        /**
         * Creates a keyed state handle.
         *
         * @param keyGroupRangeOffsets the key-group range offsets to attach to the handle.
         * @param streamStateHandle the stream state handle to wrap.
         * @return the resulting keyed state handle.
         */
        KeyedStateHandle create(KeyGroupRangeOffsets keyGroupRangeOffsets, StreamStateHandle streamStateHandle);
    }

    /**
     * Implementation of {@link CheckpointStreamWithResultProvider} that writes through a
     * {@link DuplicatingCheckpointOutputStream}, i.e. to a primary and a secondary stream.
     *
     * <p>A failure of the primary stream fails the whole operation; a failure of the secondary
     * stream is only logged and the result then carries the primary handle alone.
     */
    public static class PrimaryAndSecondaryStream implements CheckpointStreamWithResultProvider {
        private static final Logger LOG = LoggerFactory.getLogger(PrimaryAndSecondaryStream.class);

        /** The duplicating stream that fans out to the primary and the secondary stream. */
        @Nonnull
        private final DuplicatingCheckpointOutputStream outputStream;

        /**
         * Wraps the two given streams into a {@link DuplicatingCheckpointOutputStream}.
         *
         * @param checkpointStateOutputStream the primary stream.
         * @param checkpointStateOutputStream2 the secondary stream.
         * @throws IOException if the duplicating stream cannot be created.
         */
        public PrimaryAndSecondaryStream(@Nonnull CheckpointStateOutputStream checkpointStateOutputStream, CheckpointStateOutputStream checkpointStateOutputStream2) throws IOException {
            this(new DuplicatingCheckpointOutputStream(checkpointStateOutputStream, checkpointStateOutputStream2));
        }

        /**
         * Uses an already constructed duplicating stream.
         *
         * @param duplicatingCheckpointOutputStream the stream to manage.
         */
        PrimaryAndSecondaryStream(@Nonnull DuplicatingCheckpointOutputStream duplicatingCheckpointOutputStream) {
            this.outputStream = duplicatingCheckpointOutputStream;
        }

        /**
         * Closes both streams and combines their handles into one {@link SnapshotResult}.
         *
         * @return a result with both handles if both streams produced one, a result with only the
         *     primary handle if the secondary stream failed or produced none, or an empty result
         *     if the primary stream produced no handle.
         * @throws IOException if closing the primary stream fails; an exception raised by the
         *     follow-up {@code close()} is merged in via {@link ExceptionUtils#firstOrSuppressed}.
         */
        @Override // org.apache.flink.runtime.state.CheckpointStreamWithResultProvider
        @Nonnull
        public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException {
            final StreamStateHandle primaryHandle;
            try {
                primaryHandle = this.outputStream.closeAndGetPrimaryHandle();
            } catch (IOException primaryException) {
                // The primary stream is mandatory: release the stream and propagate the failure.
                IOException failure = primaryException;
                try {
                    this.outputStream.close();
                } catch (IOException closeException) {
                    failure = (IOException) ExceptionUtils.firstOrSuppressed(closeException, failure);
                }
                throw failure;
            }

            StreamStateHandle secondaryHandle = null;
            try {
                secondaryHandle = this.outputStream.closeAndGetSecondaryHandle();
            } catch (IOException secondaryException) {
                // The secondary stream is best-effort: log and continue with the primary handle only.
                LOG.warn("Exception from secondary/local checkpoint stream.", secondaryException);
            }

            if (primaryHandle == null) {
                return SnapshotResult.empty();
            }
            if (secondaryHandle == null) {
                return SnapshotResult.of(primaryHandle);
            }
            return SnapshotResult.withLocalState(primaryHandle, secondaryHandle);
        }

        /**
         * Returns the duplicating stream managed by this provider.
         *
         * @return the managed {@link DuplicatingCheckpointOutputStream}.
         */
        @Override // org.apache.flink.runtime.state.CheckpointStreamWithResultProvider
        @Nonnull
        public DuplicatingCheckpointOutputStream getCheckpointOutputStream() {
            return this.outputStream;
        }
    }

    /**
     * Implementation of {@link CheckpointStreamWithResultProvider} backed by a single
     * {@link CheckpointStateOutputStream}.
     */
    public static class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {

        /** The one stream managed by this provider. */
        @Nonnull
        private final CheckpointStateOutputStream outputStream;

        /**
         * Creates a provider around the given stream.
         *
         * @param checkpointStateOutputStream the stream to manage.
         */
        public PrimaryStreamOnly(@Nonnull CheckpointStateOutputStream checkpointStateOutputStream) {
            outputStream = checkpointStateOutputStream;
        }

        /**
         * Closes the stream and wraps the handle it returns into a {@link SnapshotResult}.
         *
         * @return the snapshot result built from the stream's handle.
         * @throws IOException if closing the stream fails.
         */
        @Nonnull
        @Override // org.apache.flink.runtime.state.CheckpointStreamWithResultProvider
        public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException {
            final StreamStateHandle handle = outputStream.closeAndGetHandle();
            return SnapshotResult.of(handle);
        }

        /**
         * Returns the stream managed by this provider.
         *
         * @return the managed {@link CheckpointStateOutputStream}.
         */
        @Nonnull
        @Override // org.apache.flink.runtime.state.CheckpointStreamWithResultProvider
        public CheckpointStateOutputStream getCheckpointOutputStream() {
            return outputStream;
        }
    }

    /**
     * Closes the checkpoint output stream and reports the outcome as a {@link SnapshotResult}.
     *
     * @return the snapshot result holding the {@link StreamStateHandle}(s) obtained when closing.
     * @throws IOException if closing the stream fails.
     */
    @Nonnull
    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    /**
     * Returns the checkpoint output stream backing this provider. The default {@link #close()}
     * implementation closes exactly this stream.
     *
     * @return the backing {@link CheckpointStateOutputStream}.
     */
    @Nonnull
    CheckpointStateOutputStream getCheckpointOutputStream();

    /**
     * Closes the stream returned by {@link #getCheckpointOutputStream()}.
     *
     * @throws IOException if closing that stream fails.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    default void close() throws IOException {
        final CheckpointStateOutputStream stream = getCheckpointOutputStream();
        stream.close();
    }

    /**
     * Creates a provider that writes to a single stream obtained from the given factory.
     *
     * @param checkpointedStateScope the scope passed to the factory when creating the stream.
     * @param checkpointStreamFactory the factory that creates the stream.
     * @return a {@link PrimaryStreamOnly} provider around the created stream.
     * @throws IOException if the factory fails to create the stream.
     */
    @Nonnull
    static CheckpointStreamWithResultProvider createSimpleStream(@Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory checkpointStreamFactory) throws IOException {
        final CheckpointStateOutputStream primaryOut = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
        return new PrimaryStreamOnly(primaryOut);
    }

    /**
     * Creates a provider that writes to a primary stream from the given factory and, if possible,
     * additionally to a secondary file-based stream in the local recovery directory.
     *
     * <p>If opening the secondary stream fails with an {@link IOException}, the failure is logged
     * and a provider for the primary stream alone is returned instead.
     *
     * @param j the value used to look up the subtask-specific checkpoint directory (presumably
     *     the checkpoint ID; confirm against callers).
     * @param checkpointedStateScope the scope passed to the factory when creating the primary stream.
     * @param checkpointStreamFactory the factory that creates the primary stream.
     * @param localRecoveryDirectoryProvider supplies the directory for the secondary stream's file.
     * @return a {@link PrimaryAndSecondaryStream}, or a {@link PrimaryStreamOnly} as fallback.
     * @throws IOException if the primary stream cannot be created.
     */
    @Nonnull
    static CheckpointStreamWithResultProvider createDuplicatingStream(@Nonnegative long j, @Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) throws IOException {
        final CheckpointStateOutputStream primaryOut = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

        try {
            // Each secondary stream gets its own randomly named file in the checkpoint directory.
            final File secondaryFile = new File(localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(j), String.valueOf(UUID.randomUUID()));
            final Path secondaryPath = new Path(secondaryFile.toURI());
            final FileBasedStateOutputStream secondaryOut = new FileBasedStateOutputStream(secondaryPath.getFileSystem(), secondaryPath);
            return new PrimaryAndSecondaryStream(primaryOut, secondaryOut);
        } catch (IOException secondaryException) {
            LOG.warn("Exception when opening secondary/local checkpoint output stream. Continue only with the primary stream.", secondaryException);
        }

        return new PrimaryStreamOnly(primaryOut);
    }

    /**
     * Converts a snapshot result of stream state handles into a snapshot result of keyed state
     * handles by passing each present handle, together with the given offsets, to the factory.
     *
     * @param snapshotResult the stream-level snapshot result to convert.
     * @param keyGroupRangeOffsets the key-group range offsets handed to the factory for each handle.
     * @param keyedStateHandleFactory the factory that builds the keyed state handles.
     * @return an empty result if there is no job-manager-owned snapshot; otherwise a result with
     *     the converted job-manager-owned handle and, if present, the converted task-local handle.
     */
    @Nonnull
    static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(@Nonnull SnapshotResult<StreamStateHandle> snapshotResult, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets, @Nonnull KeyedStateHandleFactory keyedStateHandleFactory) {
        final StreamStateHandle primaryHandle = snapshotResult.getJobManagerOwnedSnapshot();
        if (primaryHandle == null) {
            // Without a job-manager-owned snapshot there is nothing to convert.
            return SnapshotResult.empty();
        }

        final KeyedStateHandle primaryKeyedHandle = keyedStateHandleFactory.create(keyGroupRangeOffsets, primaryHandle);

        final StreamStateHandle localHandle = snapshotResult.getTaskLocalSnapshot();
        if (localHandle == null) {
            return SnapshotResult.of(primaryKeyedHandle);
        }

        final KeyedStateHandle localKeyedHandle = keyedStateHandleFactory.create(keyGroupRangeOffsets, localHandle);
        return SnapshotResult.withLocalState(primaryKeyedHandle, localKeyedHandle);
    }
}
