package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.class */
public class HsSubpartitionView implements ResultSubpartitionView, HsSubpartitionViewInternalOperations {
    private final BufferAvailabilityListener availabilityListener;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private int lastConsumedBufferIndex = -1;

    @GuardedBy("lock")
    private boolean needNotify = true;

    @GuardedBy("lock")
    @Nullable
    private Buffer.DataType cachedNextDataType = null;

    @GuardedBy("lock")
    @Nullable
    private Throwable failureCause = null;

    @GuardedBy("lock")
    private boolean isReleased = false;

    @GuardedBy("lock")
    @Nullable
    private HsDataView diskDataView;

    @GuardedBy("lock")
    @Nullable
    private HsDataView memoryDataView;
    static final /* synthetic */ boolean $assertionsDisabled;

    public HsSubpartitionView(BufferAvailabilityListener bufferAvailabilityListener) {
        this.availabilityListener = bufferAvailabilityListener;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        ResultSubpartition.BufferAndBacklog bufferAndBacklog;
        try {
            synchronized (this.lock) {
                Preconditions.checkNotNull(this.diskDataView, "disk data view must be not null.");
                Preconditions.checkNotNull(this.memoryDataView, "memory data view must be not null.");
                Optional<ResultSubpartition.BufferAndBacklog> tryReadFromDisk = tryReadFromDisk();
                if (!tryReadFromDisk.isPresent()) {
                    tryReadFromDisk = this.memoryDataView.consumeBuffer(this.lastConsumedBufferIndex + 1);
                }
                updateConsumingStatus(tryReadFromDisk);
                bufferAndBacklog = (ResultSubpartition.BufferAndBacklog) tryReadFromDisk.map(this::handleBacklog).orElse(null);
            }
            return bufferAndBacklog;
        } catch (Throwable th) {
            releaseInternal(th);
            return null;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyDataAvailable() {
        boolean z = false;
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            if (this.needNotify) {
                z = true;
                this.needNotify = false;
            }
            if (z) {
                this.availabilityListener.notifyDataAvailable();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(int i) {
        ResultSubpartitionView.AvailabilityWithBacklog availabilityWithBacklog;
        synchronized (this.lock) {
            boolean z = i > 0;
            if (i <= 0) {
                if (this.cachedNextDataType != null && this.cachedNextDataType == Buffer.DataType.EVENT_BUFFER) {
                    z = true;
                }
            }
            int subpartitionBacklog = getSubpartitionBacklog();
            if (subpartitionBacklog == 0) {
                this.needNotify = true;
            }
            availabilityWithBacklog = new ResultSubpartitionView.AvailabilityWithBacklog(z, subpartitionBacklog);
        }
        return availabilityWithBacklog;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void releaseAllResources() throws IOException {
        releaseInternal(null);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isReleased() {
        boolean z;
        synchronized (this.lock) {
            z = this.isReleased;
        }
        return z;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionViewInternalOperations
    public int getConsumingOffset(boolean z) {
        int i;
        if (!z) {
            return this.lastConsumedBufferIndex;
        }
        synchronized (this.lock) {
            i = this.lastConsumedBufferIndex;
        }
        return i;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public Throwable getFailureCause() {
        Throwable th;
        synchronized (this.lock) {
            th = this.failureCause;
        }
        return th;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setDiskDataView(HsDataView hsDataView) {
        synchronized (this.lock) {
            Preconditions.checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed.");
            this.diskDataView = hsDataView;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setMemoryDataView(HsDataView hsDataView) {
        synchronized (this.lock) {
            Preconditions.checkState(this.memoryDataView == null, "repeatedly set memory data view is not allowed.");
            this.memoryDataView = hsDataView;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void resumeConsumption() {
        throw new UnsupportedOperationException("resumeConsumption should never be called.");
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void acknowledgeAllDataProcessed() {
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return getSubpartitionBacklog();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int getNumberOfQueuedBuffers() {
        int subpartitionBacklog;
        synchronized (this.lock) {
            subpartitionBacklog = getSubpartitionBacklog();
        }
        return subpartitionBacklog;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyNewBufferSize(int i) {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    private int getSubpartitionBacklog() {
        if (this.memoryDataView == null || this.diskDataView == null) {
            return 0;
        }
        return Math.max(this.memoryDataView.getBacklog(), this.diskDataView.getBacklog());
    }

    private ResultSubpartition.BufferAndBacklog handleBacklog(ResultSubpartition.BufferAndBacklog bufferAndBacklog) {
        return bufferAndBacklog.buffersInBacklog() == 0 ? new ResultSubpartition.BufferAndBacklog(bufferAndBacklog.buffer(), getSubpartitionBacklog(), bufferAndBacklog.getNextDataType(), bufferAndBacklog.getSequenceNumber()) : bufferAndBacklog;
    }

    @GuardedBy("lock")
    private Optional<ResultSubpartition.BufferAndBacklog> tryReadFromDisk() throws Throwable {
        int i = this.lastConsumedBufferIndex + 1;
        return ((HsDataView) Preconditions.checkNotNull(this.diskDataView)).consumeBuffer(i).map(bufferAndBacklog -> {
            return bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE ? new ResultSubpartition.BufferAndBacklog(bufferAndBacklog.buffer(), bufferAndBacklog.buffersInBacklog(), ((HsDataView) Preconditions.checkNotNull(this.memoryDataView)).peekNextToConsumeDataType(i + 1), bufferAndBacklog.getSequenceNumber()) : bufferAndBacklog;
        });
    }

    @GuardedBy("lock")
    private void updateConsumingStatus(Optional<ResultSubpartition.BufferAndBacklog> optional) {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (optional.isPresent()) {
            this.lastConsumedBufferIndex++;
            Preconditions.checkState(optional.get().getSequenceNumber() == this.lastConsumedBufferIndex);
        }
        this.needNotify = !((Boolean) optional.map((v0) -> {
            return v0.isDataAvailable();
        }).orElse(false)).booleanValue();
        this.cachedNextDataType = (Buffer.DataType) optional.map((v0) -> {
            return v0.getNextDataType();
        }).orElse(null);
    }

    private void releaseInternal(@Nullable Throwable th) {
        boolean z = false;
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            this.failureCause = th;
            if (this.diskDataView != null) {
                z = true;
            }
            if (z) {
                this.diskDataView.releaseDataView();
            }
        }
    }

    static {
        $assertionsDisabled = !HsSubpartitionView.class.desiredAssertionStatus();
    }
}
