package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.lang.Thread;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.class */
public class IOManagerAsync extends IOManager implements Thread.UncaughtExceptionHandler {
    private final WriterThread[] writers;
    private final ReaderThread[] readers;
    private final AtomicBoolean isShutdown;
    private final Thread shutdownHook;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync$ReaderThread.class */
    /**
     * Worker thread that takes {@link ReadRequest}s from its own request queue, executes them
     * and reports the outcome through {@code requestDone}.
     */
    public static final class ReaderThread extends Thread {
        /** Queue from which this thread takes the read requests it executes. */
        protected final RequestQueue<ReadRequest> requestQueue = new RequestQueue<>();

        /** Cleared by {@link #shutdown()}; the run loop stops once it observes {@code false}. */
        private volatile boolean alive = true;

        protected ReaderThread() {
        }

        /**
         * Stops this thread and fails every request that is still queued.
         *
         * <p>On the first call the thread is marked as no longer alive, the request queue is
         * closed and the thread is interrupted. Every call then waits up to one second for the
         * thread to terminate and afterwards completes all requests remaining in the queue with
         * an {@link IOException}.
         */
        protected void shutdown() {
            synchronized (this) {
                if (this.alive) {
                    this.alive = false;
                    this.requestQueue.close();
                    interrupt();
                }
                try {
                    join(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the wait is best effort, and the requests that are
                    // still queued must be failed below either way.
                }
                // Complete every request that was never picked up so its callback still fires.
                IOException iOException = new IOException("IO-Manager has been closed.");
                while (!this.requestQueue.isEmpty()) {
                    ReadRequest poll = this.requestQueue.poll();
                    if (poll != null) {
                        try {
                            poll.requestDone(iOException);
                        } catch (Throwable th) {
                            IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                        }
                    }
                }
            }
        }

        /**
         * Main loop: takes one request at a time, executes it and passes the result (or the
         * failure) to the request's {@code requestDone} callback, until the thread is shut down.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.alive) {
                ReadRequest readRequest = null;
                while (this.alive && readRequest == null) {
                    try {
                        readRequest = this.requestQueue.take();
                    } catch (InterruptedException e) {
                        if (!this.alive) {
                            return;
                        } else {
                            IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                        }
                    }
                }
                if (readRequest == null) {
                    // The inner loop ends without a request only when 'alive' was cleared
                    // concurrently. Without this guard the code below would dereference null
                    // and log two spurious errors during a regular shutdown.
                    return;
                }
                IOException iOException = null;
                try {
                    readRequest.read();
                } catch (IOException e2) {
                    iOException = e2;
                } catch (Throwable th) {
                    // Wrap anything unexpected so the callback always receives an IOException.
                    iOException = new IOException("The buffer could not be read: " + th.getMessage(), th);
                    IOManagerAsync.LOG.error("I/O reading thread encountered an error" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                }
                try {
                    // A null exception signals success to the callback.
                    readRequest.requestDone(iOException);
                } catch (Throwable th2) {
                    IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (th2.getMessage() == null ? "." : ": " + th2.getMessage()), th2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync$WriterThread.class */
    /**
     * Worker thread that takes {@link WriteRequest}s from its own request queue, executes them
     * and reports the outcome through {@code requestDone}.
     */
    public static final class WriterThread extends Thread {
        /** Queue from which this thread takes the write requests it executes. */
        protected final RequestQueue<WriteRequest> requestQueue = new RequestQueue<>();

        /** Cleared by {@link #shutdown()}; the run loop stops once it observes {@code false}. */
        private volatile boolean alive = true;

        protected WriterThread() {
        }

        /**
         * Stops this thread and fails every request that is still queued.
         *
         * <p>On the first call the thread is marked as no longer alive, the request queue is
         * closed and the thread is interrupted. Every call then waits up to one second for the
         * thread to terminate and afterwards completes all requests remaining in the queue with
         * an {@link IOException}.
         */
        protected void shutdown() {
            synchronized (this) {
                if (this.alive) {
                    this.alive = false;
                    this.requestQueue.close();
                    interrupt();
                }
                try {
                    join(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the wait is best effort, and the requests that are
                    // still queued must be failed below either way.
                }
                // Complete every request that was never picked up so its callback still fires.
                IOException iOException = new IOException("IO-Manager has been closed.");
                while (!this.requestQueue.isEmpty()) {
                    WriteRequest poll = this.requestQueue.poll();
                    if (poll != null) {
                        try {
                            poll.requestDone(iOException);
                        } catch (Throwable th) {
                            IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                        }
                    }
                }
            }
        }

        /**
         * Main loop: takes one request at a time, executes it and passes the result (or the
         * failure) to the request's {@code requestDone} callback, until the thread is shut down.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.alive) {
                WriteRequest writeRequest = null;
                while (this.alive && writeRequest == null) {
                    try {
                        writeRequest = this.requestQueue.take();
                    } catch (InterruptedException e) {
                        if (!this.alive) {
                            return;
                        } else {
                            IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                        }
                    }
                }
                if (writeRequest == null) {
                    // The inner loop ends without a request only when 'alive' was cleared
                    // concurrently. Without this guard the code below would dereference null
                    // and log two spurious errors during a regular shutdown.
                    return;
                }
                IOException iOException = null;
                try {
                    writeRequest.write();
                } catch (IOException e2) {
                    iOException = e2;
                } catch (Throwable th) {
                    // Wrap anything unexpected so the callback always receives an IOException.
                    iOException = new IOException("The buffer could not be written: " + th.getMessage(), th);
                    IOManagerAsync.LOG.error("I/O writing thread encountered an error" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                }
                try {
                    // A null exception signals success to the callback.
                    writeRequest.requestDone(iOException);
                } catch (Throwable th2) {
                    IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (th2.getMessage() == null ? "." : ": " + th2.getMessage()), th2);
                }
            }
        }
    }

    public IOManagerAsync() {
        this(EnvironmentInformation.getTemporaryFileDirectory());
    }

    public IOManagerAsync(String str) {
        this(new String[]{str});
    }

    public IOManagerAsync(String[] strArr) {
        super(strArr);
        this.isShutdown = new AtomicBoolean();
        this.writers = new WriterThread[strArr.length];
        for (int i = 0; i < this.writers.length; i++) {
            WriterThread writerThread = new WriterThread();
            this.writers[i] = writerThread;
            writerThread.setName("IOManager writer thread #" + (i + 1));
            writerThread.setDaemon(true);
            writerThread.setUncaughtExceptionHandler(this);
            writerThread.start();
        }
        this.readers = new ReaderThread[strArr.length];
        for (int i2 = 0; i2 < this.readers.length; i2++) {
            ReaderThread readerThread = new ReaderThread();
            this.readers[i2] = readerThread;
            readerThread.setName("IOManager reader thread #" + (i2 + 1));
            readerThread.setDaemon(true);
            readerThread.setUncaughtExceptionHandler(this);
            readerThread.start();
        }
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
    }

    /**
     * Shuts down all writer and reader threads, waits for them to terminate and finally
     * delegates to {@code super.shutdown()}. Only the first invocation has an effect.
     *
     * <p>This method may be invoked from one of the I/O threads themselves, because
     * {@link #uncaughtException(Thread, Throwable)} calls it on the dying thread. That thread
     * is therefore excluded from the final wait: a thread joining itself would never return
     * and {@code super.shutdown()} would never be reached.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
     * remaining shutdown steps still run, and the interrupt status is restored before this
     * method returns.
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public void shutdown() {
        // Only the first caller performs the shutdown; later calls return immediately.
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);

        boolean interrupted = false;
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Shutting down I/O manager.");
            }
            // Signal every thread first; a failure for one thread must not stop the others.
            for (WriterThread writerThread : this.writers) {
                try {
                    writerThread.shutdown();
                } catch (Throwable th) {
                    LOG.error("Error while shutting down IO Manager writer thread.", th);
                }
            }
            for (ReaderThread readerThread : this.readers) {
                try {
                    readerThread.shutdown();
                } catch (Throwable th2) {
                    LOG.error("Error while shutting down IO Manager reader thread.", th2);
                }
            }
            // Then wait for termination, skipping the calling thread to avoid a self-join.
            final Thread caller = Thread.currentThread();
            try {
                for (WriterThread writerThread2 : this.writers) {
                    if (writerThread2 != caller) {
                        writerThread2.join();
                    }
                }
                for (ReaderThread readerThread2 : this.readers) {
                    if (readerThread2 != caller) {
                        readerThread2.join();
                    }
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; it is restored once the shutdown has completed.
                interrupted = true;
            }
        } finally {
            try {
                super.shutdown();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Reports whether this I/O manager has been shut down completely.
     *
     * @return {@code true} only if every reader and writer thread is in state
     *         {@link Thread.State#TERMINATED}, the shutdown flag is set, and
     *         {@code super.isProperlyShutDown()} also returns {@code true}
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public boolean isProperlyShutDown() {
        for (ReaderThread reader : this.readers) {
            if (reader.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        for (WriterThread writer : this.writers) {
            if (writer.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        if (!this.isShutdown.get()) {
            return false;
        }
        return super.isProperlyShutDown();
    }

    /**
     * Handles an exception that escaped one of the I/O threads: logs it and shuts down the
     * whole I/O manager.
     *
     * @param thread the thread that terminated
     * @param th the exception that terminated it
     */
    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        final String threadName = thread.getName();
        final String message =
                "IO Thread '" + threadName + "' terminated due to an exception. Shutting down I/O Manager.";
        LOG.error(message, th);
        shutdown();
    }

    /** Message of the exception thrown when a channel is requested after shutdown. */
    private static final String SHUT_DOWN_MESSAGE = "I/O-Manager is shut down.";

    /**
     * Verifies that this I/O manager has not been shut down yet.
     *
     * @throws IllegalStateException if {@link #shutdown()} has already been called
     *         (assuming {@code Preconditions.checkState} throws that type; confirm)
     */
    private void ensureNotShutDown() {
        Preconditions.checkState(!this.isShutdown.get(), SHUT_DOWN_MESSAGE);
    }

    /**
     * Creates a block writer bound to the request queue of the writer thread selected by
     * {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the writer thread
     * @param linkedBlockingQueue queue handed to the created writer
     * @return the new asynchronous block writer
     * @throws IOException declared by the overridden method and the writer's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID id, LinkedBlockingQueue<MemorySegment> linkedBlockingQueue) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBlockWriter(id, this.writers[id.getThreadNum()].requestQueue, linkedBlockingQueue);
    }

    /**
     * Creates a block writer that reports completed requests to a callback, bound to the
     * request queue of the writer thread selected by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the writer thread
     * @param requestDoneCallback callback handed to the created writer
     * @return the new asynchronous block writer with callback
     * @throws IOException declared by the overridden method and the writer's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID id, RequestDoneCallback<MemorySegment> requestDoneCallback) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBlockWriterWithCallback(id, this.writers[id.getThreadNum()].requestQueue, requestDoneCallback);
    }

    /**
     * Creates a block reader bound to the request queue of the reader thread selected by
     * {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the reader thread
     * @param linkedBlockingQueue queue handed to the created reader
     * @return the new asynchronous block reader
     * @throws IOException declared by the overridden method and the reader's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID id, LinkedBlockingQueue<MemorySegment> linkedBlockingQueue) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBlockReader(id, this.readers[id.getThreadNum()].requestQueue, linkedBlockingQueue);
    }

    /**
     * Creates a buffer file writer bound to the request queue of the writer thread selected
     * by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the writer thread
     * @return the new asynchronous buffer file writer
     * @throws IOException declared by the overridden method and the writer's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BufferFileWriter createBufferFileWriter(FileIOChannel.ID id) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBufferFileWriter(id, this.writers[id.getThreadNum()].requestQueue);
    }

    /**
     * Creates a buffer file reader bound to the request queue of the reader thread selected
     * by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the reader thread
     * @param requestDoneCallback callback handed to the created reader
     * @return the new asynchronous buffer file reader
     * @throws IOException declared by the overridden method and the reader's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BufferFileReader createBufferFileReader(FileIOChannel.ID id, RequestDoneCallback<Buffer> requestDoneCallback) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBufferFileReader(id, this.readers[id.getThreadNum()].requestQueue, requestDoneCallback);
    }

    /**
     * Creates a buffer file segment reader bound to the request queue of the reader thread
     * selected by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the reader thread
     * @param requestDoneCallback callback handed to the created reader
     * @return the new asynchronous buffer file segment reader
     * @throws IOException declared by the overridden method and the reader's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID id, RequestDoneCallback<FileSegment> requestDoneCallback) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBufferFileSegmentReader(id, this.readers[id.getThreadNum()].requestQueue, requestDoneCallback);
    }

    /**
     * Creates a bulk block reader bound to the request queue of the reader thread selected
     * by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number selects the reader thread
     * @param list memory segments handed to the created reader
     * @param i numeric argument handed to the created reader
     *          (presumably the number of blocks to read; confirm against
     *          {@code AsynchronousBulkBlockReader})
     * @return the new asynchronous bulk block reader
     * @throws IOException declared by the overridden method and the reader's constructor
     */
    @Override // org.apache.flink.runtime.io.disk.iomanager.IOManager
    public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID id, List<MemorySegment> list, int i) throws IOException {
        ensureNotShutDown();
        return new AsynchronousBulkBlockReader(id, this.readers[id.getThreadNum()].requestQueue, list, i);
    }

    /**
     * Returns the request queue of the reader thread selected by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number is used as index into the reader threads
     * @return that reader thread's request queue
     */
    RequestQueue<ReadRequest> getReadRequestQueue(FileIOChannel.ID id) {
        final ReaderThread reader = this.readers[id.getThreadNum()];
        return reader.requestQueue;
    }

    /**
     * Returns the request queue of the writer thread selected by {@code id.getThreadNum()}.
     *
     * @param id the channel ID; its thread number is used as index into the writer threads
     * @return that writer thread's request queue
     */
    RequestQueue<WriteRequest> getWriteRequestQueue(FileIOChannel.ID id) {
        final WriterThread writer = this.writers[id.getThreadNum()];
        return writer.requestQueue;
    }
}
