package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.class */
/**
 * A {@link NetworkSequenceViewReader} that wraps a {@link ResultSubpartitionView} and keeps a
 * running count of credits; every data buffer handed out by {@link #getNextBuffer()} consumes one
 * credit, while events do not.
 *
 * <p>The reader also acts as the {@link BufferAvailabilityListener} of the wrapped view and
 * forwards availability notifications to the owning {@link PartitionRequestQueue}.
 *
 * <p>Most accessors dereference the subpartition view directly and therefore require that the view
 * has already been created through {@link #requestSubpartitionViewOrRegisterListener} or {@link
 * #notifySubpartitionsCreated}.
 */
class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader {
    private final InputChannelID receiverId;
    private final PartitionRequestQueue requestQueue;

    /** The credit value this reader was constructed with. */
    private final int initialCredit;

    /**
     * Index of the only requested subpartition when exactly one subpartition was requested,
     * otherwise {@code -1}.
     */
    private int subpartitionId;

    private volatile ResultSubpartitionView subpartitionView;
    private volatile PartitionRequestListener partitionRequestListener;

    /** Credits currently available; decremented for every data buffer that is polled. */
    private int numCreditsAvailable;

    /** Guards creation of the subpartition view and of the partition request listener. */
    private final Object requestLock = new Object();

    private boolean isRegisteredAsAvailable = false;

    /**
     * Creates a reader without a subpartition view.
     *
     * @param inputChannelID id of the receiver this reader serves
     * @param i initial credit; also used as the starting number of available credits
     * @param partitionRequestQueue queue that is notified about reader creation and availability
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public CreditBasedSequenceNumberingViewReader(InputChannelID inputChannelID, int i, PartitionRequestQueue partitionRequestQueue) {
        Preconditions.checkArgument(i >= 0, "Must be non-negative.");
        this.receiverId = inputChannelID;
        this.initialCredit = i;
        this.numCreditsAvailable = i;
        this.requestQueue = partitionRequestQueue;
        this.subpartitionId = -1;
    }

    /**
     * Asks the provider for a subpartition view, passing along a freshly created partition request
     * listener. If a view is returned, it is stored and the request queue is notified; otherwise
     * only the listener remains set on this reader.
     *
     * @throws IllegalStateException if a view or a listener has already been created
     * @throws IOException if the provider fails
     */
    @Override
    public void requestSubpartitionViewOrRegisterListener(ResultPartitionProvider resultPartitionProvider, ResultPartitionID resultPartitionID, ResultSubpartitionIndexSet resultSubpartitionIndexSet) throws IOException {
        synchronized (requestLock) {
            Preconditions.checkState(subpartitionView == null, "Subpartitions already requested");
            Preconditions.checkState(partitionRequestListener == null, "Partition request listener already created");
            partitionRequestListener = new NettyPartitionRequestListener(resultPartitionProvider, this, resultSubpartitionIndexSet, resultPartitionID);
            Optional<ResultSubpartitionView> maybeView = resultPartitionProvider.createSubpartitionViewOrRegisterListener(resultPartitionID, resultSubpartitionIndexSet, this, partitionRequestListener);
            if (!maybeView.isPresent()) {
                // No view was created; nothing to announce to the request queue yet.
                return;
            }
            subpartitionView = maybeView.get();
            cacheSingleSubpartitionId(resultSubpartitionIndexSet);
        }

        // Notify outside of the lock, mirroring notifySubpartitionsCreated, so that request queue
        // callbacks never run while requestLock is held.
        notifyDataAvailable(subpartitionView);
        requestQueue.notifyReaderCreated(this);
    }

    /**
     * Creates the subpartition view directly from the given partition and notifies the request
     * queue.
     *
     * @throws IllegalStateException if a view has already been created
     * @throws IOException if creating the view fails
     */
    @Override
    public void notifySubpartitionsCreated(ResultPartition resultPartition, ResultSubpartitionIndexSet resultSubpartitionIndexSet) throws IOException {
        synchronized (requestLock) {
            Preconditions.checkState(subpartitionView == null, "Subpartitions already requested");
            subpartitionView = resultPartition.createSubpartitionView(resultSubpartitionIndexSet, this);
            cacheSingleSubpartitionId(resultSubpartitionIndexSet);
        }
        notifyDataAvailable(subpartitionView);
        requestQueue.notifyReaderCreated(this);
    }

    /** Remembers the subpartition index when the index set contains exactly one entry. */
    private void cacheSingleSubpartitionId(ResultSubpartitionIndexSet indexSet) {
        if (indexSet.size() == 1) {
            subpartitionId = indexSet.values().iterator().next().intValue();
        }
    }

    /** Adds {@code i} to the number of available credits. */
    @Override
    public void addCredit(int i) {
        numCreditsAvailable += i;
    }

    /** Forwards both arguments unchanged to the subpartition view. */
    @Override
    public void notifyRequiredSegmentId(int i, int i2) {
        subpartitionView.notifyRequiredSegmentId(i, i2);
    }

    /**
     * Resumes consumption on the subpartition view. When the reader was created with an initial
     * credit of zero, the available credit is reset to zero first.
     */
    @Override
    public void resumeConsumption() {
        if (initialCredit == 0) {
            numCreditsAvailable = 0;
        }
        subpartitionView.resumeConsumption();
    }

    @Override
    public void acknowledgeAllRecordsProcessed() {
        subpartitionView.acknowledgeAllDataProcessed();
    }

    @Override
    public void setRegisteredAsAvailable(boolean z) {
        isRegisteredAsAvailable = z;
    }

    @Override
    public boolean isRegisteredAsAvailable() {
        return isRegisteredAsAvailable;
    }

    /** Queries the view, telling it whether at least one credit is currently available. */
    @Override
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog() {
        boolean hasCredit = numCreditsAvailable > 0;
        return subpartitionView.getAvailabilityAndBacklog(hasCredit);
    }

    /**
     * Returns the data type following the given buffer, or {@link Buffer.DataType#NONE} when that
     * type is not an event and no credit is left.
     */
    private Buffer.DataType getNextDataType(ResultSubpartition.BufferAndBacklog bufferAndBacklog) {
        Buffer.DataType nextDataType = bufferAndBacklog.getNextDataType();
        if (numCreditsAvailable > 0 || nextDataType.isEvent()) {
            return nextDataType;
        }
        return Buffer.DataType.NONE;
    }

    @Override
    public InputChannelID getReceiverId() {
        return receiverId;
    }

    @Override
    public void notifyNewBufferSize(int i) {
        subpartitionView.notifyNewBufferSize(i);
    }

    /** Reports the timeout to the request queue and drops this reader's listener reference. */
    @Override
    public void notifyPartitionRequestTimeout(PartitionRequestListener partitionRequestListener) {
        requestQueue.notifyPartitionRequestTimeout(partitionRequestListener);
        this.partitionRequestListener = null;
    }

    @VisibleForTesting
    int getNumCreditsAvailable() {
        return numCreditsAvailable;
    }

    /** Queries the view's availability as if credit were available. */
    @VisibleForTesting
    ResultSubpartitionView.AvailabilityWithBacklog hasBuffersAvailable() {
        return subpartitionView.getAvailabilityAndBacklog(true);
    }

    /**
     * Returns the cached subpartition index when one was recorded, otherwise asks the view.
     *
     * @throws IOException if the view fails to peek
     */
    @Override
    public int peekNextBufferSubpartitionId() throws IOException {
        if (subpartitionId >= 0) {
            return subpartitionId;
        }
        return subpartitionView.peekNextBufferSubpartitionId();
    }

    /**
     * Polls the next buffer from the view. A data buffer consumes one credit; events do not.
     *
     * @return the buffer together with the next data type, backlog and sequence number, or {@code
     *     null} if the view has nothing to hand out
     * @throws IllegalStateException if a data buffer is polled while no credit is available
     * @throws IOException if the view fails
     */
    @Override
    @Nullable
    public InputChannel.BufferAndAvailability getNextBuffer() throws IOException {
        ResultSubpartition.BufferAndBacklog next = subpartitionView.getNextBuffer();
        if (next == null) {
            return null;
        }
        if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
            throw new IllegalStateException("no credit available");
        }
        return new InputChannel.BufferAndAvailability(next.buffer(), getNextDataType(next), next.buffersInBacklog(), next.getSequenceNumber());
    }

    /** Returns true only if both the initial credit and the available credit are zero. */
    @Override
    public boolean needAnnounceBacklog() {
        return initialCredit == 0 && numCreditsAvailable == 0;
    }

    @Override
    public boolean isReleased() {
        return subpartitionView.isReleased();
    }

    @Override
    public Throwable getFailureCause() {
        return subpartitionView.getFailureCause();
    }

    /**
     * Releases the partition request listener and the subpartition view, skipping whichever of the
     * two has not been created (or has already been cleared).
     *
     * @throws IOException if releasing the view fails
     */
    @Override
    public void releaseAllResources() throws IOException {
        // Read each volatile field once: notifyPartitionRequestTimeout may null the listener
        // between a null check and the subsequent call.
        PartitionRequestListener listener = partitionRequestListener;
        if (listener != null) {
            listener.releaseListener();
        }
        // The view is absent when only a listener was registered and no view was ever created.
        ResultSubpartitionView view = subpartitionView;
        if (view != null) {
            view.releaseAllResources();
        }
    }

    /** Tells the request queue that this reader is non-empty; the argument is not used. */
    @Override
    public void notifyDataAvailable(ResultSubpartitionView resultSubpartitionView) {
        requestQueue.notifyReaderNonEmpty(this);
    }

    /** Handled exactly like a data-availability notification; the argument is not used. */
    @Override
    public void notifyPriorityEvent(int i) {
        notifyDataAvailable(subpartitionView);
    }

    @Override
    public String toString() {
        return "CreditBasedSequenceNumberingViewReader{requestLock=" + requestLock + ", receiverId=" + receiverId + ", numCreditsAvailable=" + numCreditsAvailable + ", isRegisteredAsAvailable=" + isRegisteredAsAvailable + '}';
    }
}
