package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/BufferManager.class */
public class BufferManager implements BufferListener, BufferRecycler {
    /**
     * Queue of the exclusive and floating buffers currently available to this manager. The same
     * object is used as the monitor for the {@code synchronized} blocks and {@code wait/notifyAll}
     * calls in this class.
     */
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();

    /** Provider from which exclusive memory segments are requested and to which they are returned. */
    private final MemorySegmentProvider globalPool;

    /**
     * The input channel served by this manager; consulted for its released state, its gate's
     * buffer pool, and notified when buffers become available.
     */
    private final InputChannel inputChannel;

    /**
     * Set to {@code true} when this manager has successfully registered itself as a listener on
     * the buffer pool and is cleared again in {@code notifyBufferAvailable}.
     */
    @GuardedBy("bufferQueue")
    private boolean isWaitingForFloatingBuffers;

    /**
     * Number of available buffers this manager currently aims for; compared against the queue size
     * to decide whether floating buffers are requested, accepted, or handed back.
     */
    @GuardedBy("bufferQueue")
    private int numRequiredBuffers;

    /**
     * Container for the buffers that are currently available, kept in two FIFO queues: one for
     * exclusive buffers and one for floating buffers.
     *
     * <p>This class performs no synchronization of its own; callers are responsible for guarding
     * access.
     */
    public static final class AvailableBufferQueue {
        /** Available exclusive buffers, in insertion order. */
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();

        /** Available floating buffers, in insertion order. */
        final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

        AvailableBufferQueue() {}

        /**
         * Appends an exclusive buffer and, if the queue now holds more buffers than required,
         * removes one floating buffer to compensate.
         *
         * @param buffer the exclusive buffer to add
         * @param i the number of buffers currently required
         * @return the floating buffer that was removed from the queue, or {@code null} if the
         *     total does not exceed {@code i} or no floating buffer is queued
         */
        @Nullable
        Buffer addExclusiveBuffer(Buffer buffer, int i) {
            exclusiveBuffers.add(buffer);
            final boolean exceedsRequired = getAvailableBufferSize() > i;
            return exceedsRequired ? floatingBuffers.poll() : null;
        }

        /** Appends a floating buffer to the queue. */
        void addFloatingBuffer(Buffer buffer) {
            floatingBuffers.add(buffer);
        }

        /**
         * Removes and returns the next available buffer, preferring floating buffers over
         * exclusive ones.
         *
         * @return the next buffer, or {@code null} if both queues are empty
         */
        @Nullable
        Buffer takeBuffer() {
            if (floatingBuffers.isEmpty()) {
                return exclusiveBuffers.poll();
            }
            return floatingBuffers.poll();
        }

        /**
         * Drains both queues: floating buffers are recycled directly, while the memory segments
         * of exclusive buffers are appended to {@code list} for the caller to dispose of.
         *
         * @param list receives the memory segments of all drained exclusive buffers
         */
        void releaseAll(List<MemorySegment> list) {
            Buffer floating;
            while ((floating = floatingBuffers.poll()) != null) {
                floating.recycleBuffer();
            }

            Buffer exclusive;
            while ((exclusive = exclusiveBuffers.poll()) != null) {
                list.add(exclusive.getMemorySegment());
            }
        }

        /** Drains the floating queue, recycling every buffer it held. */
        void releaseFloatingBuffers() {
            for (Buffer floating = floatingBuffers.poll();
                    floating != null;
                    floating = floatingBuffers.poll()) {
                floating.recycleBuffer();
            }
        }

        /** Returns the total number of queued buffers, floating plus exclusive. */
        int getAvailableBufferSize() {
            return exclusiveBuffers.size() + floatingBuffers.size();
        }
    }

    public BufferManager(MemorySegmentProvider memorySegmentProvider, InputChannel inputChannel, int i) {
        this.globalPool = (MemorySegmentProvider) Preconditions.checkNotNull(memorySegmentProvider);
        this.inputChannel = (InputChannel) Preconditions.checkNotNull(inputChannel);
        Preconditions.checkArgument(i >= 0);
        this.numRequiredBuffers = i;
    }

    /**
     * Takes the next available buffer from the queue without blocking.
     *
     * @return the next buffer, or {@code null} if none is currently queued
     */
    @Nullable
    public Buffer requestBuffer() {
        synchronized (bufferQueue) {
            return bufferQueue.takeBuffer();
        }
    }

    /**
     * Returns a buffer, blocking until one is available.
     *
     * <p>Buffers already in the queue are preferred. If the queue is empty and this manager is
     * not yet waiting on the buffer pool, a buffer is requested from the pool directly; when the
     * pool has none, the manager registers itself as a listener and waits on the queue monitor
     * until it is notified.
     *
     * @return a buffer, never {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws CancelTaskException if the input channel has been released, or (via
     *     {@code shouldContinueRequest}) the buffer pool has been destroyed
     */
    public Buffer requestBufferBlocking() throws InterruptedException {
        synchronized (bufferQueue) {
            Buffer buffer;
            while ((buffer = bufferQueue.takeBuffer()) == null) {
                if (inputChannel.isReleased()) {
                    throw new CancelTaskException("Input channel [" + inputChannel.channelInfo + "] has already been released.");
                }
                if (!isWaitingForFloatingBuffers) {
                    BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                    buffer = bufferPool.requestBuffer();
                    if (buffer == null && shouldContinueRequest(bufferPool)) {
                        // No buffer was obtained and no listener could be registered. Retry
                        // right away: waiting here could block indefinitely because nothing
                        // is registered to signal when a floating buffer becomes available.
                        continue;
                    }
                }
                if (buffer != null) {
                    return buffer;
                }
                // A listener is registered (now or earlier); sleep until notifyAll() is called
                // on the queue, then re-check from the top of the loop.
                bufferQueue.wait();
            }
            return buffer;
        }
    }

    /**
     * Tries to register this manager as a listener on the given pool and reports whether the
     * caller should retry its buffer request.
     *
     * @param bufferPool the pool that had no buffer to hand out
     * @return {@code false} if the listener was registered (the waiting flag is set and the
     *     required-buffer count becomes 1); {@code true} if registration failed and the pool is
     *     still alive
     * @throws CancelTaskException if registration failed and the pool is already destroyed
     */
    private boolean shouldContinueRequest(BufferPool bufferPool) {
        final boolean listenerRegistered = bufferPool.addBufferListener(this);
        if (!listenerRegistered) {
            if (bufferPool.isDestroyed()) {
                throw new CancelTaskException("Local buffer pool has already been released.");
            }
            return true;
        }

        isWaitingForFloatingBuffers = true;
        numRequiredBuffers = 1;
        return false;
    }

    /**
     * Requests exclusive memory segments from the global pool and adds them to the queue as
     * buffers that recycle back into this manager.
     *
     * @param i the number of exclusive segments to request
     * @throws IOException if the global pool fails to provide the segments
     * @throws IllegalArgumentException if the pool returned no segments
     * @throws IllegalStateException if floating buffers are already queued
     */
    public void requestExclusiveBuffers(int i) throws IOException {
        final Collection<MemorySegment> segments = globalPool.requestMemorySegments(i);
        Preconditions.checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");

        synchronized (bufferQueue) {
            // addExclusiveBuffer may hand back a floating buffer, which is ignored below. With no
            // floating buffers queued it always returns null, hence this assertion.
            Preconditions.checkState(unsynchronizedGetFloatingBuffersAvailable() == 0, "Bug in buffer allocation logic: floating buffer is allocated before exclusive buffers are initialized.");
            for (MemorySegment segment : segments) {
                bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
            }
        }
    }

    /**
     * Sets the number of required buffers and requests floating buffers from the gate's buffer
     * pool until that number is available or the manager has registered as a pool listener.
     *
     * @param i the new number of required buffers
     * @return the number of floating buffers obtained by this call; {@code 0} if the channel is
     *     already released
     */
    public int requestFloatingBuffers(int i) {
        synchronized (bufferQueue) {
            if (inputChannel.isReleased()) {
                return 0;
            }

            numRequiredBuffers = i;

            int obtained = 0;
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                final BufferPool pool = inputChannel.inputGate.getBufferPool();
                final Buffer floating = pool.requestBuffer();
                if (floating == null) {
                    if (pool.addBufferListener(this)) {
                        // The pool will call back once a buffer is free; stop polling.
                        isWaitingForFloatingBuffers = true;
                        break;
                    }
                    // Registration failed: loop and ask the pool again.
                    continue;
                }
                bufferQueue.addFloatingBuffer(floating);
                obtained++;
            }
            return obtained;
        }
    }

    /**
     * Takes back a memory segment, either returning it to the global pool (channel released) or
     * re-queueing it as an exclusive buffer.
     *
     * <p>When re-queueing pushes the queue above the required number of buffers, the floating
     * buffer that was evicted is recycled; otherwise the input channel is told that one more
     * buffer is available. Threads waiting on the queue monitor are always woken, including when
     * an error occurs.
     *
     * <p>Any failure is rethrown through {@code ExceptionUtils.rethrow}.
     *
     * @param memorySegment the segment being returned
     */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        Buffer releasedFloatingBuffer = null;
        synchronized (bufferQueue) {
            try {
                if (inputChannel.isReleased()) {
                    // Never add a buffer to the queue of a released channel; hand the segment
                    // straight back to the global pool instead.
                    globalPool.recycleMemorySegments(Collections.singletonList(memorySegment));
                    return;
                }
                releasedFloatingBuffer =
                        bufferQueue.addExclusiveBuffer(new NetworkBuffer(memorySegment, this), numRequiredBuffers);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            } finally {
                // Wake waiters on every path, including failures, so nobody stays blocked.
                bufferQueue.notifyAll();
            }
        }

        // The calls below run code outside this class that may take other locks, so they are
        // made only after the bufferQueue monitor has been released.
        if (releasedFloatingBuffer != null) {
            releasedFloatingBuffer.recycleBuffer();
            return;
        }
        try {
            inputChannel.notifyBufferAvailable(1);
        } catch (Throwable t) {
            ExceptionUtils.rethrow(t);
        }
    }

    /**
     * Resets the number of required buffers to zero and recycles every queued floating buffer.
     */
    public void releaseFloatingBuffers() {
        synchronized (bufferQueue) {
            // Reset the requirement before recycling, keeping the original statement order.
            numRequiredBuffers = 0;
            bufferQueue.releaseFloatingBuffers();
        }
    }

    /**
     * Releases every buffer in {@code arrayDeque} together with every buffer still held in the
     * internal queue.
     *
     * <p>Buffers whose recycler is this manager are not recycled one by one; their memory
     * segments are collected and returned to the global pool in a single batch. All other
     * buffers are recycled directly. Each step is attempted even if an earlier one failed; the
     * first failure is thrown at the end with later ones attached as suppressed exceptions.
     *
     * @param arrayDeque buffers to release; drained by this call
     * @throws IOException the first failure if it is an {@code IOException}, otherwise an
     *     {@code IOException} wrapping it
     */
    public void releaseAllBuffers(ArrayDeque<Buffer> arrayDeque) throws IOException {
        final List<MemorySegment> exclusiveRecyclingSegments = new ArrayList<>();
        Exception err = null;

        Buffer buffer;
        while ((buffer = arrayDeque.poll()) != null) {
            try {
                if (buffer.getRecycler() == this) {
                    exclusiveRecyclingSegments.add(buffer.getMemorySegment());
                } else {
                    buffer.recycleBuffer();
                }
            } catch (Exception e) {
                err = ExceptionUtils.firstOrSuppressed(e, err);
            }
        }

        try {
            synchronized (bufferQueue) {
                bufferQueue.releaseAll(exclusiveRecyclingSegments);
                bufferQueue.notifyAll();
            }
        } catch (Exception e) {
            // Keep going so the segments collected so far are still returned to the pool.
            err = ExceptionUtils.firstOrSuppressed(e, err);
        }

        try {
            if (!exclusiveRecyclingSegments.isEmpty()) {
                globalPool.recycleMemorySegments(exclusiveRecyclingSegments);
            }
        } catch (Exception e) {
            err = ExceptionUtils.firstOrSuppressed(e, err);
        }

        if (err != null) {
            // Propagate IOExceptions unchanged; wrap everything else.
            throw err instanceof IOException ? (IOException) err : new IOException(err);
        }
    }

    /**
     * Buffer-pool callback offering a floating buffer to this manager.
     *
     * <p>The buffer is accepted only if the channel is not released and fewer buffers than
     * required are currently available. On acceptance, waiters on the queue monitor are woken and
     * the input channel is notified of one more available buffer.
     *
     * <p>Failures are not propagated to the caller; they are reported to the input channel via
     * {@code setError}.
     *
     * @param buffer the floating buffer being offered
     * @return {@code BUFFER_NOT_USED} if the buffer was rejected, {@code BUFFER_USED_NO_NEED_MORE}
     *     if it was accepted and the requirement is now met, {@code BUFFER_USED_NEED_MORE} if it
     *     was accepted and more buffers are still needed
     */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferListener
    public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
        BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.BUFFER_NOT_USED;

        // Checked before taking the lock so a released channel never contends for the monitor.
        if (inputChannel.isReleased()) {
            return notificationResult;
        }

        try {
            synchronized (bufferQueue) {
                Preconditions.checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");

                // Re-check under the lock: the channel may have been released in the meantime,
                // or enough buffers may have become available already.
                if (inputChannel.isReleased() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;
                    return notificationResult;
                }

                bufferQueue.addFloatingBuffer(buffer);
                bufferQueue.notifyAll();

                if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NO_NEED_MORE;
                } else {
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
                }
            }

            // Reaching this point means the buffer was accepted. The channel callback runs code
            // outside this class, so it is invoked only after the monitor has been released.
            inputChannel.notifyBufferAvailable(1);
        } catch (Throwable t) {
            inputChannel.setError(t);
        }

        return notificationResult;
    }

    /** Buffer-listener callback; this manager takes no action. */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferListener
    public void notifyBufferDestroyed() {
        // Intentionally empty.
    }

    /**
     * Returns the current number of required buffers, read without taking the queue lock, so the
     * value may be stale under concurrent modification.
     */
    @VisibleForTesting
    public int unsynchronizedGetNumberOfRequiredBuffers() {
        return numRequiredBuffers;
    }

    /**
     * Returns whether this manager is flagged as waiting for floating buffers, read without
     * taking the queue lock, so the value may be stale under concurrent modification.
     */
    @VisibleForTesting
    public boolean unsynchronizedIsWaitingForFloatingBuffers() {
        return isWaitingForFloatingBuffers;
    }

    /** Returns the total number of queued buffers (floating plus exclusive), read under the queue lock. */
    @VisibleForTesting
    public int getNumberOfAvailableBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.getAvailableBufferSize();
        }
    }

    /**
     * Returns the number of queued exclusive buffers, read without taking the queue lock, so the
     * value may be stale under concurrent modification.
     */
    public int unsynchronizedGetAvailableExclusiveBuffers() {
        final ArrayDeque<Buffer> exclusive = bufferQueue.exclusiveBuffers;
        return exclusive.size();
    }

    /**
     * Returns the number of queued floating buffers, read without taking the queue lock, so the
     * value may be stale under concurrent modification.
     */
    public int unsynchronizedGetFloatingBuffersAvailable() {
        final ArrayDeque<Buffer> floating = bufferQueue.floatingBuffers;
        return floating.size();
    }
}
