/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManager;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbfsInputStream
extends FSInputStream
implements CanUnbuffer,
StreamCapabilities {
    private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
    private final AbfsInputStreamStatisticsImpl streamStatistics;
    private final AbfsClient client;
    private final FileSystem.Statistics statistics;
    private final String path;
    private final long contentLength;
    private final int bufferSize;
    private final int readAheadQueueDepth;
    private final String eTag;
    private final boolean tolerateOobAppends;
    private final boolean readAheadEnabled;
    private final int readAheadRange;
    private byte[] buffer = null;
    private long fCursor = 0L;
    private long fCursorAfterLastRead = -1L;
    private int bCursor = 0;
    private int limit = 0;
    private boolean closed = false;
    private long nextReadPos;

    public AbfsInputStream(AbfsClient client, FileSystem.Statistics statistics, String path, long contentLength, int bufferSize, int readAheadQueueDepth, boolean tolerateOobAppends, String eTag, boolean isReadAheadEnabled, int readAheadRange) {
        this.client = client;
        this.statistics = statistics;
        this.path = path;
        this.contentLength = contentLength;
        this.bufferSize = bufferSize;
        this.readAheadQueueDepth = readAheadQueueDepth >= 0 ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
        this.tolerateOobAppends = tolerateOobAppends;
        this.eTag = eTag;
        this.readAheadEnabled = isReadAheadEnabled;
        Preconditions.checkArgument((readAheadRange > 0 ? 1 : 0) != 0, (Object)"Read ahead range should be greater than 0");
        this.readAheadRange = readAheadRange;
        this.streamStatistics = new AbfsInputStreamStatisticsImpl();
    }

    public String getPath() {
        return this.path;
    }

    public int read() throws IOException {
        byte[] b = new byte[1];
        int numberOfBytesRead = this.read(b, 0, 1);
        if (numberOfBytesRead < 0) {
            return -1;
        }
        return b[0] & 0xFF;
    }

    public synchronized int read(byte[] b, int off, int len) throws IOException {
        int lastReadBytes;
        if (b != null) {
            LOG.debug("read requested b.length = {} offset = {} len = {}", new Object[]{b.length, off, len});
        } else {
            LOG.debug("read requested b = null offset = {} len = {}", (Object)off, (Object)len);
        }
        int currentOff = off;
        int currentLen = len;
        int totalReadBytes = 0;
        this.streamStatistics.readOperationStarted(off, len);
        do {
            if (this.nextReadPos >= this.fCursor - (long)this.limit && this.nextReadPos <= this.fCursor) {
                this.bCursor = (int)(this.nextReadPos - (this.fCursor - (long)this.limit));
                if (this.bCursor != this.limit) {
                    this.streamStatistics.seekInBuffer();
                }
            } else {
                this.fCursor = this.nextReadPos;
                this.limit = 0;
                this.bCursor = 0;
            }
            if ((lastReadBytes = this.readOneBlock(b, currentOff, currentLen)) <= 0) continue;
            currentOff += lastReadBytes;
            currentLen -= lastReadBytes;
            totalReadBytes += lastReadBytes;
        } while (currentLen > 0 && currentLen <= b.length - currentOff && lastReadBytes > 0);
        return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
    }

    private int readOneBlock(byte[] b, int off, int len) throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        Preconditions.checkNotNull((Object)b);
        LOG.debug("read one block requested b.length = {} off {} len {}", new Object[]{b.length, off, len});
        if (len == 0) {
            return 0;
        }
        if (this.available() == 0) {
            return -1;
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (this.bCursor == this.limit) {
            if (this.fCursor >= this.contentLength) {
                return -1;
            }
            long bytesRead = 0L;
            this.bCursor = 0;
            this.limit = 0;
            if (this.buffer == null) {
                LOG.debug("created new buffer size {}", (Object)this.bufferSize);
                this.buffer = new byte[this.bufferSize];
            }
            if (-1L == this.fCursorAfterLastRead || this.fCursorAfterLastRead == this.fCursor || b.length >= this.bufferSize) {
                LOG.debug("Sequential read with read ahead size of {}", (Object)this.bufferSize);
                bytesRead = this.readInternal(this.fCursor, this.buffer, 0, this.bufferSize, false);
            } else {
                int lengthWithReadAhead = Math.min(b.length + this.readAheadRange, this.bufferSize);
                LOG.debug("Random read with read ahead size of {}", (Object)lengthWithReadAhead);
                bytesRead = this.readInternal(this.fCursor, this.buffer, 0, lengthWithReadAhead, true);
            }
            if (bytesRead == -1L) {
                return -1;
            }
            this.limit = (int)((long)this.limit + bytesRead);
            this.fCursor += bytesRead;
            this.fCursorAfterLastRead = this.fCursor;
        }
        int bytesRemaining = this.limit - this.bCursor;
        int bytesToRead = Math.min(len, bytesRemaining);
        System.arraycopy(this.buffer, this.bCursor, b, off, bytesToRead);
        this.bCursor += bytesToRead;
        this.nextReadPos += (long)bytesToRead;
        if (this.statistics != null) {
            this.statistics.incrementBytesRead((long)bytesToRead);
        }
        this.streamStatistics.bytesRead(bytesToRead);
        return bytesToRead;
    }

    private int readInternal(long position, byte[] b, int offset, int length, boolean bypassReadAhead) throws IOException {
        if (this.readAheadEnabled && !bypassReadAhead) {
            long nextSize;
            if (offset != 0) {
                throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
            }
            int numReadAheads = this.readAheadQueueDepth;
            LOG.debug("read ahead enabled issuing readheads num = {}", (Object)numReadAheads);
            for (long nextOffset = position; numReadAheads > 0 && nextOffset < this.contentLength; nextOffset += nextSize, --numReadAheads) {
                nextSize = Math.min((long)this.bufferSize, this.contentLength - nextOffset);
                LOG.debug("issuing read ahead requestedOffset = {} requested size {}", (Object)nextOffset, (Object)nextSize);
                ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int)nextSize);
            }
            int receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
            if (receivedBytes > 0) {
                LOG.debug("Received data from read ahead, not doing remote read");
                this.streamStatistics.bytesReadFromBuffer(receivedBytes);
                return receivedBytes;
            }
            receivedBytes = this.readRemote(position, b, offset, length);
            return receivedBytes;
        }
        LOG.debug("read ahead disabled, reading remote");
        return this.readRemote(position, b, offset, length);
    }

    int readRemote(long position, byte[] b, int offset, int length) throws IOException {
        AbfsRestOperation op;
        if (position < 0L) {
            throw new IllegalArgumentException("attempting to read from negative offset");
        }
        if (position >= this.contentLength) {
            return -1;
        }
        if (b == null) {
            throw new IllegalArgumentException("null byte array passed in to read() method");
        }
        if (offset >= b.length) {
            throw new IllegalArgumentException("offset greater than length of array");
        }
        if (length < 0) {
            throw new IllegalArgumentException("requested read length is less than zero");
        }
        if (length > b.length - offset) {
            throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
        }
        AbfsPerfTracker tracker = this.client.getAbfsPerfTracker();
        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read");){
            this.streamStatistics.remoteReadOperation();
            LOG.debug("issuing HTTP GET request params position = {} b.length = {} offset = {} length = {}", new Object[]{position, b.length, offset, length});
            op = this.client.read(this.path, position, b, offset, length, this.tolerateOobAppends ? "*" : this.eTag);
            perfInfo.registerResult(op.getResult()).registerSuccess(true);
        }
        catch (AzureBlobFileSystemException ex) {
            AbfsRestOperationException ere;
            if (ex instanceof AbfsRestOperationException && (ere = (AbfsRestOperationException)ex).getStatusCode() == 404) {
                throw new FileNotFoundException(ere.getMessage());
            }
            throw new IOException(ex);
        }
        long bytesRead = op.getResult().getBytesReceived();
        if (bytesRead > Integer.MAX_VALUE) {
            throw new IOException("Unexpected Content-Length");
        }
        LOG.debug("HTTP request read bytes = {}", (Object)bytesRead);
        return (int)bytesRead;
    }

    public synchronized void seek(long n) throws IOException {
        LOG.debug("requested seek to position {}", (Object)n);
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        if (n < 0L) {
            throw new EOFException("Cannot seek to a negative offset");
        }
        if (n > this.contentLength) {
            throw new EOFException("Attempted to seek or read past the end of the file");
        }
        this.streamStatistics.seek(n, this.fCursor);
        this.nextReadPos = n;
        LOG.debug("set nextReadPos to {}", (Object)this.nextReadPos);
    }

    public synchronized long skip(long n) throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        long currentPos = this.getPos();
        if (currentPos == this.contentLength && n > 0L) {
            throw new EOFException("Attempted to seek or read past the end of the file");
        }
        long newPos = currentPos + n;
        if (newPos < 0L) {
            newPos = 0L;
            n = newPos - currentPos;
        }
        if (newPos > this.contentLength) {
            newPos = this.contentLength;
            n = newPos - currentPos;
        }
        this.seek(newPos);
        return n;
    }

    public synchronized int available() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        long remaining = this.contentLength - this.getPos();
        return remaining <= Integer.MAX_VALUE ? (int)remaining : Integer.MAX_VALUE;
    }

    public long length() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        return this.contentLength;
    }

    public synchronized long getPos() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        return this.nextReadPos < 0L ? 0L : this.nextReadPos;
    }

    public boolean seekToNewSource(long l) throws IOException {
        return false;
    }

    public synchronized void close() throws IOException {
        this.closed = true;
        this.buffer = null;
        LOG.debug("Closing {}", (Object)this);
    }

    public synchronized void mark(int readlimit) {
        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
    }

    public synchronized void reset() throws IOException {
        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
    }

    public boolean markSupported() {
        return false;
    }

    @VisibleForTesting
    public boolean isReadAheadEnabled() {
        return this.readAheadEnabled;
    }

    @VisibleForTesting
    public int getReadAheadRange() {
        return this.readAheadRange;
    }

    @VisibleForTesting
    public AbfsInputStreamStatisticsImpl getStreamStatistics() {
        return this.streamStatistics;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder(super.toString());
        sb.append("AbfsInputStream@(").append(((Object)((Object)this)).hashCode()).append("){");
        sb.append("ReadAheadEnabled=").append(this.readAheadEnabled).append(", ");
        sb.append(this.streamStatistics.toString());
        sb.append("}");
        return sb.toString();
    }

    public synchronized void unbuffer() {
        this.buffer = null;
        this.fCursor = this.nextReadPos > 0L ? this.nextReadPos : 0L;
        this.fCursorAfterLastRead = -1L;
        this.bCursor = 0;
        this.limit = 0;
    }

    public boolean hasCapability(String capability) {
        return "in:unbuffer".equals(StringUtils.toLowerCase((String)capability));
    }

    byte[] getBuffer() {
        return this.buffer;
    }
}

