/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.ChunkUtil;
import amazon.emr.metrics.MetricRecordReader;
import amazon.emr.metrics.MetricsConfig;
import amazon.emr.metrics.MetricsSaver;
import amazon.emr.metrics.MetricsUtil;
import amazon.emr.metrics.RecordWriter;
import amazon.emr.metrics.S3ChunkInputStream;
import amazon.emr.metrics.S3ChunkOutputStream;
import amazon.emr.metrics.S3Path;
import amazon.emr.metrics.SessionS3Client;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.Vector;
import junit.framework.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Concurrent round-trip test harness for {@link S3ChunkOutputStream} and
 * {@link S3ChunkInputStream}.
 *
 * <p>A {@link Writer} thread repeatedly appends batches of sequentially numbered
 * metric records to local scratch files and publishes the resulting file chunks
 * through an {@code S3ChunkOutputStream}. A {@link Reader} thread concurrently
 * pulls chunks back through an {@code S3ChunkInputStream} and asserts that the
 * record values arrive as the unbroken sequence 0, 1, 2, ...
 *
 * <p>Threading: {@code recordIndex} is only mutated by the writer thread and
 * {@code verifiedIndex} only by the reader thread. The driver in
 * {@link #run(MetricsConfig)} reads both after joining the threads, which
 * provides the required happens-before ordering.
 */
class TestS3ChunkStream {
    static final Logger logger = LoggerFactory.getLogger(TestS3ChunkStream.class);
    /** Next record value the writer will generate; equals the total generated so far. */
    int recordIndex = 0;
    /** Next record value the reader expects; equals the total verified so far. */
    int verifiedIndex = 0;
    Random random = new Random();
    int numTestFiles = 10;
    final String scratchDir = "/tmp/scratch";
    Vector<String> testFiles = new Vector<String>();
    final boolean verbose = false;
    /** Last exception raised by a worker thread; volatile because workers write it and the driver reads it. */
    volatile Exception exception = null;
    String bucket = "danzhi-test";
    String prefix = "testS3Chunk/";
    SessionS3Client session = null;
    String s3Dir = "s3://danzhi-test/testS3Chunk";

    /**
     * Prepares the local scratch files and the S3 session.
     *
     * <p>Any leftover scratch file from a previous run is deleted. On a desktop
     * machine (per {@code MetricsUtil.isDesktopMachine()}) the session is built
     * from a local credentials file and the default bucket/prefix; otherwise the
     * no-arg session is used and bucket, prefix and {@code s3Dir} are taken from it.
     *
     * @throws IOException declared for the file-system / session helpers invoked here
     */
    public TestS3ChunkStream() throws IOException {
        for (int i = 0; i < this.numTestFiles; ++i) {
            String testFile = MetricsUtil.combinePath(this.scratchDir, "test_" + i + ".bin");
            this.testFiles.add(testFile);
            MetricsUtil.deleteFile(testFile);
        }
        MetricsUtil.ensureDir(this.scratchDir);
        if (MetricsUtil.isDesktopMachine()) {
            this.session = new SessionS3Client("/home/danzhi/emr/credentials.json", this.bucket, this.prefix);
        } else {
            this.session = new SessionS3Client();
            this.bucket = this.session.getBucket();
            this.prefix = this.session.getPrefix();
            this.s3Dir = "s3://" + this.bucket + "/" + this.prefix + "testS3Chunk";
            logger.info("s3Dir: {}", this.s3Dir);
        }
    }

    /**
     * Builds a metric record with a fixed test key and a single raw value.
     *
     * @param index number stored as the record's only value; the reader uses it
     *              to check ordering
     * @return a record with exactly one value, timestamped with the current time
     */
    MetricProtos.EmrMetricRecord getIndexedRecord(int index) {
        MetricProtos.EmrMetricKey.Builder kb = MetricProtos.EmrMetricKey.newBuilder();
        kb.setInstanceId("TestInstance");
        kb.setProcess("TestProcess");
        kb.setKey("TestKey");
        kb.setInterval(0);

        MetricProtos.EmrMetricRawValue.Builder vb = MetricProtos.EmrMetricRawValue.newBuilder();
        vb.setTime(System.currentTimeMillis());
        vb.setValue(index);

        MetricProtos.EmrMetricRecord.Builder rb = MetricProtos.EmrMetricRecord.newBuilder();
        rb.setKey(kb.build());
        rb.addValues(vb.build());
        return rb.build();
    }

    /**
     * Appends a random, non-zero number of sequential records to a randomly
     * chosen scratch file and describes the appended byte range as a chunk.
     *
     * <p>The chunk's {@code begin}/{@code end} are the file length before and
     * after the append. Advances {@code recordIndex} by the number of records written.
     *
     * @return the chunk describing the newly appended range
     * @throws IOException declared for the file helpers / record writer used here
     */
    MetricProtos.FileChunk getTestChunk() throws IOException {
        String testFile = this.testFiles.get(this.random.nextInt(this.numTestFiles));
        MetricProtos.FileChunk.Builder cb = MetricProtos.FileChunk.newBuilder();
        cb.setFilePath(testFile);
        cb.setBegin(MetricsUtil.getFileLength(testFile));
        cb.setMinTime(System.currentTimeMillis());

        // Re-draw until non-zero so every chunk carries at least one record.
        int recordCount;
        do {
            recordCount = this.random.nextInt(8096);
        } while (recordCount == 0);

        Vector<MetricProtos.EmrMetricRecord> records = new Vector<MetricProtos.EmrMetricRecord>();
        for (int i = 0; i < recordCount; ++i) {
            records.add(this.getIndexedRecord(this.recordIndex++));
        }
        // NOTE(review): the writer is never closed here; confirm whether RecordWriter
        // holds a resource beyond appendAndFlush().
        RecordWriter<MetricProtos.EmrMetricRecord> w = new RecordWriter<MetricProtos.EmrMetricRecord>(testFile);
        w.appendAndFlush(records);

        cb.setEnd(MetricsUtil.getFileLength(testFile));
        cb.setMaxTime(System.currentTimeMillis());
        cb.setTime(System.currentTimeMillis());
        return cb.build();
    }

    /**
     * Produces between 1 and 9 chunks via {@link #getTestChunk()}.
     *
     * @return a non-empty list of freshly written chunks, in write order
     * @throws IOException if writing any chunk fails
     */
    Vector<MetricProtos.FileChunk> getTestChunks() throws IOException {
        // Re-draw until non-zero so at least one chunk is produced.
        int numChunks;
        do {
            numChunks = this.random.nextInt(10);
        } while (numChunks == 0);

        Vector<MetricProtos.FileChunk> chunks = new Vector<MetricProtos.FileChunk>();
        for (int i = 0; i < numChunks; ++i) {
            chunks.add(this.getTestChunk());
        }
        return chunks;
    }

    /**
     * Smoke-tests the S3 client obtained from a {@link SessionS3Client}.
     *
     * <p>If a known local jar exists it is uploaded under the session prefix and
     * deleted again; afterwards the objects under the prefix are listed and
     * logged. A failure while listing is logged and not rethrown.
     *
     * @throws RuntimeException propagated from session / client calls
     * @throws IOException declared for the session helpers invoked here
     */
    public static void testSessionToken() throws RuntimeException, IOException {
        String prefix;
        String bucket;
        SessionS3Client session;
        if (MetricsUtil.isDesktopMachine()) {
            bucket = "danzhi-test-new";
            prefix = "sampleprefix/";
            String credentialFile = "/home/danzhi/emr/credentials.json";
            session = new SessionS3Client(credentialFile, bucket, prefix);
        } else {
            session = new SessionS3Client();
            bucket = session.getBucket();
            prefix = session.getPrefix();
        }
        AmazonS3Client client = session.s3Client();
        Assert.assertNotNull((Object)client);

        // Put/delete round trip, only attempted when the sample file is present.
        String randomFile = "/home/hadoop/lib/log4j-1.2.16.jar";
        if (MetricsUtil.fileExists(randomFile)) {
            String key = prefix + "randomFile.jar";
            logger.info("putObject bucket:{} key:{}", bucket, key);
            client.putObject(bucket, key, new File(randomFile));
            logger.info("deleteObject bucket: {} prefix: {}", bucket, key);
            client.deleteObject(new DeleteObjectRequest(bucket, key));
        }

        try {
            logger.info("listObjects bucket: {} prefix: {}", bucket, prefix);
            ObjectListing objectListing = session.s3Client().listObjects(bucket, prefix);
            for (S3ObjectSummary obj : objectListing.getObjectSummaries()) {
                logger.info("  {}", (Object)S3Path.getPath(bucket, obj.getKey()));
            }
        }
        catch (Exception e) {
            // Listing is best-effort: log and continue.
            logger.info("listObjects with session token failed with exception ", (Throwable)e);
        }
    }

    /**
     * Runs the writer and reader concurrently and checks the outcome.
     *
     * <p>Clears the S3 directory, starts both threads, waits for the writer,
     * tells the reader to do one final pass, waits for it, then asserts that no
     * worker recorded an exception and that every generated record was verified.
     *
     * @param configxx unused by this method
     * @throws Exception if setup fails or the driver is interrupted while waiting
     */
    void run(MetricsConfig configxx) throws Exception {
        S3ChunkOutputStream.delete(this.s3Dir, this.session);
        MetricsSaver.StopWatch stopWatch = new MetricsSaver.StopWatch();
        Writer writer = new Writer();
        Reader reader = new Reader();
        writer.start();
        reader.start();
        writer.waitComplete();
        // Only after the writer is done may the reader treat its next pass as the last one.
        reader.stop = true;
        reader.waitComplete();
        Assert.assertEquals(null, (Object)this.exception);
        Assert.assertEquals(this.verifiedIndex, this.recordIndex);
        logger.info("TestS3ChunkStream finished in {} seconds", (Object)stopWatch.elapsedSeconds());
    }

    /**
     * Worker that reads chunks back from S3 and verifies that record values form
     * the contiguous sequence starting at {@code verifiedIndex}.
     */
    class Reader
    extends Thread {
        /** Set by the driver to request one final read pass; volatile for cross-thread visibility. */
        volatile boolean stop = false;

        Reader() {
        }

        /**
         * Polls the input stream until {@link #stop} is observed, then performs
         * one more complete pass before exiting. Exceptions are logged and stored
         * in the enclosing test's {@code exception} field.
         */
        @Override
        public void run() {
            logger.info("Reader started");
            try {
                S3ChunkInputStream input = new S3ChunkInputStream(TestS3ChunkStream.this.s3Dir, TestS3ChunkStream.this.session);
                while (true) {
                    // Sample the flag BEFORE reading: a stop request seen here guarantees
                    // the read below happens after the writer has finished.
                    boolean lastLoop = this.stop;
                    logger.info("Reader verified:{}/{}", TestS3ChunkStream.this.verifiedIndex, TestS3ChunkStream.this.recordIndex);
                    Vector<MetricProtos.FileChunk> chunks = input.read();
                    for (MetricProtos.FileChunk chunk : chunks) {
                        Vector<MetricProtos.EmrMetricRecord> records = MetricRecordReader.readMetricsFile(chunk);
                        for (MetricProtos.EmrMetricRecord record : records) {
                            Assert.assertEquals(1, record.getValuesCount());
                            int index = (int)record.getValues(0).getValue();
                            Assert.assertEquals(TestS3ChunkStream.this.verifiedIndex++, index);
                        }
                    }
                    if (lastLoop) {
                        return;
                    }
                    Thread.sleep(TestS3ChunkStream.this.random.nextInt(3000));
                }
            }
            catch (Exception ex) {
                // NOTE(review): JUnit assertion failures are Errors, not Exceptions, so they
                // are not recorded here; they surface through the driver's final
                // verifiedIndex/recordIndex comparison instead.
                logger.info("Reader exception ", (Throwable)ex);
                TestS3ChunkStream.this.exception = ex;
            }
        }

        /**
         * Blocks until this thread has terminated.
         *
         * <p>Uses {@link Thread#join()} rather than a bare {@code wait()} so the call
         * returns immediately if the thread has already finished and is immune
         * to spurious wakeups.
         *
         * @throws InterruptedException if the caller is interrupted while waiting
         */
        public void waitComplete() throws InterruptedException {
            this.join();
            logger.info("Reader completed");
        }
    }

    /**
     * Worker that generates test chunks and publishes them to S3 at a fixed cadence.
     */
    class Writer
    extends Thread {
        /** Exclusive upper bound of the 1-based cycle counter, i.e. {@code WriterCycles - 1} cycles run. */
        static final int WriterCycles = 40;

        Writer() {
        }

        /**
         * Clears the output location, then writes a fresh batch of chunks every two
         * seconds for each cycle. Exceptions are logged and stored in the enclosing
         * test's {@code exception} field.
         */
        @Override
        public void run() {
            logger.info("Writer started");
            try {
                S3ChunkOutputStream output = new S3ChunkOutputStream(TestS3ChunkStream.this.s3Dir, TestS3ChunkStream.this.session);
                output.delete();
                ChunkUtil.showS3Recursive(TestS3ChunkStream.this.session.s3Client(), TestS3ChunkStream.this.s3Dir);
                int cycle = 0;
                while (++cycle < WriterCycles) {
                    logger.info("Writer cycle {} recordIndex:{}", cycle, TestS3ChunkStream.this.recordIndex);
                    Vector<MetricProtos.FileChunk> chunks = TestS3ChunkStream.this.getTestChunks();
                    output.write(chunks);
                    Thread.sleep(2000L);
                }
            }
            catch (Exception ex) {
                logger.info("Writer exception ", (Throwable)ex);
                TestS3ChunkStream.this.exception = ex;
            }
        }

        /**
         * Blocks until this thread has terminated.
         *
         * <p>Uses {@link Thread#join()} rather than a bare {@code wait()} so the call
         * returns immediately if the thread has already finished and is immune
         * to spurious wakeups.
         *
         * @throws InterruptedException if the caller is interrupted while waiting
         */
        public void waitComplete() throws InterruptedException {
            this.join();
            logger.info("Writer completed");
        }
    }
}

