package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.ChunkUtil;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import junit.framework.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:amazon/emr/metrics/S3ChunkOutputStream.class */
/**
 * Writes metric data as a sequence of chunk objects under one S3 directory and
 * maintains a chunk index file ({@code chunk_index.bin}) in that same directory.
 *
 * <p>Each index entry records the byte range {@code [begin, end)} that a chunk
 * occupies in the logical stream; {@link #streamLength} is the end offset of the
 * last indexed chunk. Small chunks are periodically merged by {@code compact()}.
 */
public class S3ChunkOutputStream {
    static final Logger logger = LoggerFactory.getLogger(S3ChunkOutputStream.class);
    // 128 MiB: write() splits its input into separate uploads once the pending bytes reach this size.
    static final long chunkSplitSize = 134217728;
    // Bucket component of the stream directory, as returned by S3Path.parse in the constructor.
    final String s3bucket;
    // The stream directory exactly as passed to the constructor.
    final String s3dir;
    // Key component of the stream directory, as returned by S3Path.parse in the constructor.
    final String s3dirkey;
    // Local scratch directory: s3outputRoot/<bucket>/<key>. Wiped and recreated by the constructor.
    final String localDir;
    // S3 path of the chunk index ("chunk_index.bin" inside s3dir).
    final String s3ChunkIndexFile;
    // Local copy of the chunk index ("chunk_index.bin" inside localDir).
    final String s3ChunkIndexFileLocal;
    // NOTE(review): not referenced by the methods in this file, which spell out the "chunk_" literal directly.
    final String s3ChunkPrefix = "chunk_";
    // Root of all local scratch directories used by chunk streams.
    static final String s3outputRoot = "/mnt/var/lib/s3output";
    // Logical length of the stream: end offset of the last indexed chunk, or 0 when the index is empty.
    long streamLength;
    // Provides the S3 client used for every list/upload/download/delete call.
    SessionS3Client session;
    // In-memory copy of the chunk index, in stream order.
    Vector<MetricProtos.FileChunk> s3Chunks;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:amazon/emr/metrics/S3ChunkOutputStream$ChunkBurst.class */
    /**
     * A contiguous run of "small" index entries that is a candidate for being
     * merged into a single chunk.
     */
    public static class ChunkBurst {
        /** Index (inclusive) of the first chunk of the run. */
        public int begin = 0;
        /** Index (exclusive) one past the last chunk of the run. */
        public int end = 0;
        /** Number of chunks in the run, i.e. {@code end - begin}. */
        public int count = 0;
        /** Sum of {@code getEnd() - getBegin()} over the chunks of the run. */
        long length = 0;
        /** 32 MiB: chunks at least this large are never put into a run. */
        static final long matureSize = 33554432;
        /** 64 MiB: a run is closed rather than grown beyond this total size. */
        static final long maxSize = 67108864;

        /**
         * Scans the index for runs of consecutive small chunks and picks one to merge.
         *
         * <p>Selection order:
         * <ol>
         *   <li>the first run whose total length is at least {@link #matureSize};</li>
         *   <li>otherwise the first run of at least 8 chunks that does not reach the end of the index;</li>
         *   <li>otherwise the last run, if it holds at least 128 chunks.</li>
         * </ol>
         *
         * @param vector the chunk index, in stream order
         * @return the run to merge, or {@code null} when nothing qualifies (including
         *         when the index contains no small chunk at all)
         */
        static ChunkBurst computeMergeChunkBurst(Vector<MetricProtos.FileChunk> vector) {
            Vector<ChunkBurst> bursts = new Vector<>();
            ChunkBurst current = null;
            for (int i = 0; i < vector.size(); i++) {
                MetricProtos.FileChunk chunk = vector.get(i);
                long chunkLength = chunk.getEnd() - chunk.getBegin();
                boolean fitsCurrentRun = current == null || current.length + chunkLength <= maxSize;
                if (chunkLength < matureSize && fitsCurrentRun) {
                    if (current == null) {
                        current = new ChunkBurst();
                        current.begin = i;
                    }
                    current.length += chunkLength;
                    current.end = i + 1;
                    current.count = current.end - current.begin;
                } else if (current != null) {
                    // NOTE(review): a small chunk that would overflow maxSize closes the run
                    // and is itself not placed in any run; the next run starts at the
                    // following small chunk.
                    bursts.add(current);
                    current = null;
                }
            }
            if (current != null) {
                bursts.add(current);
            }

            // No small chunks at all: lastElement() below would throw NoSuchElementException.
            if (bursts.isEmpty()) {
                return null;
            }

            for (ChunkBurst burst : bursts) {
                if (burst.length >= matureSize) {
                    return burst;
                }
            }
            for (ChunkBurst burst : bursts) {
                if (burst.count >= 8 && burst.end < vector.size()) {
                    return burst;
                }
            }
            ChunkBurst last = bursts.lastElement();
            return last.count >= 128 ? last : null;
        }
    }

    /**
     * Opens the chunk stream stored under the S3 directory {@code str}.
     *
     * <p>The local scratch directory is wiped and recreated, the chunk index is
     * downloaded and loaded when it exists, and finally every S3 object under the
     * directory that the index does not reference is deleted.
     *
     * @param str             S3 path of the stream directory
     * @param sessionS3Client provider of the S3 client used for all requests
     * @throws IOException propagated from the path, file and S3 helpers
     */
    public S3ChunkOutputStream(String str, SessionS3Client sessionS3Client) throws IOException {
        S3Path location = S3Path.parse(str);
        this.session = sessionS3Client;
        this.s3dir = str;
        this.s3bucket = location.s3bucket;
        this.s3dirkey = location.s3key;
        this.s3ChunkIndexFile = S3Path.combine(str, "chunk_index.bin");
        String bucketAndKey = MetricsUtil.combinePath(this.s3bucket, this.s3dirkey);
        this.localDir = MetricsUtil.combinePath(s3outputRoot, bucketAndKey);
        this.s3ChunkIndexFileLocal = MetricsUtil.combinePath(this.localDir, "chunk_index.bin");

        // Start from a clean scratch directory.
        MetricsUtil.recursivelyDeleteFolder(this.localDir);
        MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
        logger.info("Construct S3ChunkOutputStream localDir {}", this.localDir);
        MetricsUtil.ensureDir(this.localDir);

        // The download result is not inspected; presence of the local file decides.
        ChunkUtil.download(sessionS3Client.s3Client(), this.s3ChunkIndexFile, this.s3ChunkIndexFileLocal);
        if (!MetricsUtil.fileExists(this.s3ChunkIndexFileLocal)) {
            this.s3Chunks = new Vector<>();
        } else {
            this.s3Chunks = MetricsReader.readChunkIndex(this.s3ChunkIndexFileLocal);
            logger.info("{} indexed chunks from {}", this.s3Chunks.size(), this.s3ChunkIndexFile);
        }

        if (this.s3Chunks.isEmpty()) {
            this.streamLength = 0L;
        } else {
            this.streamLength = this.s3Chunks.lastElement().getEnd();
        }
        showIndex();
        cleanupOrphanedChunks();
        String summary = String.format("S3ChunkOutputStream %s chunks:%d length:%d", str, this.s3Chunks.size(), this.streamLength);
        logger.info(summary);
    }

    /**
     * @return the current logical length of the stream (end offset of the last indexed chunk)
     */
    long length() {
        return streamLength;
    }

    /**
     * Deletes every S3 object whose key starts with the stream directory's key plus {@code "/"}.
     *
     * <p>A single {@code listObjects} call returns only one page of results, so the
     * listing is followed with {@code listNextBatchOfObjects} until it is no longer
     * truncated; otherwise objects beyond the first page would be left behind.
     *
     * @param str             S3 path of the stream directory to remove
     * @param sessionS3Client provider of the S3 client
     * @throws IOException declared for callers; propagated from the path helper
     */
    public static void delete(String str, SessionS3Client sessionS3Client) throws IOException {
        S3Path parse = S3Path.parse(str);
        logger.info("delete chunk stream {}", str);
        ObjectListing listing = sessionS3Client.s3Client().listObjects(parse.s3bucket, parse.s3key + "/");
        while (true) {
            for (S3ObjectSummary s3ObjectSummary : listing.getObjectSummaries()) {
                logger.info("delete {}", S3Path.getPath(parse.s3bucket, s3ObjectSummary.getKey()));
                sessionS3Client.s3Client().deleteObject(new DeleteObjectRequest(parse.s3bucket, s3ObjectSummary.getKey()));
            }
            if (!listing.isTruncated()) {
                break;
            }
            listing = sessionS3Client.s3Client().listNextBatchOfObjects(listing);
        }
    }

    /**
     * Empties this stream: clears the in-memory index, resets the length to zero and
     * removes the stored objects.
     *
     * <p>With the index cleared, {@code cleanupOrphanedChunks()} treats every object
     * listed under the directory (including the index file) as orphaned and deletes it.
     * Does nothing when the stream is already empty.
     *
     * @throws IOException propagated from the cleanup and file helpers
     */
    public void delete() throws IOException {
        boolean alreadyEmpty = this.streamLength == 0 && this.s3Chunks.isEmpty();
        if (!alreadyEmpty) {
            logger.info("delete {}", this.s3dir);
            this.s3Chunks.clear();
            this.streamLength = 0L;
            cleanupOrphanedChunks();
            MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
        }
    }

    /**
     * Passes the chunks through {@code ChunkUtil.mergeChunks} and writes the result.
     *
     * @param vector chunks to merge and append
     * @return the result of {@link #write(Vector)} on the merged chunks
     */
    public boolean writeMergable(Vector<MetricProtos.FileChunk> vector) throws IOException, InterruptedException {
        Vector<MetricProtos.FileChunk> merged = ChunkUtil.mergeChunks(vector);
        return write(merged);
    }

    /**
     * Appends the given chunks to the stream.
     *
     * <p>Input totalling less than {@code chunkSplitSize} bytes (or consisting of a
     * single chunk) is written as one S3 chunk. Larger input is written in groups:
     * chunks are accumulated until the group reaches {@code chunkSplitSize} bytes,
     * the group is written, and accumulation restarts. The final, partially filled
     * group is written as well, so no trailing chunks are dropped.
     *
     * @param vector chunks to append, in stream order
     * @return {@code true} if every group was written successfully; {@code false}
     *         as soon as one group fails (later groups are not attempted)
     */
    public boolean write(Vector<MetricProtos.FileChunk> vector) throws IOException, InterruptedException {
        if (vector.isEmpty()) {
            return true;
        }
        long totalLength = getLength(vector);
        if (totalLength < chunkSplitSize || vector.size() < 2) {
            return writeInternal(vector);
        }
        logger.info("Split {} input chunks into 128MB splits", vector.size());
        Vector<MetricProtos.FileChunk> pending = new Vector<>();
        long pendingLength = 0;
        for (MetricProtos.FileChunk chunk : vector) {
            pending.add(chunk);
            pendingLength += chunk.getEnd() - chunk.getBegin();
            if (pendingLength >= chunkSplitSize) {
                if (!writeInternal(pending)) {
                    return false;
                }
                pending.clear();
                pendingLength = 0;
            }
        }
        // Flush the tail that never reached chunkSplitSize; without this the
        // remaining chunks were silently discarded while reporting success.
        return pending.isEmpty() || writeInternal(pending);
    }

    /**
     * Stitches the given chunks into one local file, uploads it as
     * {@code chunk_<maxTime>.bin}, appends a matching entry to the chunk index and
     * uploads the index. The local chunk file is deleted and {@code compact()} is
     * invoked afterwards, whether or not the uploads succeeded.
     *
     * <p>NOTE(review): the in-memory index and {@code streamLength} are advanced even
     * when the index upload fails; callers only see the {@code false} return value.
     * The size check uses JUnit's {@code Assert} and therefore throws after the
     * chunk has already been uploaded.
     *
     * @param vector chunks to write as a single S3 chunk
     * @return {@code true} for empty input; otherwise the result of the last upload
     *         attempted (the chunk upload if it failed, else the index upload)
     */
    private boolean writeInternal(Vector<MetricProtos.FileChunk> vector) throws IOException, InterruptedException {
        if (vector.isEmpty()) {
            return true;
        }
        long maxTime = getMaxTime(vector);
        String chunkName = "chunk_" + maxTime + ".bin";
        String localChunkPath = MetricsUtil.combinePath(this.localDir, chunkName);
        String s3ChunkPath = MetricsUtil.combinePath(this.s3dir, chunkName);
        ChunkUtil.stitchFileChunks(vector, localChunkPath);
        boolean uploaded = ChunkUtil.upload(this.session.s3Client(), localChunkPath, s3ChunkPath);
        if (uploaded) {
            long fileLength = MetricsUtil.getFileLength(localChunkPath);
            Assert.assertEquals(getLength(vector), fileLength);

            MetricProtos.FileChunk.Builder builder = MetricProtos.FileChunk.newBuilder();
            builder.setMinTime(getMinTime(vector));
            builder.setMaxTime(maxTime);
            builder.setFilePath(s3ChunkPath);
            // The new chunk occupies the next fileLength bytes of the logical stream.
            builder.setBegin(this.streamLength);
            builder.setEnd(this.streamLength + fileLength);
            builder.setTime(System.currentTimeMillis());
            MetricProtos.FileChunk indexEntry = builder.build();

            // The entry is passed as-is: the former (RecordWriter) cast on it could never succeed.
            new RecordWriter(this.s3ChunkIndexFileLocal).appendAndFlush(indexEntry);
            uploaded = ChunkUtil.upload(this.session.s3Client(), this.s3ChunkIndexFileLocal, this.s3ChunkIndexFile);
            this.s3Chunks.add(indexEntry);
            this.streamLength += fileLength;
        } else {
            logger.info("Failed to upload {} input chunks", vector.size());
        }
        MetricsUtil.deleteFile(localChunkPath);
        compact();
        return uploaded;
    }

    /**
     * Merges one run of small chunks into a single S3 chunk, if the index holds at
     * least 16 entries and {@code ChunkBurst.computeMergeChunkBurst} selects a run.
     *
     * <p>Steps: download the run's chunks, stitch them locally, upload the result as
     * {@code chunk_<maxTime>.bin}, replace the run in the in-memory index with one
     * merged entry, rewrite and upload the index, then delete local temporaries and
     * the source S3 objects that the index no longer references. Returns early,
     * without further cleanup, when any download fails.
     */
    void compact() throws IOException, InterruptedException {
        if (this.s3Chunks.size() < 16) {
            return;
        }
        logger.info("Enter compact {} chunks", this.s3Chunks.size());
        ChunkBurst burst = ChunkBurst.computeMergeChunkBurst(this.s3Chunks);
        if (burst == null) {
            return;
        }

        // Collect the run's index entries and the newest timestamp among them.
        Vector<MetricProtos.FileChunk> sources = new Vector<>();
        long newestTime = 0;
        for (int idx = burst.begin; idx < burst.end; idx++) {
            MetricProtos.FileChunk source = this.s3Chunks.get(idx);
            logger.info("  {} {}", source.getFilePath(), source.getEnd());
            sources.add(source);
            newestTime = Math.max(newestTime, source.getMaxTime());
        }
        logger.info("compact {} chunks total size {}", burst.count, burst.length);

        Vector<ChunkUtil.DownloadChunkTask> downloads = ChunkUtil.downloadChunks(this.session.s3Client(), sources, this.localDir);
        Vector<String> localFiles = new Vector<>();
        for (ChunkUtil.DownloadChunkTask download : downloads) {
            if (!download.success) {
                return;
            }
            localFiles.add(download.localFile);
        }
        logger.info("{} local files", localFiles.size());
        Assert.assertTrue(mergeable(sources));

        String stitchedLocal = MetricsUtil.combinePath(this.localDir, "stitch_" + newestTime + ".bin");
        ChunkUtil.stitchFiles(localFiles, stitchedLocal);
        String mergedS3Path = MetricsUtil.combinePath(this.s3dir, "chunk_" + newestTime + ".bin");
        boolean mergedUploaded = ChunkUtil.upload(this.session.s3Client(), stitchedLocal, mergedS3Path);
        if (mergedUploaded) {
            MetricProtos.FileChunk.Builder mergedBuilder = MetricProtos.FileChunk.newBuilder(ChunkUtil.mergeChunksSameFile(sources));
            mergedBuilder.setFilePath(mergedS3Path);
            MetricProtos.FileChunk merged = mergedBuilder.build();

            // Swap the run for the single merged entry at the same position.
            int runSize = burst.end - burst.begin;
            for (int removed = 0; removed < runSize; removed++) {
                this.s3Chunks.remove(burst.begin);
            }
            this.s3Chunks.add(burst.begin, merged);

            MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
            new RecordWriter(this.s3ChunkIndexFileLocal).appendAndFlush(this.s3Chunks);
            boolean indexUploaded = ChunkUtil.upload(this.session.s3Client(), this.s3ChunkIndexFileLocal, this.s3ChunkIndexFile);
            if (indexUploaded) {
                MetricsUtil.deleteFile(stitchedLocal);
                for (String localFile : localFiles) {
                    MetricsUtil.deleteFile(localFile);
                }
                for (MetricProtos.FileChunk source : sources) {
                    String sourcePath = source.getFilePath();
                    S3Path sourceLocation = S3Path.parse(sourcePath);
                    // The merged chunk may share its path with one of the sources; keep that object.
                    if (!isActiveChunk(sourcePath)) {
                        logger.info("delete {}", sourcePath);
                        this.session.s3Client().deleteObject(new DeleteObjectRequest(sourceLocation.s3bucket, sourceLocation.s3key));
                    }
                }
            }
        }
        logger.info("after compaction:");
        showIndex();
        ChunkUtil.showS3Recursive(this.session.s3Client(), this.s3dir);
    }

    /**
     * Deletes every S3 object under the stream directory that is not referenced by
     * the in-memory index (see {@code isActiveChunk}), then logs each indexed chunk
     * that was not found in the listing.
     *
     * <p>The listing is followed page by page until it is no longer truncated, so
     * orphans beyond the first page are removed too and indexed chunks beyond the
     * first page are not misreported as missing.
     *
     * @throws IOException declared for callers; propagated from helpers
     */
    void cleanupOrphanedChunks() throws IOException {
        // Used as a set of the active paths actually seen in S3.
        HashMap<String, Integer> seenActive = new HashMap<>();
        ObjectListing listing = this.session.s3Client().listObjects(this.s3bucket, this.s3dirkey + "/");
        while (true) {
            for (S3ObjectSummary s3ObjectSummary : listing.getObjectSummaries()) {
                String path = S3Path.getPath(this.s3bucket, s3ObjectSummary.getKey());
                boolean isActiveChunk = isActiveChunk(path);
                logger.info("{} {}", path, isActiveChunk ? "OK" : "Orphaned");
                if (isActiveChunk) {
                    seenActive.put(path, 1);
                } else {
                    logger.info("delete {}", path);
                    this.session.s3Client().deleteObject(new DeleteObjectRequest(this.s3bucket, s3ObjectSummary.getKey()));
                }
            }
            if (!listing.isTruncated()) {
                break;
            }
            listing = this.session.s3Client().listNextBatchOfObjects(listing);
        }
        for (MetricProtos.FileChunk chunk : this.s3Chunks) {
            if (!seenActive.containsKey(chunk.getFilePath())) {
                logger.info("expected chunk {} does not exist", chunk.getFilePath());
            }
        }
        logger.info("Leave cleanupOrphanedChunks");
        ChunkUtil.showS3Recursive(this.session.s3Client(), this.s3dir);
    }

    /**
     * Tells whether an S3 path is still referenced by this stream.
     *
     * @param str S3 path to test
     * @return {@code true} if some index entry has exactly this file path, or if the
     *         index is non-empty and the path is the chunk index file itself
     */
    boolean isActiveChunk(String str) {
        for (MetricProtos.FileChunk chunk : this.s3Chunks) {
            if (chunk.getFilePath().equals(str)) {
                return true;
            }
        }
        // The index file only counts as active while it describes at least one chunk.
        if (this.s3Chunks.isEmpty()) {
            return false;
        }
        return str.equals(this.s3ChunkIndexFile);
    }

    /** Hands the in-memory chunk index to {@code ChunkUtil.showChunks}. */
    void showIndex() {
        ChunkUtil.showChunks(s3Chunks);
    }

    /**
     * Checks that the chunks form one gap-free byte range.
     *
     * @param vector chunks in stream order
     * @return {@code true} if there are at least two chunks and each chunk begins
     *         exactly where the previous one ends; {@code false} otherwise
     */
    boolean mergeable(Vector<MetricProtos.FileChunk> vector) {
        if (vector.size() < 2) {
            return false;
        }
        Iterator<MetricProtos.FileChunk> chunks = vector.iterator();
        long expectedBegin = chunks.next().getEnd();
        while (chunks.hasNext()) {
            MetricProtos.FileChunk chunk = chunks.next();
            if (chunk.getBegin() != expectedBegin) {
                return false;
            }
            expectedBegin = chunk.getEnd();
        }
        return true;
    }

    /**
     * Finds the smallest {@code getMinTime()} among the chunks.
     *
     * <p>Zero doubles as the "nothing seen yet" marker, so a running value of zero is
     * always replaced by the next chunk's minimum time.
     *
     * @param vector chunks to inspect
     * @return the smallest minimum time, or 0 for an empty vector
     */
    static long getMinTime(Vector<MetricProtos.FileChunk> vector) {
        long earliest = 0;
        for (MetricProtos.FileChunk chunk : vector) {
            long candidate = chunk.getMinTime();
            boolean unset = earliest == 0;
            if (unset || candidate < earliest) {
                earliest = candidate;
            }
        }
        return earliest;
    }

    /**
     * Finds the largest {@code getMaxTime()} among the chunks.
     *
     * <p>The running maximum is compared against each chunk's <em>maximum</em> time.
     * Comparing against the minimum time, as before, skipped a chunk whose time
     * window overlapped the current maximum even when it ended later.
     *
     * <p>Zero doubles as the "nothing seen yet" marker, mirroring {@code getMinTime}.
     *
     * @param vector chunks to inspect
     * @return the largest maximum time, or 0 for an empty vector
     */
    static long getMaxTime(Vector<MetricProtos.FileChunk> vector) {
        long latest = 0;
        for (MetricProtos.FileChunk chunk : vector) {
            long candidate = chunk.getMaxTime();
            if (latest == 0 || latest < candidate) {
                latest = candidate;
            }
        }
        return latest;
    }

    /**
     * Adds up the byte lengths of the chunks.
     *
     * @param vector chunks to measure
     * @return the sum of {@code getEnd() - getBegin()} over all chunks; 0 when empty
     */
    static long getLength(Vector<MetricProtos.FileChunk> vector) {
        long total = 0;
        for (MetricProtos.FileChunk chunk : vector) {
            long chunkLength = chunk.getEnd() - chunk.getBegin();
            total += chunkLength;
        }
        return total;
    }
}
