package org.apache.hadoop.fs.s3a.scale;

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.TestS3AInputPolicies;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:test-classes/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.class */
public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(ITestS3AInputStreamPerformance.class);

    // Named byte-count constants.
    public static final int _4K = 4 * 1024;
    public static final int _8K = 8 * 1024;
    public static final int _16K = 16 * 1024;
    public static final int _32K = 32 * 1024;
    public static final int _64K = 64 * 1024;
    public static final int _128K = 128 * 1024;
    public static final int _256K = 256 * 1024;
    public static final int _1MB = 1024 * 1024;
    public static final int _2MB = 2 * _1MB;
    public static final int _5MB = 5 * _1MB;
    public static final int _10MB = 10 * _1MB;

    public static final int BLOCK_SIZE = _32K;
    public static final int BIG_BLOCK_SIZE = _256K;

    /**
     * Sequence of positioned reads issued by {@code executeRandomIO}:
     * each entry is {@code {position, length}}.
     */
    private static final int[][] RANDOM_IO_SEQUENCE = {
        {_2MB, _128K},
        {_128K, _128K},
        {_5MB, _64K},
        {_1MB, _1MB},
    };

    /** Filesystem instance created for the test data path; closed in {@code cleanup()}. */
    private S3AFileSystem s3aFS;

    /** Path of the test data file, taken from the test configuration. */
    private Path testData;

    /** Status of {@link #testData}, fetched during setup. */
    private S3AFileStatus testDataStatus;

    /** Stream under test; closed in {@code cleanup()}. */
    private FSDataInputStream in;

    /** Statistics of the stream most recently opened through {@code openDataFile}. */
    private S3AInstrumentation.InputStreamStatistics streamStatistics;

    /** Cleared during setup when the test file property is empty. */
    private boolean testDataAvailable = true;

    /** Message handed to the assumption check made by {@code requireCSVTestData()}. */
    private String assumptionMessage = "test file";

    /**
     * Test setup: shrinks the socket buffers in the configuration, resolves
     * the test data path and, when one is configured, binds a filesystem to
     * it and fetches the file status.
     *
     * <p>An empty path property does not fail the setup; it only records that
     * the test data is unavailable together with the reason.
     *
     * @throws IOException if the status of the configured file cannot be read
     */
    @Before
    public void openFS() throws IOException {
        final Configuration conf = getConf();
        conf.setInt(Constants.SOCKET_SEND_BUFFER, 16384);
        conf.setInt(Constants.SOCKET_RECV_BUFFER, 16384);
        final String csvPath = conf.getTrimmed(S3ATestConstants.KEY_CSVTEST_FILE, S3ATestConstants.DEFAULT_CSVTEST_FILE);
        if (!csvPath.isEmpty()) {
            testData = new Path(csvPath);
            LOG.info("Using {} as input stream source", testData);
            bindS3aFS(testData);
            try {
                testDataStatus = s3aFS.m44getFileStatus(testData);
            } catch (IOException e) {
                // log which property pointed at the unreadable file, then propagate
                LOG.warn("Failed to read file {} specified in {}", new Object[]{csvPath, S3ATestConstants.KEY_CSVTEST_FILE, e});
                throw e;
            }
        } else {
            assumptionMessage = "Empty test property: fs.s3a.scale.test.csvfile";
            LOG.warn(assumptionMessage);
            testDataAvailable = false;
        }
    }

    /**
     * Creates a new filesystem instance for the URI of the given path, using
     * the test configuration, and stores it as the filesystem under test.
     *
     * @param path path whose URI selects the filesystem
     * @throws IOException if the filesystem cannot be created
     */
    private void bindS3aFS(Path path) throws IOException {
        final FileSystem fs = FileSystem.newInstance(path.toUri(), getConf());
        s3aFS = (S3AFileSystem) fs;
    }

    /**
     * Test teardown: closes the stream under test and then the filesystem
     * instance created during setup.
     */
    @After
    public void cleanup() {
        describe("cleanup");
        IOUtils.closeStream(in);
        IOUtils.closeStream(s3aFS);
    }

    /**
     * Applies the test-data assumption: passes the availability flag and the
     * message recorded during setup to {@code S3ATestUtils.assume}.
     */
    private void requireCSVTestData() {
        final String message = assumptionMessage;
        final boolean available = testDataAvailable;
        S3ATestUtils.assume(message, available);
    }

    /**
     * Opens the test file with the {@code Normal} input policy and a
     * readahead of zero.
     *
     * @return the opened stream
     * @throws IOException if the open fails
     */
    FSDataInputStream openTestFile() throws IOException {
        final long noReadahead = 0L;
        return openTestFile(S3AInputPolicy.Normal, noReadahead);
    }

    /**
     * Opens the test file on the test filesystem, after checking the
     * test-data assumption.
     *
     * @param s3AInputPolicy input policy to open the stream with
     * @param j readahead to set on the stream; only applied when non-negative
     * @return the opened stream
     * @throws IOException if the open fails
     */
    FSDataInputStream openTestFile(S3AInputPolicy s3AInputPolicy, long j) throws IOException {
        requireCSVTestData();
        final S3AFileSystem fs = s3aFS;
        final Path source = testData;
        return openDataFile(fs, source, s3AInputPolicy, j);
    }

    /**
     * Opens a file with a specific input policy and readahead, and records the
     * statistics of the new stream in {@code streamStatistics}.
     *
     * <p>The filesystem's input policy is switched only for the duration of
     * the open: the previous policy is restored whether or not the open
     * succeeds.
     *
     * @param s3AFileSystem filesystem to open the file on
     * @param path file to open
     * @param s3AInputPolicy input policy in force while the stream is opened
     * @param j readahead to set on the stream; skipped when negative
     * @return the opened stream
     * @throws IOException if the open fails
     */
    private FSDataInputStream openDataFile(S3AFileSystem s3AFileSystem, Path path, S3AInputPolicy s3AInputPolicy, long j) throws IOException {
        final int bufferSize = getConf().getInt(S3ATestConstants.KEY_READ_BUFFER_SIZE, 16384);
        final S3AInputPolicy previousPolicy = s3AFileSystem.getInputPolicy();
        s3AFileSystem.setInputPolicy(s3AInputPolicy);
        try {
            final FSDataInputStream stream = s3AFileSystem.open(path, bufferSize);
            if (j >= 0) {
                stream.setReadahead(j);
            }
            streamStatistics = getInputStreamStatistics(stream);
            return stream;
        } finally {
            s3AFileSystem.setInputPolicy(previousPolicy);
        }
    }

    /**
     * Asserts that the recorded stream statistics report exactly one open
     * operation.
     */
    protected void assertStreamOpenedExactlyOnce() {
        final long expectedOpens = 1L;
        assertOpenOperationCount(expectedOpens);
    }

    /**
     * Asserts that the recorded stream statistics report the expected number
     * of open operations; the current stream is included in the failure text.
     *
     * @param j expected number of open operations
     */
    private void assertOpenOperationCount(long j) {
        final String message = "open operations in\n" + in;
        assertEquals(message, j, streamStatistics.openOperations);
    }

    /**
     * Logs the timer's duration divided by an operation count.
     *
     * @param str name of the operation, used in the log line
     * @param nanoTimer timer whose duration is reported
     * @param j number of operations; a value of zero makes the integer
     *          division throw {@link ArithmeticException}
     */
    protected void logTimePerIOP(String str, ContractTestUtils.NanoTimer nanoTimer, long j) {
        final long durationPerOperation = nanoTimer.duration() / j;
        LOG.info("Time per {}: {} nS", str, ContractTestUtils.toHuman(durationPerOperation));
    }

    /**
     * Reads the test file sequentially in 1 MB blocks, logging the bandwidth
     * of every block and of the whole read.
     *
     * <p>Only whole blocks are read; the trailing partial block is ignored.
     * When a block's bandwidth falls below the minimum the connection is
     * reset, and the test fails once too many resets have been needed.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
        requireCSVTestData();
        final int blockSize = 1048576;
        describe("Open the test file %s and read it in blocks of size %d", this.testData, blockSize);
        long len = this.testDataStatus.getLen();
        this.in = openTestFile();
        byte[] block = new byte[blockSize];
        ContractTestUtils.NanoTimer overallTimer = new ContractTestUtils.NanoTimer();
        // integer division rounds down: only complete blocks are read
        long blockCount = len / blockSize;
        long totalToRead = blockCount * blockSize;
        final int maxResetCount = 4;
        int resetCount = 0;
        for (long blockIndex = 0; blockIndex < blockCount; blockIndex++) {
            int offset = 0;
            int remaining = blockSize;
            long blockId = blockIndex + 1;
            ContractTestUtils.NanoTimer blockTimer = new ContractTestUtils.NanoTimer();
            int reads = 0;
            while (remaining > 0) {
                ContractTestUtils.NanoTimer readTimer = new ContractTestUtils.NanoTimer();
                int bytesRead = this.in.read(block, offset, remaining);
                reads++;
                if (bytesRead < 0) {
                    // end of stream: stop filling this block rather than
                    // applying the -1 marker to the offset/remaining counters
                    break;
                }
                remaining -= bytesRead;
                offset += bytesRead;
                readTimer.end();
                if (bytesRead != 0) {
                    LOG.debug("Bytes in read #{}: {} , block bytes: {}, remaining in block: {} duration={} nS; ns/byte: {}, bandwidth={} MB/s", new Object[]{reads, bytesRead, blockSize - remaining, remaining, readTimer.duration(), readTimer.nanosPerOperation(bytesRead), readTimer.bandwidthDescription(bytesRead)});
                } else {
                    LOG.warn("0 bytes returned by read() operation #{}", reads);
                }
            }
            blockTimer.end("Reading block %d in %d reads", new Object[]{blockId, reads});
            String bandwidthDescription = blockTimer.bandwidthDescription(blockSize);
            LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bandwidthDescription);
            if (bandwidth(blockTimer, blockSize) < TestS3AInputPolicies._128K) {
                LOG.warn("Bandwidth {} too low on block {}: resetting connection", bandwidthDescription, blockId);
                Assert.assertTrue("Bandwidth of " + bandwidthDescription + " too low after  " + resetCount + " attempts", resetCount <= maxResetCount);
                resetCount++;
                getS3AInputStream(this.in).resetConnection();
            }
        }
        overallTimer.end("Time to read %d bytes in %d blocks", new Object[]{totalToRead, blockCount});
        LOG.info("Overall Bandwidth {} MB/s; reset connections {}", overallTimer.bandwidth(totalToRead), resetCount);
        logStreamStatistics();
    }

    /**
     * Computes {@code j * 1e9 / duration} for the given timer.
     * NOTE(review): presumably bytes per second, with the timer duration in
     * nanoseconds — confirm against {@code NanoTimer}.
     *
     * @param nanoTimer timer supplying the duration
     * @param j amount to scale (callers in this class pass byte counts)
     * @return the scaled amount divided by the timer duration
     */
    public static double bandwidth(ContractTestUtils.NanoTimer nanoTimer, long j) {
        final double scaled = j * 1.0E9d;
        return scaled / nanoTimer.duration();
    }

    /**
     * Seeks forward through the test file one block at a time, seeks back to
     * the start, and asserts that the stream statistics still report zero
     * open operations and zero bytes read.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testLazySeekEnabled() throws Throwable {
        describe("Verify that seeks do not trigger any IO");
        in = openTestFile();
        final long fileLength = testDataStatus.getLen();
        final ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
        final long blockCount = fileLength / 32768;
        for (long block = 0; block < blockCount; block++) {
            in.seek(in.getPos() + 32768 - 1);
        }
        in.seek(0L);
        // the final seek back to the start counts as one more operation
        final long seekCount = blockCount + 1;
        timer.end("Time to execute %d seeks", new Object[]{Long.valueOf(seekCount)});
        logTimePerIOP("seek()", timer, seekCount);
        logStreamStatistics();
        assertOpenOperationCount(0L);
        assertEquals("bytes read", 0L, streamStatistics.bytesRead);
    }

    /**
     * Verifies that setting a negative readahead on the stream is rejected
     * with an {@link IllegalArgumentException}.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testReadaheadOutOfRange() throws Throwable {
        try {
            in = openTestFile();
            in.setReadahead(-1L);
            fail("Stream should have rejected the request " + in);
        } catch (IllegalArgumentException expected) {
            // expected outcome: the stream refused the negative readahead
        }
    }

    /**
     * Runs the seek-and-read sequence with 256K blocks and a 512K readahead
     * under the {@code Normal} policy, then asserts a single open operation.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testReadWithNormalPolicy() throws Throwable {
        describe("Read big blocks with a big readahead");
        final long blockSize = TestS3AInputPolicies._256K;
        final long readahead = 524288L;
        executeSeekReadSequence(blockSize, readahead, S3AInputPolicy.Normal);
        assertStreamOpenedExactlyOnce();
    }

    /**
     * Decompresses the test file with a 128K readahead under the
     * {@code Sequential} policy, then asserts a single open operation.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testDecompressionSequential128K() throws Throwable {
        describe("Decompress with a 128K readahead");
        final long readahead = TestS3AInputPolicies._128K;
        executeDecompression(readahead, S3AInputPolicy.Sequential);
        assertStreamOpenedExactlyOnce();
    }

    /**
     * Opens the test file, decompresses it with the codec selected from its
     * path, and reads it line by line, logging the time taken and the stream
     * statistics.
     *
     * <p>An {@link EOFException} raised while reading is treated as the end
     * of the data rather than a failure.
     *
     * @param j readahead to set on the stream
     * @param s3AInputPolicy input policy to open the stream with
     * @throws IOException on any IO failure other than the tolerated EOF
     */
    private void executeDecompression(long j, S3AInputPolicy s3AInputPolicy) throws IOException {
        // check the assumption first: the codec lookup below dereferences testData
        requireCSVTestData();
        CompressionCodec codec = new CompressionCodecFactory(getConf()).getCodec(this.testData);
        // fail with a meaningful message instead of an NPE when no codec matches
        Assert.assertNotNull("No compression codec found for " + this.testData, codec);
        long bytesRead = 0;
        int lines = 0;
        FSDataInputStream objectIn = openTestFile(s3AInputPolicy, j);
        ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
        Text line = new Text();
        // try-with-resources closes the reader even when readLine() throws
        try (LineReader lineReader = new LineReader(codec.createInputStream(objectIn), getConf())) {
            int size;
            while ((size = lineReader.readLine(line)) > 0) {
                bytesRead += size;
                lines++;
            }
        } catch (EOFException expected) {
            // deliberately ignored: treated as the end of the data
        }
        timer.end("Time to read %d lines [%d bytes expanded, %d raw] with readahead = %d", new Object[]{lines, bytesRead, this.testDataStatus.getLen(), j});
        logTimePerIOP("line read", timer, lines);
        logStreamStatistics();
    }

    /**
     * Logs the current stream statistics on a new line beneath a
     * "Stream Statistics" heading.
     */
    private void logStreamStatistics() {
        // %n is expanded by String.format; the {} placeholder is left for the logger
        final String template = String.format("Stream Statistics%n{}");
        LOG.info(template, streamStatistics);
    }

    /**
     * Opens the test file and walks through it block by block: each iteration
     * seeks forward by the block size minus one and reads a single byte, which
     * must not be EOF. Timing, effective bandwidth and stream statistics are
     * logged at the end.
     *
     * @param j block size, i.e. the distance covered by each seek-and-read
     * @param j2 readahead to set on the stream
     * @param s3AInputPolicy input policy to open the stream with
     * @throws IOException on any IO failure
     */
    protected void executeSeekReadSequence(long j, long j2, S3AInputPolicy s3AInputPolicy) throws IOException {
        this.in = openTestFile(s3AInputPolicy, j2);
        long len = this.testDataStatus.getLen();
        ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
        long blockCount = len / j;
        LOG.info("Reading {} blocks, readahead = {}", blockCount, j2);
        for (long block = 0; block < blockCount; block++) {
            this.in.seek(this.in.getPos() + j - 1);
            assertTrue(this.in.read() >= 0);
        }
        timer.end("Time to execute %d seeks of distance %d with readahead = %d", new Object[]{blockCount, j, j2});
        // the label reports the seek distance (block size), not the block count
        logTimePerIOP("seek(pos + " + j + "); read()", timer, blockCount);
        LOG.info("Effective bandwidth {} MB/S", timer.bandwidthDescription(this.streamStatistics.bytesRead - this.streamStatistics.bytesSkippedOnSeek));
        logStreamStatistics();
    }

    /**
     * Runs the random IO sequence under the {@code Random} policy, expecting
     * one open per read and no aborted streams.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testRandomIORandomPolicy() throws Throwable {
        final long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
        executeRandomIO(S3AInputPolicy.Random, expectedOpenCount);
        assertEquals("streams aborted in " + streamStatistics, 0L, streamStatistics.aborted);
    }

    /**
     * Runs the random IO sequence under the {@code Normal} policy, expecting
     * one open per read and four aborted streams.
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testRandomIONormalPolicy() throws Throwable {
        final long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
        executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
        assertEquals("streams aborted in " + streamStatistics, 4L, streamStatistics.aborted);
    }

    /**
     * Opens the test file with the given policy and a zero readahead, performs
     * every positioned {@code readFully} listed in {@code RANDOM_IO_SEQUENCE},
     * closes the stream, and then asserts the number of open operations.
     *
     * @param s3AInputPolicy input policy to open the stream with
     * @param j expected number of open operations
     * @return the timer covering the sequence of reads
     * @throws IOException on any IO failure
     */
    private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy s3AInputPolicy, long j) throws IOException {
        describe("Random IO with policy \"%s\"", s3AInputPolicy);
        final byte[] buffer = new byte[1048576];
        long totalBytesRead = 0;
        in = openTestFile(s3AInputPolicy, 0L);
        final ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
        for (int step = 0; step < RANDOM_IO_SEQUENCE.length; step++) {
            final int position = RANDOM_IO_SEQUENCE[step][0];
            final int length = RANDOM_IO_SEQUENCE[step][1];
            in.readFully(position, buffer, 0, length);
            totalBytesRead += length;
        }
        timer.end("Time to execute %d reads of total size %d bytes", new Object[]{Integer.valueOf(RANDOM_IO_SEQUENCE.length), Long.valueOf(totalBytesRead)});
        in.close();
        assertOpenOperationCount(j);
        logTimePerIOP("byte read", timer, totalBytesRead);
        final long effectiveBytes = streamStatistics.bytesRead - streamStatistics.bytesSkippedOnSeek;
        LOG.info("Effective bandwidth {} MB/S", timer.bandwidthDescription(effectiveBytes));
        logStreamStatistics();
        return timer;
    }

    /**
     * Returns the stream wrapped by the current stream under test, cast to
     * {@link S3AInputStream}.
     *
     * @return the wrapped S3A input stream
     */
    S3AInputStream getS3aStream() {
        final Object wrapped = in.getWrappedStream();
        return (S3AInputStream) wrapped;
    }

    /**
     * Writes a 32K dataset, opens it with the {@code Random} policy and an 8K
     * readahead, and then checks three phases against the written data:
     * <ol>
     *   <li>positioned reads from the middle of the first readahead range,
     *       which must not trigger a second open;</li>
     *   <li>one positioned read just past that range, after which two opens
     *       are expected;</li>
     *   <li>byte-by-byte {@code read()} calls up to the end of the file,
     *       followed by a final read that must return -1.</li>
     * </ol>
     *
     * @throws Throwable on any failure
     */
    @Test
    public void testRandomReadOverBuffer() throws Throwable {
        describe("read over a buffer, making sure that the requests spans readahead ranges");
        S3AFileSystem fileSystem = m15getFileSystem();
        Path path = path("testReadOverBuffer.bin");
        final int datasetLen = 32768;
        // keep the generated data: it is the expected content for every check below
        byte[] sourceData = ContractTestUtils.dataset(datasetLen, 0, 64);
        ContractTestUtils.writeDataset(fileSystem, path, sourceData, datasetLen, 16384, true);
        byte[] buffer = new byte[datasetLen];
        final int readahead = 8192;
        final int halfReadahead = 4096;
        this.in = openDataFile(fileSystem, path, S3AInputPolicy.Random, readahead);
        LOG.info("Starting initial reads");
        S3AInputStream s3aStream = getS3aStream();
        assertEquals(readahead, s3aStream.getReadahead());
        assertEquals(1L, this.in.read(0L, new byte[1], 0, 1));
        // state of the current request after the single-byte read
        assertEquals("remaining in\n" + this.in, readahead - 1, s3aStream.remainingInCurrentRequest());
        assertEquals("range start in\n" + this.in, 0L, s3aStream.getContentRangeStart());
        assertEquals("range finish in\n" + this.in, readahead, s3aStream.getContentRangeFinish());
        assertStreamOpenedExactlyOnce();
        describe("Starting sequence of positioned read calls over\n%s", this.in);
        ContractTestUtils.NanoTimer readTimer = new ContractTestUtils.NanoTimer();
        int currentPos = halfReadahead;
        int offset = currentPos;
        int bytesRead = 0;
        int readOps = 0;
        while (bytesRead < halfReadahead) {
            int read = this.in.read(currentPos, buffer, offset, buffer.length - offset);
            bytesRead += read;
            offset += read;
            readOps++;
            assertEquals("open operations on request #" + readOps + " after reading " + bytesRead + " current position in stream " + currentPos + " in\n" + fileSystem + "\n " + this.in, 1L, this.streamStatistics.openOperations);
            for (int i = currentPos; i < currentPos + read; i++) {
                assertEquals("Wrong value from byte " + i, sourceData[i], buffer[i]);
            }
            currentPos += read;
        }
        assertStreamOpenedExactlyOnce();
        assertEquals(readahead, currentPos);
        readTimer.end("read %d in %d operations", new Object[]{bytesRead, readOps});
        LOG.info("Time per byte(): {} nS", ContractTestUtils.toHuman(readTimer.nanosPerOperation(bytesRead)));
        LOG.info("Time per read(): {} nS", ContractTestUtils.toHuman(readTimer.nanosPerOperation(readOps)));
        describe("read last byte");
        // store the byte at the index that is verified below, so the
        // comparison inspects the byte that was actually read
        assertTrue("-1 from last read", this.in.read((long) currentPos, buffer, currentPos, 1) >= 0);
        assertOpenOperationCount(2L);
        assertEquals("Wrong value from read ", sourceData[currentPos], buffer[currentPos]);
        int pos = currentPos + 1;
        describe("read() to EOF over \n%s", this.in);
        long byteCount = 0;
        ContractTestUtils.NanoTimer eofTimer = new ContractTestUtils.NanoTimer();
        LOG.info("seeking");
        this.in.seek(pos);
        LOG.info("reading");
        while (pos < datasetLen) {
            int value = this.in.read();
            assertTrue("Negative read() at position " + pos + " in\n" + this.in, value >= 0);
            buffer[pos] = (byte) value;
            assertEquals("Wrong value from read from\n" + this.in, sourceData[pos], value);
            pos++;
            byteCount++;
        }
        eofTimer.end("read %d bytes", new Object[]{byteCount});
        LOG.info("Time per read(): {} nS", ContractTestUtils.toHuman(eofTimer.nanosPerOperation(byteCount)));
        assertEquals("last read in " + this.in, -1L, this.in.read());
    }
}
