package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/state/FullSnapshotAsyncWriter.class */
public class FullSnapshotAsyncWriter<K> implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {

    @Nonnull
    private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;

    @Nonnull
    private final FullSnapshotResources<K> snapshotResources;

    @Nonnull
    private final CheckpointType checkpointType;
    static final /* synthetic */ boolean $assertionsDisabled;

    public FullSnapshotAsyncWriter(@Nonnull CheckpointType checkpointType, @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplierWithException, @Nonnull FullSnapshotResources<K> fullSnapshotResources) {
        this.checkpointStreamSupplier = supplierWithException;
        this.snapshotResources = fullSnapshotResources;
        this.checkpointType = checkpointType;
    }

    @Override // org.apache.flink.runtime.state.SnapshotStrategy.SnapshotResultSupplier
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry closeableRegistry) throws Exception {
        KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(this.snapshotResources.getKeyGroupRange());
        CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();
        closeableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        if (closeableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
            return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(), keyGroupRangeOffsets, this.checkpointType.isSavepoint() ? KeyGroupsSavepointStateHandle::new : KeyGroupsStateHandle::new);
        }
        throw new IOException("Stream is already unregistered/closed.");
    }

    private void writeSnapshotToOutputStream(@Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
        writeKVStateMetaData(new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream()));
        KeyValueStateIterator createKVStateIterator = this.snapshotResources.createKVStateIterator();
        Throwable th = null;
        try {
            try {
                writeKVStateData(createKVStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
                if (createKVStateIterator != null) {
                    if (0 == 0) {
                        createKVStateIterator.close();
                        return;
                    }
                    try {
                        createKVStateIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createKVStateIterator != null) {
                if (th != null) {
                    try {
                        createKVStateIterator.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createKVStateIterator.close();
                }
            }
            throw th4;
        }
    }

    private void writeKVStateMetaData(DataOutputView dataOutputView) throws IOException {
        new KeyedBackendSerializationProxy(this.snapshotResources.getKeySerializer(), this.snapshotResources.getMetaInfoSnapshots(), !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.snapshotResources.getStreamCompressionDecorator())).write(dataOutputView);
    }

    private void writeKVStateData(KeyValueStateIterator keyValueStateIterator, CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
        byte[] bArr = null;
        byte[] bArr2 = null;
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = null;
        OutputStream outputStream = null;
        CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream();
        try {
            if (keyValueStateIterator.isValid()) {
                keyGroupRangeOffsets.setKeyGroupOffset(keyValueStateIterator.keyGroup(), checkpointOutputStream.getPos());
                outputStream = this.snapshotResources.getStreamCompressionDecorator().decorateWithCompression(checkpointOutputStream);
                dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(outputStream);
                dataOutputViewStreamWrapper.writeShort(keyValueStateIterator.kvStateId());
                bArr = keyValueStateIterator.key();
                bArr2 = keyValueStateIterator.value();
                keyValueStateIterator.next();
            }
            while (keyValueStateIterator.isValid()) {
                if (!$assertionsDisabled && FullSnapshotUtil.hasMetaDataFollowsFlag(bArr)) {
                    throw new AssertionError();
                }
                if (keyValueStateIterator.isNewKeyGroup() || keyValueStateIterator.isNewKeyValueState()) {
                    checkInterrupted();
                    FullSnapshotUtil.setMetaDataFollowsFlagInKey(bArr);
                }
                writeKeyValuePair(bArr, bArr2, dataOutputViewStreamWrapper);
                if (keyValueStateIterator.isNewKeyGroup()) {
                    dataOutputViewStreamWrapper.writeShort(65535);
                    outputStream.close();
                    keyGroupRangeOffsets.setKeyGroupOffset(keyValueStateIterator.keyGroup(), checkpointOutputStream.getPos());
                    outputStream = this.snapshotResources.getStreamCompressionDecorator().decorateWithCompression(checkpointOutputStream);
                    dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(outputStream);
                    dataOutputViewStreamWrapper.writeShort(keyValueStateIterator.kvStateId());
                } else if (keyValueStateIterator.isNewKeyValueState()) {
                    dataOutputViewStreamWrapper.writeShort(keyValueStateIterator.kvStateId());
                }
                bArr = keyValueStateIterator.key();
                bArr2 = keyValueStateIterator.value();
                keyValueStateIterator.next();
            }
            if (bArr != null) {
                if (!$assertionsDisabled && FullSnapshotUtil.hasMetaDataFollowsFlag(bArr)) {
                    throw new AssertionError();
                }
                FullSnapshotUtil.setMetaDataFollowsFlagInKey(bArr);
                writeKeyValuePair(bArr, bArr2, dataOutputViewStreamWrapper);
                dataOutputViewStreamWrapper.writeShort(65535);
                outputStream.close();
                outputStream = null;
            }
            outputStream = outputStream;
        } finally {
            IOUtils.closeQuietly(null);
        }
    }

    private void writeKeyValuePair(byte[] bArr, byte[] bArr2, DataOutputView dataOutputView) throws IOException {
        BytePrimitiveArraySerializer.INSTANCE.serialize(bArr, dataOutputView);
        BytePrimitiveArraySerializer.INSTANCE.serialize(bArr2, dataOutputView);
    }

    private void checkInterrupted() throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("RocksDB snapshot interrupted.");
        }
    }

    static {
        $assertionsDisabled = !FullSnapshotAsyncWriter.class.desiredAssertionStatus();
    }
}
