package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.class */
public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
    private final long checkpointId;
    private final long checkpointTimestamp;
    private final CheckpointStreamFactory streamFactory;
    private final KeyGroupRange keyGroupRange;
    private final CloseableRegistry closableRegistry;
    private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
    private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;

    @VisibleForTesting
    public StateSnapshotContextSynchronousImpl(long j, long j2) {
        this.checkpointId = j;
        this.checkpointTimestamp = j2;
        this.streamFactory = null;
        this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        this.closableRegistry = null;
    }

    public StateSnapshotContextSynchronousImpl(long j, long j2, CheckpointStreamFactory checkpointStreamFactory, KeyGroupRange keyGroupRange, CloseableRegistry closeableRegistry) {
        this.checkpointId = j;
        this.checkpointTimestamp = j2;
        this.streamFactory = (CheckpointStreamFactory) Preconditions.checkNotNull(checkpointStreamFactory);
        this.keyGroupRange = (KeyGroupRange) Preconditions.checkNotNull(keyGroupRange);
        this.closableRegistry = (CloseableRegistry) Preconditions.checkNotNull(closeableRegistry);
    }

    @Override // org.apache.flink.runtime.state.ManagedSnapshotContext
    public long getCheckpointId() {
        return this.checkpointId;
    }

    @Override // org.apache.flink.runtime.state.ManagedSnapshotContext
    public long getCheckpointTimestamp() {
        return this.checkpointTimestamp;
    }

    private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
        CheckpointStreamFactory.CheckpointStateOutputStream createCheckpointStateOutputStream = this.streamFactory.createCheckpointStateOutputStream(this.checkpointId, this.checkpointTimestamp);
        this.closableRegistry.registerCloseable(createCheckpointStateOutputStream);
        return createCheckpointStateOutputStream;
    }

    @Override // org.apache.flink.runtime.state.StateSnapshotContext
    public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception {
        if (null == this.keyedStateCheckpointOutputStream) {
            Preconditions.checkState(this.keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP_RANGE, "Not a keyed operator");
            this.keyedStateCheckpointOutputStream = new KeyedStateCheckpointOutputStream(openAndRegisterNewStream(), this.keyGroupRange);
        }
        return this.keyedStateCheckpointOutputStream;
    }

    @Override // org.apache.flink.runtime.state.StateSnapshotContext
    public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception {
        if (null == this.operatorStateCheckpointOutputStream) {
            this.operatorStateCheckpointOutputStream = new OperatorStateCheckpointOutputStream(openAndRegisterNewStream());
        }
        return this.operatorStateCheckpointOutputStream;
    }

    public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() throws IOException {
        return new DoneFuture((KeyGroupsStateHandle) closeAndUnregisterStreamToObtainStateHandle(this.keyedStateCheckpointOutputStream));
    }

    public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException {
        return new DoneFuture((OperatorStateHandle) closeAndUnregisterStreamToObtainStateHandle(this.operatorStateCheckpointOutputStream));
    }

    private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(NonClosingCheckpointOutputStream<T> nonClosingCheckpointOutputStream) throws IOException {
        if (null == nonClosingCheckpointOutputStream || !this.closableRegistry.unregisterCloseable(nonClosingCheckpointOutputStream.getDelegate())) {
            return null;
        }
        return nonClosingCheckpointOutputStream.closeAndGetHandle();
    }

    private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> nonClosingCheckpointOutputStream) throws IOException {
        Preconditions.checkNotNull(nonClosingCheckpointOutputStream);
        CheckpointStreamFactory.CheckpointStateOutputStream delegate = nonClosingCheckpointOutputStream.getDelegate();
        if (this.closableRegistry.unregisterCloseable(delegate)) {
            delegate.close();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException iOException = null;
        if (this.keyedStateCheckpointOutputStream != null) {
            try {
                closeAndUnregisterStream(this.keyedStateCheckpointOutputStream);
            } catch (IOException e) {
                iOException = (IOException) ExceptionUtils.firstOrSuppressed(new IOException("Could not close the raw keyed state checkpoint output stream.", e), null);
            }
        }
        if (this.operatorStateCheckpointOutputStream != null) {
            try {
                closeAndUnregisterStream(this.operatorStateCheckpointOutputStream);
            } catch (IOException e2) {
                iOException = (IOException) ExceptionUtils.firstOrSuppressed(new IOException("Could not close the raw operator state checkpoint output stream.", e2), iOException);
            }
        }
        if (iOException != null) {
            throw iOException;
        }
    }
}
