package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.class */
/**
 * Coordinates data reading for the {@link SortMergeSubpartitionReader}s created through {@link
 * #createSubpartitionReader}.
 *
 * <p>Each invocation of {@link #run()} requests a batch of buffers from the {@link
 * BatchShuffleReadBufferPool}, hands them to the registered readers in the order given by the
 * {@code sortedReaders} priority queue, and gives back whatever was not consumed. Runs are
 * submitted to the configured {@link Executor} by {@link #mayTriggerReading()}; at most one run is
 * scheduled at a time (tracked by {@code isRunning}).
 *
 * <p>All mutable state is guarded by the lock object passed to the constructor. This instance also
 * acts as the {@link BufferRecycler} for the buffers it hands to readers, so that {@code
 * numRequestedBuffers} can be decremented when a buffer comes back.
 */
class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler {
    private static final Logger LOG =
            LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class);

    /** Buffer request timeout used when the caller does not supply one. */
    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5L);

    /** Header buffer handed to every {@link PartitionedFileReader} created by this scheduler. */
    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();

    /** 16-byte index entry buffer used while initializing a new file reader's region index. */
    private final ByteBuffer indexEntryBufferInit = ByteBuffer.allocateDirect(16);

    /** 16-byte index entry buffer handed to every {@link PartitionedFileReader}. */
    private final ByteBuffer indexEntryBufferRead = ByteBuffer.allocateDirect(16);

    /** Lock guarding all mutable state of this scheduler; supplied by the owner. */
    private final Object lock;

    /** Completed once the scheduler has been released and no reader is left. */
    private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();

    /** Pool from which read buffers are requested and to which they are recycled. */
    private final BatchShuffleReadBufferPool bufferPool;

    /** Executor on which {@link #run()} is scheduled. */
    private final Executor ioExecutor;

    /** Time span added to the pool's last buffer operation timestamp to form the deadline. */
    private final Duration bufferRequestTimeout;

    /** Readers that failed or were released and must be dropped at the end of the current run. */
    @GuardedBy("lock")
    private final Set<SortMergeSubpartitionReader> failedReaders = new HashSet<>();

    /** Every reader that has been created and not yet removed. */
    @GuardedBy("lock")
    private final Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();

    /** Readers waiting to be served, ordered by their natural ordering. */
    @GuardedBy("lock")
    private final Queue<SortMergeSubpartitionReader> sortedReaders = new PriorityQueue<>();

    @GuardedBy("lock")
    private FileChannel dataFileChannel;

    @GuardedBy("lock")
    private FileChannel indexFileChannel;

    /** Whether a run is currently scheduled or executing. */
    @GuardedBy("lock")
    private boolean isRunning;

    /** Number of buffers handed to readers that have not been recycled yet. */
    @GuardedBy("lock")
    private volatile int numRequestedBuffers;

    /** Set once by {@link #release()}; read without the lock in {@link #allocateBuffers()}. */
    @GuardedBy("lock")
    private volatile boolean isReleased;

    /**
     * Creates a scheduler that uses the default buffer request timeout of five minutes.
     *
     * @param batchShuffleReadBufferPool pool providing the read buffers; must not be null
     * @param executor executor on which reading is scheduled; must not be null
     * @param obj lock object guarding the scheduler state; must not be null
     */
    public SortMergeResultPartitionReadScheduler(BatchShuffleReadBufferPool batchShuffleReadBufferPool, Executor executor, Object obj) {
        this(batchShuffleReadBufferPool, executor, obj, DEFAULT_BUFFER_REQUEST_TIMEOUT);
    }

    /**
     * Creates a scheduler with an explicit buffer request timeout.
     *
     * @param batchShuffleReadBufferPool pool providing the read buffers; must not be null
     * @param executor executor on which reading is scheduled; must not be null
     * @param obj lock object guarding the scheduler state; must not be null
     * @param duration buffer request timeout; must not be null
     */
    SortMergeResultPartitionReadScheduler(BatchShuffleReadBufferPool batchShuffleReadBufferPool, Executor executor, Object obj, Duration duration) {
        this.lock = Preconditions.checkNotNull(obj);
        this.bufferPool = Preconditions.checkNotNull(batchShuffleReadBufferPool);
        this.ioExecutor = Preconditions.checkNotNull(executor);
        this.bufferRequestTimeout = Preconditions.checkNotNull(duration);
        BufferReaderWriterUtil.configureByteBuffer(this.indexEntryBufferInit);
        BufferReaderWriterUtil.configureByteBuffer(this.indexEntryBufferRead);
    }

    /**
     * Performs one round of reading: allocates buffers, lets the queued readers fill them, then
     * recycles unused buffers and updates the reader bookkeeping.
     *
     * <p>Any throwable escaping the round is logged, all current readers are failed with it, and
     * the bookkeeping is still updated (with zero buffers accounted as read).
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        Set<SortMergeSubpartitionReader> finishedReaders = new HashSet<>();
        try {
            Queue<MemorySegment> buffers = allocateBuffers();
            // NOTE(review): allocateBuffers() can deliberately return an empty queue on timeout
            // while this scheduler still holds buffers; this check turns that case into a failure
            // of all readers. Kept as-is; confirm whether that is intended.
            Preconditions.checkState(!buffers.isEmpty(), "No buffer available.");
            int numBuffersAllocated = buffers.size();
            List<SortMergeSubpartitionReader> unfinishedReaders = new ArrayList<>();

            SortMergeSubpartitionReader reader = getNextReader();
            while (reader != null) {
                try {
                    // readBuffers() returning true means the reader goes back into the queue.
                    if (reader.readBuffers(buffers, this)) {
                        unfinishedReaders.add(reader);
                    } else {
                        finishedReaders.add(reader);
                    }
                } catch (Throwable readFailure) {
                    // A failure of one reader must not affect the others.
                    failSubpartitionReaders(Collections.singletonList(reader), readFailure);
                    LOG.debug("Failed to read shuffle data.", readFailure);
                }
                if (buffers.isEmpty()) {
                    break;
                }
                reader = getNextReader();
                if (reader == null && !unfinishedReaders.isEmpty()) {
                    // Buffers are left but the queue is drained: re-queue the readers that still
                    // have data so the remaining buffers can be used in this round.
                    returnUnfinishedReaders(unfinishedReaders);
                    reader = getNextReader();
                }
            }

            int numBuffersRead = numBuffersAllocated - buffers.size();
            releaseBuffers(buffers);
            returnUnfinishedReaders(unfinishedReaders);
            removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
        } catch (Throwable failure) {
            LOG.error("Failed to request buffers for data reading.", failure);
            failSubpartitionReaders(getAllReaders(), failure);
            removeFinishedAndFailedReaders(0, finishedReaders);
        }
    }

    /**
     * Requests buffers from the pool, retrying until some are obtained or the request times out.
     *
     * <p>The deadline is the pool's last buffer operation timestamp plus the configured timeout.
     * When the current deadline has passed, it is recomputed from the pool's latest timestamp and
     * the request only gives up if that refreshed deadline has passed as well, so activity on the
     * pool extends the wait. (The previous code compared the sampled time with itself, which made
     * the refreshed deadline irrelevant.)
     *
     * @return a non-empty queue of buffers, or an empty queue if the request timed out while this
     *     scheduler still has buffers outstanding
     * @throws TimeoutException if the request timed out and this scheduler holds no buffers
     * @throws IllegalStateException if the scheduler was released while waiting
     * @throws Exception any exception thrown by the pool's buffer request
     */
    @VisibleForTesting
    Queue<MemorySegment> allocateBuffers() throws Exception {
        long deadline = getBufferRequestTimeoutTime();
        while (true) {
            List<MemorySegment> segments = this.bufferPool.requestBuffers();
            if (!segments.isEmpty()) {
                return new ArrayDeque<>(segments);
            }
            Preconditions.checkState(!this.isReleased, "Result partition has been already released.");

            if (System.nanoTime() < deadline) {
                continue;
            }
            // Sample the clock before refreshing the deadline, then compare against the refreshed
            // value.
            long now = System.nanoTime();
            deadline = getBufferRequestTimeoutTime();
            if (now >= deadline) {
                break;
            }
        }

        if (this.numRequestedBuffers <= 0) {
            throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
        }
        return new ArrayDeque<>();
    }

    /**
     * Returns the deadline for the current buffer request: the pool's last buffer operation
     * timestamp plus the timeout in nanoseconds. It is compared against {@link System#nanoTime()},
     * so the pool's timestamp is presumably taken from the same clock (confirm against the pool).
     */
    private long getBufferRequestTimeoutTime() {
        return this.bufferPool.getLastBufferOperationTimestamp() + this.bufferRequestTimeout.toNanos();
    }

    /**
     * Recycles the given unused buffers to the pool and empties the queue. A failure while
     * recycling is forwarded to the {@link FatalExitExceptionHandler}.
     */
    private void releaseBuffers(Queue<MemorySegment> buffers) {
        if (buffers.isEmpty()) {
            return;
        }
        try {
            this.bufferPool.recycle(buffers);
            buffers.clear();
        } catch (Throwable throwable) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
        }
    }

    /**
     * Marks the given readers as failed and notifies each of them with the cause. A throwable
     * raised by a reader's {@code fail} call is forwarded to the {@link
     * FatalExitExceptionHandler}.
     */
    private void failSubpartitionReaders(Collection<SortMergeSubpartitionReader> readers, Throwable cause) {
        synchronized (this.lock) {
            this.failedReaders.addAll(readers);
        }
        // The readers are notified after the lock block above has been left.
        for (SortMergeSubpartitionReader reader : readers) {
            try {
                reader.fail(cause);
            } catch (Throwable throwable) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
            }
        }
    }

    /**
     * Finishes a round: drops finished and failed readers, releases shared resources when no
     * reader is left, accounts for the buffers handed out, and possibly schedules the next round.
     *
     * @param numBuffersRead number of buffers handed to readers during this round
     * @param finishedReaders readers that reported no more data; cleared by this call
     */
    private void removeFinishedAndFailedReaders(int numBuffersRead, Set<SortMergeSubpartitionReader> finishedReaders) {
        synchronized (this.lock) {
            for (SortMergeSubpartitionReader reader : finishedReaders) {
                this.allReaders.remove(reader);
            }
            finishedReaders.clear();

            for (SortMergeSubpartitionReader reader : this.failedReaders) {
                this.allReaders.remove(reader);
            }
            this.failedReaders.clear();

            if (this.allReaders.isEmpty()) {
                this.bufferPool.unregisterRequester(this);
                closeFileChannels();
                this.sortedReaders.clear();
            }

            this.numRequestedBuffers += numBuffersRead;
            this.isRunning = false;
            mayTriggerReading();
            mayNotifyReleased();
        }
    }

    /** Completes the release future once released and no reader is left. Requires the lock. */
    private void mayNotifyReleased() {
        assert Thread.holdsLock(this.lock);

        if (this.isReleased && this.allReaders.isEmpty()) {
            this.releaseFuture.complete(null);
        }
    }

    /** Returns a snapshot of all current readers, or an empty queue if already released. */
    private Queue<SortMergeSubpartitionReader> getAllReaders() {
        synchronized (this.lock) {
            if (this.isReleased) {
                return new ArrayDeque<>();
            }
            return new ArrayDeque<>(this.allReaders);
        }
    }

    /** Polls the next queued reader that is not marked as failed, or null if none is left. */
    @Nullable
    private SortMergeSubpartitionReader getNextReader() {
        synchronized (this.lock) {
            SortMergeSubpartitionReader candidate = this.sortedReaders.poll();
            while (candidate != null && this.failedReaders.contains(candidate)) {
                candidate = this.sortedReaders.poll();
            }
            return candidate;
        }
    }

    /** Puts readers that still have data back into the queue and clears the given list. */
    private void returnUnfinishedReaders(List<SortMergeSubpartitionReader> unfinishedReaders) {
        if (unfinishedReaders == null || unfinishedReaders.isEmpty()) {
            return;
        }
        synchronized (this.lock) {
            this.sortedReaders.addAll(unfinishedReaders);
            unfinishedReaders.clear();
        }
    }

    /**
     * Creates and registers a new subpartition reader and possibly triggers reading.
     *
     * <p>If this is the first reader, the scheduler registers itself as a requester at the buffer
     * pool (the file channels are opened by {@link #createFileReader}). When the reader's release
     * future completes, the reader is marked as failed so that it is dropped at the end of the
     * next round.
     *
     * @param bufferAvailabilityListener listener passed on to the new reader
     * @param i second argument passed to the {@link PartitionedFileReader}; presumably the target
     *     subpartition index (confirm against {@code PartitionedFileReader})
     * @param partitionedFile the file to read from
     * @return the newly created reader
     * @throws IOException if creating the file reader fails
     * @throws IllegalStateException if the scheduler has already been released
     */
    public SortMergeSubpartitionReader createSubpartitionReader(BufferAvailabilityListener bufferAvailabilityListener, int i, PartitionedFile partitionedFile) throws IOException {
        synchronized (this.lock) {
            Preconditions.checkState(!this.isReleased, "Partition is already released.");

            SortMergeSubpartitionReader reader =
                    new SortMergeSubpartitionReader(bufferAvailabilityListener, createFileReader(partitionedFile, i));
            if (this.allReaders.isEmpty()) {
                this.bufferPool.registerRequester(this);
            }
            this.allReaders.add(reader);
            this.sortedReaders.add(reader);
            reader.getReleaseFuture().thenRun(() -> releaseSubpartitionReader(reader));

            mayTriggerReading();
            return reader;
        }
    }

    /** Marks a released reader as failed, if it is still registered. */
    private void releaseSubpartitionReader(SortMergeSubpartitionReader reader) {
        synchronized (this.lock) {
            if (this.allReaders.contains(reader)) {
                this.failedReaders.add(reader);
            }
        }
    }

    /**
     * Creates a file reader for the given file, opening the shared file channels first if no
     * reader exists yet. If creation fails and there is still no reader, the channels are closed
     * again before the throwable is rethrown. Requires the lock.
     */
    private PartitionedFileReader createFileReader(PartitionedFile partitionedFile, int subpartition) throws IOException {
        assert Thread.holdsLock(this.lock);

        try {
            if (this.allReaders.isEmpty()) {
                openFileChannels(partitionedFile);
            }
            PartitionedFileReader fileReader = new PartitionedFileReader(partitionedFile, subpartition, this.dataFileChannel, this.indexFileChannel, this.headerBuf, this.indexEntryBufferRead);
            fileReader.initRegionIndex(this.indexEntryBufferInit);
            return fileReader;
        } catch (Throwable throwable) {
            if (this.allReaders.isEmpty()) {
                closeFileChannels();
            }
            throw throwable;
        }
    }

    /** Closes any open channels, then opens the data and index channels. Requires the lock. */
    private void openFileChannels(PartitionedFile partitionedFile) throws IOException {
        assert Thread.holdsLock(this.lock);

        closeFileChannels();
        this.dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        this.indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
    }

    /** Quietly closes both file channels and resets the fields to null. Requires the lock. */
    private void closeFileChannels() {
        assert Thread.holdsLock(this.lock);

        IOUtils.closeAllQuietly(this.dataFileChannel, this.indexFileChannel);
        this.dataFileChannel = null;
        this.indexFileChannel = null;
    }

    /**
     * Returns a buffer to the pool, decrements the count of outstanding buffers and possibly
     * triggers another round of reading.
     */
    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        synchronized (this.lock) {
            this.bufferPool.recycle(memorySegment);
            this.numRequestedBuffers--;
            mayTriggerReading();
        }
    }

    /**
     * Submits this scheduler to the executor unless a run is already pending, there is no reader,
     * or the number of outstanding buffers is at its limit. Requires the lock.
     *
     * <p>The limit is reached when one more request would exceed the larger of four times the
     * pool's buffers-per-request and twice the number of readers, or when the outstanding count
     * is at least the pool's average number of buffers per requester.
     */
    private void mayTriggerReading() {
        assert Thread.holdsLock(this.lock);

        int maxRequestedBuffers = Math.max(4 * this.bufferPool.getNumBuffersPerRequest(), 2 * this.allReaders.size());
        if (this.isRunning
                || this.allReaders.isEmpty()
                || this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() > maxRequestedBuffers
                || this.numRequestedBuffers >= this.bufferPool.getAverageBuffersPerRequester()) {
            return;
        }
        this.isRunning = true;
        this.ioExecutor.execute(this);
    }

    /**
     * Releases this scheduler: marks it released, fails all current readers and returns a future
     * that completes once no reader is left. Calling it again only returns the same future.
     *
     * @return the release future
     */
    public CompletableFuture<?> release() {
        synchronized (this.lock) {
            if (this.isReleased) {
                return this.releaseFuture;
            }
            this.isReleased = true;

            this.failedReaders.addAll(this.allReaders);
            List<SortMergeSubpartitionReader> pendingReaders = new ArrayList<>(this.allReaders);
            mayNotifyReleased();

            // NOTE(review): the readers are notified while the lock is still held; kept as-is.
            failSubpartitionReaders(pendingReaders, new IllegalStateException("Result partition has been already released."));
            return this.releaseFuture;
        }
    }

    /** Opens the file at the given path for reading. */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /** Returns the number of readers currently registered. */
    @VisibleForTesting
    int getNumPendingReaders() {
        synchronized (this.lock) {
            return this.allReaders.size();
        }
    }

    /** Returns the current data file channel; null when closed. */
    @VisibleForTesting
    FileChannel getDataFileChannel() {
        synchronized (this.lock) {
            return this.dataFileChannel;
        }
    }

    /** Returns the current index file channel; null when closed. */
    @VisibleForTesting
    FileChannel getIndexFileChannel() {
        synchronized (this.lock) {
            return this.indexFileChannel;
        }
    }

    /** Returns the future completed once the scheduler is released and has no reader left. */
    @VisibleForTesting
    CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    /** Returns whether a run is currently scheduled or executing. */
    @VisibleForTesting
    boolean isRunning() {
        synchronized (this.lock) {
            return this.isRunning;
        }
    }
}
