package tachyon.client;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.client.RemoteBlockReader;
import tachyon.conf.TachyonConf;
import tachyon.thrift.ClientBlockInfo;
import tachyon.thrift.NetAddress;
import tachyon.underfs.UnderFileSystem;
import tachyon.util.NetworkUtils;

/* loaded from: input_file:tachyon/client/RemoteBlockInStream.class */
public class RemoteBlockInStream extends BlockInStream {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
    private ClientBlockInfo mBlockInfo;
    private InputStream mCheckpointInputStream;
    private long mCheckpointPos;
    private long mBlockPos;
    private ByteBuffer mCurrentBuffer;
    private long mBufferStartPos;
    private boolean mRecache;
    private boolean mAttemptReadFromWorkers;
    private BlockOutStream mBlockOutStream;
    private Object mUFSConf;
    private long mBytesReadRemote;
    private static final int MAX_REMOTE_READ_ATTEMPTS = 2;

    RemoteBlockInStream(TachyonFile tachyonFile, ReadType readType, int i, TachyonConf tachyonConf) throws IOException {
        this(tachyonFile, readType, i, null, tachyonConf);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteBlockInStream(TachyonFile tachyonFile, ReadType readType, int i, Object obj, TachyonConf tachyonConf) throws IOException {
        super(tachyonFile, readType, i, tachyonConf);
        this.mCheckpointInputStream = null;
        this.mCheckpointPos = -1L;
        this.mBlockPos = 0L;
        this.mCurrentBuffer = null;
        this.mAttemptReadFromWorkers = true;
        this.mBlockOutStream = null;
        this.mUFSConf = null;
        this.mBytesReadRemote = 0L;
        if (!this.mFile.isComplete()) {
            throw new IOException("File " + this.mFile.getPath() + " is not ready to read");
        }
        this.mBlockInfo = this.mFile.getClientBlockInfo(this.mBlockIndex);
        this.mRecache = readType.isCache();
        this.mUFSConf = obj;
    }

    private void cancelRecache() throws IOException {
        if (this.mRecache) {
            this.mRecache = false;
            if (this.mBlockOutStream != null) {
                this.mBlockOutStream.cancel();
            }
        }
    }

    @Override // tachyon.client.InStream, java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        if (this.mRecache && this.mBlockOutStream != null) {
            if (this.mBlockPos == this.mBlockInfo.length) {
                this.mBlockOutStream.close();
            } else {
                this.mBlockOutStream.cancel();
            }
        }
        if (this.mCheckpointInputStream != null) {
            this.mCheckpointInputStream.close();
        }
        if (this.mBytesReadRemote > 0) {
            this.mTachyonFS.getClientMetrics().incBlocksReadRemote(1L);
        }
        this.mClosed = true;
    }

    @Override // tachyon.client.InStream, java.io.InputStream
    public int read() throws IOException {
        byte[] bArr = new byte[1];
        if (read(bArr) == -1) {
            return -1;
        }
        return bArr[0] & 255;
    }

    @Override // tachyon.client.InStream, java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        return read(bArr, 0, bArr.length);
    }

    @Override // tachyon.client.InStream, java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        if (bArr == null) {
            throw new NullPointerException();
        }
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return 0;
        }
        if (this.mBlockPos == this.mBlockInfo.length) {
            return -1;
        }
        int min = (int) Math.min(i2, this.mBlockInfo.length - this.mBlockPos);
        int i3 = min;
        if (i3 > 0 && this.mBlockOutStream == null && this.mRecache) {
            try {
                this.mBlockOutStream = BlockOutStream.get(this.mFile, WriteType.TRY_CACHE, this.mBlockIndex, this.mTachyonConf);
            } catch (IOException e) {
                LOG.warn("Recache attempt failed.", e);
                cancelRecache();
            }
        }
        while (i3 > 0 && this.mAttemptReadFromWorkers && updateCurrentBuffer()) {
            int min2 = Math.min(i3, this.mCurrentBuffer.remaining());
            this.mCurrentBuffer.get(bArr, i, min2);
            if (this.mRecache) {
                this.mBlockOutStream.write(bArr, i, min2);
            }
            i += min2;
            i3 -= min2;
            this.mBlockPos += min2;
        }
        this.mBytesReadRemote += min - i3;
        this.mTachyonFS.getClientMetrics().incBytesReadRemote(min - i3);
        if (i3 > 0) {
            this.mAttemptReadFromWorkers = false;
            if (!setupStreamFromUnderFs()) {
                LOG.error("Failed to read at position " + this.mBlockPos + " in block " + this.mBlockInfo.getBlockId() + " from workers or underfs");
                return min - i3;
            }
            while (i3 > 0) {
                int read = this.mCheckpointInputStream.read(bArr, i, i3);
                if (read <= 0) {
                    LOG.error("Checkpoint stream read 0 bytes, which shouldn't ever happen");
                    return min - i3;
                }
                if (this.mRecache) {
                    this.mBlockOutStream.write(bArr, i, read);
                }
                i += read;
                i3 -= read;
                this.mBlockPos += read;
                this.mCheckpointPos += read;
                this.mTachyonFS.getClientMetrics().incBytesReadUfs(read);
            }
        }
        return min;
    }

    public static ByteBuffer readRemoteByteBuffer(TachyonFS tachyonFS, ClientBlockInfo clientBlockInfo, long j, long j2, TachyonConf tachyonConf) {
        ByteBuffer byteBuffer = null;
        try {
            List<NetAddress> locations = clientBlockInfo.getLocations();
            LOG.info("Block locations:" + locations);
            String localHostName = NetworkUtils.getLocalHostName(tachyonConf);
            for (NetAddress netAddress : locations) {
                String str = netAddress.mHost;
                int i = netAddress.mSecondaryPort;
                if (netAddress.mPort != -1) {
                    if (str.equals(InetAddress.getLocalHost().getHostName()) || str.equals(InetAddress.getLocalHost().getHostAddress()) || str.equals(localHostName)) {
                        LOG.warn("Master thinks the local machine has data, but not!(or local read is disabled) blockId:{}", Long.valueOf(clientBlockInfo.blockId));
                    }
                    LOG.info(str + ":" + i + " current host is " + localHostName + " " + NetworkUtils.getLocalIpAddress(tachyonConf));
                    try {
                        byteBuffer = retrieveByteBufferFromRemoteMachine(new InetSocketAddress(str, i), clientBlockInfo.blockId, j, j2, tachyonConf);
                    } catch (IOException e) {
                        LOG.error("Fail to retrieve byte buffer for block " + clientBlockInfo.blockId + " from remote " + str + ":" + i + " with offset " + j + " and length " + j2, e);
                        byteBuffer = null;
                    }
                    if (byteBuffer != null) {
                        break;
                    }
                }
            }
        } catch (IOException e2) {
            LOG.error("Failed to get read data from remote ", e2);
            byteBuffer = null;
        }
        return byteBuffer;
    }

    private static ByteBuffer retrieveByteBufferFromRemoteMachine(InetSocketAddress inetSocketAddress, long j, long j2, long j3, TachyonConf tachyonConf) throws IOException {
        return RemoteBlockReader.Factory.createRemoteBlockReader(tachyonConf).readRemoteBlock(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), j, j2, j3);
    }

    @Override // tachyon.client.InStream
    public void seek(long j) throws IOException {
        if (j < 0) {
            throw new IOException("Seek position is negative: " + j);
        }
        if (j > this.mBlockInfo.length) {
            throw new IOException("Seek position is past block size: " + j + ", Block Size = " + this.mBlockInfo.length);
        }
        if (j == this.mBlockPos) {
            return;
        }
        cancelRecache();
        this.mBlockPos = j;
    }

    private boolean setupStreamFromUnderFs() throws IOException {
        if (this.mCheckpointInputStream == null || this.mBlockPos < this.mCheckpointPos) {
            String ufsPath = this.mFile.getUfsPath();
            LOG.info("Opening stream from underlayer fs: " + ufsPath);
            if (ufsPath.equals("")) {
                return false;
            }
            this.mCheckpointInputStream = UnderFileSystem.get(ufsPath, this.mUFSConf, this.mTachyonConf).open(ufsPath);
            if (this.mCheckpointInputStream.skip(this.mBlockInfo.offset) != this.mBlockInfo.offset) {
                throw new IOException("Failed to skip to the block offset " + this.mBlockInfo.offset + " in the checkpoint file");
            }
            this.mCheckpointPos = 0L;
        }
        while (this.mCheckpointPos < this.mBlockPos) {
            long skip = this.mCheckpointInputStream.skip(this.mBlockPos - this.mCheckpointPos);
            if (skip <= 0) {
                throw new IOException("Failed to skip to the position " + this.mBlockPos + " for block " + this.mBlockInfo);
            }
            this.mCheckpointPos += skip;
        }
        return true;
    }

    @Override // tachyon.client.InStream, java.io.InputStream
    public long skip(long j) throws IOException {
        if (j <= 0) {
            return 0L;
        }
        cancelRecache();
        long min = Math.min(j, this.mBlockInfo.length - this.mBlockPos);
        this.mBlockPos += min;
        return min;
    }

    private boolean updateCurrentBuffer() throws IOException {
        long bytes = this.mTachyonConf.getBytes("tachyon.user.remote.read.buffer.size.byte", 8388608L);
        if (this.mCurrentBuffer != null && this.mBufferStartPos <= this.mBlockPos && this.mBlockPos < Math.min(this.mBufferStartPos + bytes, this.mBlockInfo.length)) {
            this.mCurrentBuffer.position((int) (this.mBlockPos - this.mBufferStartPos));
            return true;
        }
        this.mBufferStartPos = this.mBlockPos;
        long min = Math.min(bytes, this.mBlockInfo.length - this.mBufferStartPos);
        LOG.info(String.format("Try to find remote worker and read block %d from %d, with len %d", Long.valueOf(this.mBlockInfo.blockId), Long.valueOf(this.mBufferStartPos), Long.valueOf(min)));
        for (int i = 0; i < MAX_REMOTE_READ_ATTEMPTS; i++) {
            this.mCurrentBuffer = readRemoteByteBuffer(this.mTachyonFS, this.mBlockInfo, this.mBufferStartPos, min, this.mTachyonConf);
            if (this.mCurrentBuffer != null) {
                return true;
            }
            this.mBlockInfo = this.mFile.getClientBlockInfo(this.mBlockIndex);
        }
        return false;
    }
}
