package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.util.StringUtils;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BufferSpiller.class */
public class BufferSpiller {
    static final int HEADER_SIZE = 9;
    private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
    private static final int READ_BUFFER_SIZE = 1048576;
    private final File tempDir;
    private final String spillFilePrefix;
    private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1048576);
    private final ByteBuffer headBuffer;
    private final ByteBuffer[] sources;
    private File currentSpillFile;
    private FileChannel currentChannel;
    private final int pageSize;
    private int fileCounter;
    private long bytesWritten;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/BufferSpiller$SpilledBufferOrEventSequence.class */
    public static class SpilledBufferOrEventSequence {
        private static final int HEADER_LENGTH = 9;
        private final File file;
        private final FileChannel fileChannel;
        private final ByteBuffer buffer;
        private final long size;
        private final int pageSize;
        private boolean opened = false;

        SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer byteBuffer, int i) throws IOException {
            this.file = file;
            this.fileChannel = fileChannel;
            this.buffer = byteBuffer;
            this.pageSize = i;
            this.size = fileChannel.size();
        }

        public void open() {
            if (this.opened) {
                return;
            }
            this.opened = true;
            this.buffer.position(0);
            this.buffer.limit(0);
        }

        public BufferOrEvent getNext() throws IOException {
            if (this.buffer.remaining() < 9) {
                this.buffer.compact();
                while (this.buffer.position() < 9) {
                    if (this.fileChannel.read(this.buffer) == -1) {
                        if (this.buffer.position() == 0) {
                            return null;
                        }
                        throw new IOException("Found trailing incomplete buffer or event");
                    }
                }
                this.buffer.flip();
            }
            int i = this.buffer.getInt();
            int i2 = this.buffer.getInt();
            if (!(this.buffer.get() == 0)) {
                if (i2 > this.buffer.capacity() - 9) {
                    throw new IOException("Event is too large");
                }
                if (this.buffer.remaining() < i2) {
                    this.buffer.compact();
                    while (this.buffer.position() < i2) {
                        if (this.fileChannel.read(this.buffer) == -1) {
                            throw new IOException("Found trailing incomplete event");
                        }
                    }
                    this.buffer.flip();
                }
                int limit = this.buffer.limit();
                this.buffer.limit(this.buffer.position() + i2);
                AbstractEvent fromSerializedEvent = EventSerializer.fromSerializedEvent(this.buffer, getClass().getClassLoader());
                this.buffer.limit(limit);
                return new BufferOrEvent(fromSerializedEvent, i);
            }
            if (i2 > this.pageSize) {
                throw new IOException(String.format("Spilled buffer (%d bytes) is larger than page size of (%d bytes)", Integer.valueOf(i2), Integer.valueOf(this.pageSize)));
            }
            MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(this.pageSize);
            int i3 = 0;
            int i4 = i2;
            while (true) {
                int min = Math.min(this.buffer.remaining(), i4);
                if (min > 0) {
                    allocateUnpooledSegment.put(i3, this.buffer, min);
                    i3 += min;
                    i4 -= min;
                }
                if (i4 == 0) {
                    Buffer buffer = new Buffer(allocateUnpooledSegment, FreeingBufferRecycler.INSTANCE);
                    buffer.setSize(i2);
                    return new BufferOrEvent(buffer, i);
                }
                this.buffer.clear();
                if (this.fileChannel.read(this.buffer) == -1) {
                    throw new IOException("Found trailing incomplete buffer");
                }
                this.buffer.flip();
            }
        }

        public void cleanup() throws IOException {
            this.fileChannel.close();
            if (!this.file.delete()) {
                throw new IOException("Cannot remove temp file for stream alignment writer");
            }
        }

        public long size() throws IOException {
            return this.size;
        }
    }

    public BufferSpiller(IOManager iOManager, int i) throws IOException {
        this.pageSize = i;
        this.readBuffer.order(ByteOrder.LITTLE_ENDIAN);
        this.headBuffer = ByteBuffer.allocateDirect(16);
        this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
        this.sources = new ByteBuffer[]{this.headBuffer, null};
        File[] spillingDirectories = iOManager.getSpillingDirectories();
        this.tempDir = spillingDirectories[DIRECTORY_INDEX.getAndIncrement() % spillingDirectories.length];
        byte[] bArr = new byte[32];
        new Random().nextBytes(bArr);
        this.spillFilePrefix = StringUtils.byteToHexString(bArr) + '.';
        createSpillingChannel();
    }

    public void add(BufferOrEvent bufferOrEvent) throws IOException {
        ByteBuffer serializedEvent;
        try {
            if (bufferOrEvent.isBuffer()) {
                Buffer buffer = bufferOrEvent.getBuffer();
                serializedEvent = buffer.getMemorySegment().wrap(0, buffer.getSize());
            } else {
                serializedEvent = EventSerializer.toSerializedEvent(bufferOrEvent.getEvent());
            }
            this.headBuffer.clear();
            this.headBuffer.putInt(bufferOrEvent.getChannelIndex());
            this.headBuffer.putInt(serializedEvent.remaining());
            this.headBuffer.put((byte) (bufferOrEvent.isBuffer() ? 0 : 1));
            this.headBuffer.flip();
            this.bytesWritten += this.headBuffer.remaining() + serializedEvent.remaining();
            this.sources[1] = serializedEvent;
            this.currentChannel.write(this.sources);
            if (bufferOrEvent.isBuffer()) {
                bufferOrEvent.getBuffer().recycle();
            }
        } catch (Throwable th) {
            if (bufferOrEvent.isBuffer()) {
                bufferOrEvent.getBuffer().recycle();
            }
            throw th;
        }
    }

    public SpilledBufferOrEventSequence rollOver() throws IOException {
        return rollOverInternal(false);
    }

    public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException {
        return rollOverInternal(true);
    }

    private SpilledBufferOrEventSequence rollOverInternal(boolean z) throws IOException {
        ByteBuffer byteBuffer;
        if (this.bytesWritten == 0) {
            return null;
        }
        if (z) {
            byteBuffer = ByteBuffer.allocateDirect(1048576);
            byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
        } else {
            byteBuffer = this.readBuffer;
        }
        this.currentChannel.position(0L);
        SpilledBufferOrEventSequence spilledBufferOrEventSequence = new SpilledBufferOrEventSequence(this.currentSpillFile, this.currentChannel, byteBuffer, this.pageSize);
        createSpillingChannel();
        this.bytesWritten = 0L;
        return spilledBufferOrEventSequence;
    }

    public void close() throws IOException {
        this.currentChannel.close();
        if (!this.currentSpillFile.delete()) {
            throw new IOException("Cannot delete spill file");
        }
    }

    public long getBytesWritten() {
        return this.bytesWritten;
    }

    File getCurrentSpillFile() {
        return this.currentSpillFile;
    }

    FileChannel getCurrentChannel() {
        return this.currentChannel;
    }

    private void createSpillingChannel() throws IOException {
        File file = this.tempDir;
        StringBuilder append = new StringBuilder().append(this.spillFilePrefix);
        int i = this.fileCounter;
        this.fileCounter = i + 1;
        this.currentSpillFile = new File(file, append.append(i).append(".buffer").toString());
        this.currentChannel = new RandomAccessFile(this.currentSpillFile, "rw").getChannel();
    }
}
