/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.ChunkUtil;
import amazon.emr.metrics.MetricsReader;
import amazon.emr.metrics.MetricsUtil;
import amazon.emr.metrics.RecordWriter;
import amazon.emr.metrics.S3Path;
import amazon.emr.metrics.SessionS3Client;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.io.IOException;
import java.util.HashMap;
import java.util.Vector;
import junit.framework.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3ChunkOutputStream {
    /** Class-wide SLF4J logger. */
    static final Logger logger = LoggerFactory.getLogger(S3ChunkOutputStream.class);
    /** 0x8000000 bytes = 128 MiB; {@code write} splits large inputs at this same threshold. */
    static final long chunkSplitSize = 0x8000000L;
    /** Bucket component of {@link #s3dir}, as produced by {@code S3Path.parse}. */
    final String s3bucket;
    /** The S3 directory path exactly as passed to the constructor. */
    final String s3dir;
    /** Key component of {@link #s3dir}, as produced by {@code S3Path.parse}. */
    final String s3dirkey;
    /** Local staging directory: {@link #s3outputRoot} combined with bucket and key. */
    final String localDir;
    /** S3 path of the chunk index ({@code chunk_index.bin} inside {@link #s3dir}). */
    final String s3ChunkIndexFile;
    /** Local copy of the chunk index ({@code chunk_index.bin} inside {@link #localDir}). */
    final String s3ChunkIndexFileLocal;
    /** Name prefix of chunk objects; chunk names are built as {@code chunk_<maxTime>.bin}. */
    final String s3ChunkPrefix = "chunk_";
    /** Root of the local staging area under which {@link #localDir} is created. */
    static final String s3outputRoot = "/mnt/var/lib/s3output";
    /**
     * Logical length of the stream: the end offset of the last indexed chunk when the
     * stream is opened (0 if there is none), increased by the length of every chunk written.
     */
    long streamLength;
    /** Provider of the S3 client used for every S3 operation. */
    SessionS3Client session;
    /** In-memory copy of the chunk index, in stream order. */
    Vector<MetricProtos.FileChunk> s3Chunks;

    /**
     * Opens the chunk stream stored under {@code s3dir}.
     *
     * <p>The local staging directory is wiped and recreated, the chunk index is downloaded
     * from S3 and, if it exists locally afterwards, loaded into memory. The stream length is
     * taken from the end offset of the last indexed chunk. Finally every object under the S3
     * directory that the index does not reference is deleted via
     * {@code cleanupOrphanedChunks}.
     *
     * @param s3dir   S3 directory holding the chunk files and {@code chunk_index.bin}
     * @param session provider of the S3 client
     * @throws IOException declared by the signature; presumably raised by the file/S3 helpers
     */
    public S3ChunkOutputStream(String s3dir, SessionS3Client session) throws IOException {
        final String indexName = "chunk_index.bin";
        this.session = session;
        this.s3dir = s3dir;

        // Derive the remote and local locations from the directory path.
        S3Path parsed = S3Path.parse(s3dir);
        this.s3bucket = parsed.s3bucket;
        this.s3dirkey = parsed.s3key;
        this.s3ChunkIndexFile = S3Path.combine(s3dir, indexName);
        this.localDir = MetricsUtil.combinePath(s3outputRoot, MetricsUtil.combinePath(this.s3bucket, this.s3dirkey));
        this.s3ChunkIndexFileLocal = MetricsUtil.combinePath(this.localDir, indexName);

        // Start from an empty staging directory.
        MetricsUtil.recursivelyDeleteFolder(this.localDir);
        MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
        logger.info("Construct S3ChunkOutputStream localDir {}", this.localDir);
        MetricsUtil.ensureDir(this.localDir);

        // Load the index if the download produced a local file; otherwise start empty.
        ChunkUtil.download(session.s3Client(), this.s3ChunkIndexFile, this.s3ChunkIndexFileLocal);
        if (!MetricsUtil.fileExists(this.s3ChunkIndexFileLocal)) {
            this.s3Chunks = new Vector<MetricProtos.FileChunk>();
        } else {
            this.s3Chunks = MetricsReader.readChunkIndex(this.s3ChunkIndexFileLocal);
            logger.info("{} indexed chunks from {}", this.s3Chunks.size(), this.s3ChunkIndexFile);
        }

        if (this.s3Chunks.isEmpty()) {
            this.streamLength = 0L;
        } else {
            this.streamLength = this.s3Chunks.lastElement().getEnd();
        }

        this.showIndex();
        this.cleanupOrphanedChunks();
        logger.info(String.format("S3ChunkOutputStream %s chunks:%d length:%d", s3dir, this.s3Chunks.size(), this.streamLength));
    }

    /**
     * Returns the current logical length of the stream in bytes.
     *
     * @return the value of {@code streamLength}
     */
    long length() {
        return streamLength;
    }

    /**
     * Deletes every object stored under the given S3 directory.
     *
     * <p>A single {@code listObjects} response can be truncated, so the listing is followed
     * page by page until S3 reports that it is complete; otherwise objects beyond the first
     * page would be left behind.
     *
     * @param s3dir   S3 directory whose contents are removed
     * @param session provider of the S3 client
     * @throws IOException declared by the signature; presumably raised by path parsing
     */
    public static void delete(String s3dir, SessionS3Client session) throws IOException {
        S3Path p = S3Path.parse(s3dir);
        logger.info("delete chunk stream {}", s3dir);
        ObjectListing objectListing = session.s3Client().listObjects(p.s3bucket, p.s3key + "/");
        while (true) {
            for (S3ObjectSummary obj : objectListing.getObjectSummaries()) {
                logger.info("delete {}", S3Path.getPath(p.s3bucket, obj.getKey()));
                session.s3Client().deleteObject(new DeleteObjectRequest(p.s3bucket, obj.getKey()));
            }
            if (!objectListing.isTruncated()) {
                break;
            }
            // The next page is requested by marker, so the deletions above do not disturb it.
            objectListing = session.s3Client().listNextBatchOfObjects(objectListing);
        }
    }

    /**
     * Removes all data of this stream.
     *
     * <p>Does nothing when the stream is already empty. Otherwise the in-memory index is
     * cleared first, so that {@code cleanupOrphanedChunks} regards every object under the
     * S3 directory (the index file included) as orphaned and deletes it; the local index
     * copy is removed last.
     *
     * @throws IOException declared by the signature; presumably raised by the cleanup helpers
     */
    public void delete() throws IOException {
        boolean alreadyEmpty = this.streamLength == 0L && this.s3Chunks.isEmpty();
        if (!alreadyEmpty) {
            logger.info("delete {}", this.s3dir);
            this.s3Chunks.clear();
            this.streamLength = 0L;
            this.cleanupOrphanedChunks();
            MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
        }
    }

    /**
     * Passes {@code chunks} through {@code ChunkUtil.mergeChunks} and writes the result.
     *
     * @param chunks input chunks to merge and append
     * @return the result of {@link #write(Vector)} on the merged chunks
     * @throws IOException          propagated from {@link #write(Vector)}
     * @throws InterruptedException propagated from {@link #write(Vector)}
     */
    public boolean writeMergable(Vector<MetricProtos.FileChunk> chunks) throws IOException, InterruptedException {
        Vector<MetricProtos.FileChunk> merged = ChunkUtil.mergeChunks(chunks);
        return this.write(merged);
    }

    /**
     * Appends the given chunks to the stream.
     *
     * <p>Inputs smaller than {@code chunkSplitSize} (128 MiB), or consisting of a single
     * chunk, are written as one S3 chunk. Larger inputs are written in consecutive splits,
     * each closed as soon as its accumulated length reaches {@code chunkSplitSize}. Chunks
     * left over after the last full split are written as a final, smaller split so that no
     * input is dropped.
     *
     * @param chunks chunks to append, in stream order
     * @return {@code true} if every split was written successfully (or the input was empty),
     *         {@code false} as soon as one split fails; earlier splits stay written
     * @throws IOException          propagated from the underlying write
     * @throws InterruptedException propagated from the underlying write
     */
    public boolean write(Vector<MetricProtos.FileChunk> chunks) throws IOException, InterruptedException {
        if (chunks.isEmpty()) {
            return true;
        }
        long totalLength = S3ChunkOutputStream.getLength(chunks);
        if (totalLength < chunkSplitSize || chunks.size() < 2) {
            return this.writeInternal(chunks);
        }
        logger.info("Split {} input chunks into 128MB splits", chunks.size());
        Vector<MetricProtos.FileChunk> split = new Vector<MetricProtos.FileChunk>();
        long length = 0L;
        for (MetricProtos.FileChunk chunk : chunks) {
            split.add(chunk);
            length += chunk.getEnd() - chunk.getBegin();
            if (length < chunkSplitSize) {
                continue;
            }
            if (!this.writeInternal(split)) {
                return false;
            }
            split.clear();
            length = 0L;
        }
        // Flush the remainder that never reached the split size; writeInternal treats an
        // empty vector as a successful no-op.
        return this.writeInternal(split);
    }

    /**
     * Stitches {@code chunks} into one local file, uploads it as
     * {@code chunk_<maxTime>.bin}, and records it in the chunk index.
     *
     * <p>When the chunk upload succeeds, a new index entry covering
     * {@code [streamLength, streamLength + fileLength)} is appended to the local index,
     * the index is uploaded, and the in-memory index and {@code streamLength} are advanced.
     * The in-memory state is advanced even if the index upload fails; only the return value
     * reflects that failure. The temporary file is deleted and {@code compact()} is invoked
     * regardless of the upload outcome.
     *
     * @param chunks chunks to stitch and upload; an empty vector is a successful no-op
     * @return {@code false} if the chunk upload or the index upload failed
     * @throws IOException          declared by the signature; presumably raised by the helpers
     * @throws InterruptedException declared by the signature; presumably raised by the helpers
     */
    private boolean writeInternal(Vector<MetricProtos.FileChunk> chunks) throws IOException, InterruptedException {
        if (chunks.isEmpty()) {
            return true;
        }
        final long newestTime = S3ChunkOutputStream.getMaxTime(chunks);
        final String chunkName = "chunk_" + newestTime + ".bin";
        final String localChunk = MetricsUtil.combinePath(this.localDir, chunkName);
        final String remoteChunk = MetricsUtil.combinePath(this.s3dir, chunkName);

        ChunkUtil.stitchFileChunks(chunks, localChunk);
        boolean succeeded = ChunkUtil.upload(this.session.s3Client(), localChunk, remoteChunk);
        if (!succeeded) {
            logger.info("Failed to upload {} input chunks", chunks.size());
        } else {
            MetricProtos.FileChunk.Builder entry = MetricProtos.FileChunk.newBuilder();
            entry.setMinTime(S3ChunkOutputStream.getMinTime(chunks));
            entry.setMaxTime(newestTime);
            entry.setFilePath(remoteChunk);
            long stitchedLength = MetricsUtil.getFileLength(localChunk);
            // The stitched file must be exactly as long as the chunks it was built from.
            Assert.assertEquals(S3ChunkOutputStream.getLength(chunks), stitchedLength);
            entry.setBegin(this.streamLength);
            entry.setEnd(this.streamLength + stitchedLength);
            entry.setTime(System.currentTimeMillis());
            MetricProtos.FileChunk indexed = entry.build();

            RecordWriter<MetricProtos.FileChunk> indexWriter = new RecordWriter<MetricProtos.FileChunk>(this.s3ChunkIndexFileLocal);
            indexWriter.appendAndFlush(indexed);
            succeeded = ChunkUtil.upload(this.session.s3Client(), this.s3ChunkIndexFileLocal, this.s3ChunkIndexFile);
            this.s3Chunks.add(indexed);
            this.streamLength += stitchedLength;
        }
        MetricsUtil.deleteFile(localChunk);
        this.compact();
        return succeeded;
    }

    /**
     * Merges a run of small indexed chunks into a single S3 chunk and rewrites the index.
     *
     * <p>Nothing happens while fewer than 16 chunks are indexed, or when
     * {@code ChunkBurst.computeMergeChunkBurst} finds no run worth merging. Otherwise the
     * selected chunks are downloaded, stitched into one file, uploaded, and the index
     * entries {@code [comp.begin, comp.end)} are replaced by one merged entry. Local
     * temporary files and superseded S3 objects are deleted only after the rewritten index
     * has been uploaded successfully.
     *
     * @throws IOException          declared by the signature; presumably raised by the helpers
     * @throws InterruptedException declared by the signature; presumably raised by the helpers
     */
    void compact() throws IOException, InterruptedException {
        if (this.s3Chunks.size() < 16) {
            return;
        }
        logger.info("Enter compact {} chunks", (Object)this.s3Chunks.size());
        ChunkBurst comp = ChunkBurst.computeMergeChunkBurst(this.s3Chunks);
        if (comp == null) {
            return;
        }
        // Collect the chunks of the selected run and the largest max time among them.
        Vector<MetricProtos.FileChunk> s3chunksToMerge = new Vector<MetricProtos.FileChunk>();
        long maxTime = 0L;
        for (int i = comp.begin; i < comp.end; ++i) {
            MetricProtos.FileChunk c = this.s3Chunks.get(i);
            logger.info("  {} {}", (Object)c.getFilePath(), (Object)c.getEnd());
            s3chunksToMerge.add(c);
            maxTime = Math.max(maxTime, c.getMaxTime());
        }
        logger.info("compact {} chunks total size {}", (Object)comp.count, (Object)comp.length);
        Vector<ChunkUtil.DownloadChunkTask> tasks = ChunkUtil.downloadChunks(this.session.s3Client(), s3chunksToMerge, this.localDir);
        Vector<String> filesToMerge = new Vector<String>();
        for (ChunkUtil.DownloadChunkTask t : tasks) {
            // Abandon the compaction if any download failed; files already downloaded are
            // not removed here.
            if (!t.success) {
                return;
            }
            filesToMerge.add(t.localFile);
        }
        logger.info("{} local files", (Object)filesToMerge.size());
        // The run must consist of at least two chunks whose byte ranges are contiguous.
        Assert.assertTrue((boolean)this.mergeable(s3chunksToMerge));
        String stitchedFile = MetricsUtil.combinePath(this.localDir, "stitch_" + maxTime + ".bin");
        ChunkUtil.stitchFiles(filesToMerge, stitchedFile);
        // NOTE(review): writeInternal names chunks the same way, so this key may coincide
        // with one of the chunks being merged; the isActiveChunk check below then keeps the
        // re-used key from being deleted.
        String baseName = "chunk_" + maxTime + ".bin";
        String s3path = MetricsUtil.combinePath(this.s3dir, baseName);
        boolean ok = ChunkUtil.upload(this.session.s3Client(), stitchedFile, s3path);
        if (ok) {
            // presumably mergeChunksSameFile yields one entry spanning the merged range — TODO confirm
            MetricProtos.FileChunk.Builder cb = MetricProtos.FileChunk.newBuilder(ChunkUtil.mergeChunksSameFile(s3chunksToMerge));
            cb.setFilePath(s3path);
            MetricProtos.FileChunk newChunk = cb.build();
            // Removing at comp.begin each time works because later elements shift left.
            for (int i = comp.begin; i < comp.end; ++i) {
                this.s3Chunks.remove(comp.begin);
            }
            this.s3Chunks.add(comp.begin, newChunk);
            // Rewrite the local index from scratch with the updated entries, then publish it.
            MetricsUtil.deleteFile(this.s3ChunkIndexFileLocal);
            new RecordWriter<MetricProtos.FileChunk>(this.s3ChunkIndexFileLocal).appendAndFlush(this.s3Chunks);
            ok = ChunkUtil.upload(this.session.s3Client(), this.s3ChunkIndexFileLocal, this.s3ChunkIndexFile);
            if (ok) {
                MetricsUtil.deleteFile(stitchedFile);
                for (String f : filesToMerge) {
                    MetricsUtil.deleteFile(f);
                }
                // Delete the superseded S3 objects, skipping any path the new index still uses.
                for (MetricProtos.FileChunk c : s3chunksToMerge) {
                    S3Path p = S3Path.parse(c.getFilePath());
                    if (this.isActiveChunk(c.getFilePath())) continue;
                    logger.info("delete {}", (Object)c.getFilePath());
                    this.session.s3Client().deleteObject(new DeleteObjectRequest(p.s3bucket, p.s3key));
                }
            }
        }
        logger.info("after compaction:");
        this.showIndex();
        ChunkUtil.showS3Recursive(this.session.s3Client(), this.s3dir);
    }

    /**
     * Deletes every object under the stream's S3 directory that the in-memory index does
     * not reference, and logs indexed chunks that are missing from S3.
     *
     * <p>An object is kept when {@code isActiveChunk} accepts its path. The listing is
     * followed across all result pages: a single {@code listObjects} response can be
     * truncated, which would otherwise leave orphans on later pages and report chunks on
     * later pages as missing.
     *
     * @throws IOException declared by the signature; presumably raised by the helpers
     */
    void cleanupOrphanedChunks() throws IOException {
        // Paths of the objects found in S3 that the index references.
        HashMap<String, Integer> objmap = new HashMap<String, Integer>();
        ObjectListing objectListing = this.session.s3Client().listObjects(this.s3bucket, this.s3dirkey + "/");
        while (true) {
            for (S3ObjectSummary obj : objectListing.getObjectSummaries()) {
                String s3Path = S3Path.getPath(this.s3bucket, obj.getKey());
                boolean active = this.isActiveChunk(s3Path);
                logger.info("{} {}", s3Path, active ? "OK" : "Orphaned");
                if (active) {
                    objmap.put(s3Path, 1);
                    continue;
                }
                logger.info("delete {}", s3Path);
                this.session.s3Client().deleteObject(new DeleteObjectRequest(this.s3bucket, obj.getKey()));
            }
            if (!objectListing.isTruncated()) {
                break;
            }
            // The next page is requested by marker, so the deletions above do not disturb it.
            objectListing = this.session.s3Client().listNextBatchOfObjects(objectListing);
        }
        for (MetricProtos.FileChunk chunk : this.s3Chunks) {
            if (objmap.containsKey(chunk.getFilePath())) {
                continue;
            }
            logger.info("expected chunk {} does not exist", chunk.getFilePath());
        }
        logger.info("Leave cleanupOrphanedChunks");
        ChunkUtil.showS3Recursive(this.session.s3Client(), this.s3dir);
    }

    /**
     * Tells whether an S3 path is still in use by this stream.
     *
     * @param s3Path full S3 path of an object
     * @return {@code true} if an indexed chunk has this file path, or if the index is
     *         non-empty and the path is the index file itself
     */
    boolean isActiveChunk(String s3Path) {
        for (MetricProtos.FileChunk indexed : this.s3Chunks) {
            if (indexed.getFilePath().equals(s3Path)) {
                return true;
            }
        }
        // The index file only counts as active while it has entries.
        if (this.s3Chunks.isEmpty()) {
            return false;
        }
        return s3Path.equals(this.s3ChunkIndexFile);
    }

    /** Hands the in-memory chunk index to {@code ChunkUtil.showChunks} for display. */
    void showIndex() {
        ChunkUtil.showChunks(s3Chunks);
    }

    /**
     * Checks that the chunks form one contiguous byte range.
     *
     * @param chunks chunks in stream order
     * @return {@code true} if there are at least two chunks and each chunk begins exactly
     *         where its predecessor ends; {@code false} otherwise
     */
    boolean mergeable(Vector<MetricProtos.FileChunk> chunks) {
        final int total = chunks.size();
        if (total < 2) {
            return false;
        }
        for (int idx = 1; idx < total; idx++) {
            long previousEnd = chunks.get(idx - 1).getEnd();
            if (chunks.get(idx).getBegin() != previousEnd) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the smallest {@code minTime} among the chunks.
     *
     * <p>The value 0 doubles as "not set yet": the result is 0 for an empty vector, and a
     * running minimum of 0 is replaced by the next chunk's min time.
     *
     * @param chunks chunks to inspect
     * @return the smallest min time, or 0 if {@code chunks} is empty
     */
    static long getMinTime(Vector<MetricProtos.FileChunk> chunks) {
        long earliest = 0L;
        for (MetricProtos.FileChunk chunk : chunks) {
            long candidate = chunk.getMinTime();
            if (earliest == 0L || candidate < earliest) {
                earliest = candidate;
            }
        }
        return earliest;
    }

    /**
     * Returns the largest {@code maxTime} among the chunks.
     *
     * <p>The running maximum is compared against each chunk's max time. Comparing against
     * the chunk's min time instead would skip a later chunk whose max time is larger
     * whenever its min time is not above the current maximum. As in {@code getMinTime},
     * the value 0 doubles as "not set yet".
     *
     * @param chunks chunks to inspect
     * @return the largest max time, or 0 if {@code chunks} is empty
     */
    static long getMaxTime(Vector<MetricProtos.FileChunk> chunks) {
        long maxTime = 0L;
        for (MetricProtos.FileChunk c : chunks) {
            long candidate = c.getMaxTime();
            if (maxTime == 0L || candidate > maxTime) {
                maxTime = candidate;
            }
        }
        return maxTime;
    }

    /**
     * Sums the byte lengths ({@code end - begin}) of the given chunks.
     *
     * @param chunks chunks to measure
     * @return the total length, 0 for an empty vector
     */
    static long getLength(Vector<MetricProtos.FileChunk> chunks) {
        long total = 0L;
        for (MetricProtos.FileChunk chunk : chunks) {
            long span = chunk.getEnd() - chunk.getBegin();
            total += span;
        }
        return total;
    }

    /**
     * A run of consecutive small chunks in the chunk index that may be merged into one.
     */
    static class ChunkBurst {
        /** Index of the first chunk of the run (inclusive). */
        public int begin = 0;
        /** Index one past the last chunk of the run (exclusive). */
        public int end = 0;
        /** Number of chunks in the run, {@code end - begin}. */
        public int count = 0;
        /** Sum of the byte lengths of the chunks in the run. */
        long length = 0L;
        /** 0x2000000 bytes = 32 MiB: chunks at least this long are never put into a run. */
        static final long matureSize = 0x2000000L;
        /** 0x4000000 bytes = 64 MiB: upper bound for the total length of one run. */
        static final long maxSize = 0x4000000L;

        /**
         * Selects the run of chunks that should be merged next, if any.
         *
         * <p>Consecutive chunks shorter than {@code matureSize} are grouped into runs whose
         * total length stays within {@code maxSize}. A chunk that is already mature, or that
         * would push the current run past {@code maxSize}, closes the run and is itself not
         * added to any run. From the resulting runs the first match of these rules wins:
         * <ol>
         *   <li>the first run whose total length is at least {@code matureSize};</li>
         *   <li>the first run of at least 8 chunks that ends before the end of the index;</li>
         *   <li>the last run, if it has at least 128 chunks.</li>
         * </ol>
         *
         * @param chunks the chunk index, in stream order
         * @return the run to merge, or {@code null} if no rule matches, including the case
         *         where no run could be formed at all
         */
        static ChunkBurst computeMergeChunkBurst(Vector<MetricProtos.FileChunk> chunks) {
            Vector<ChunkBurst> bursts = new Vector<ChunkBurst>();
            ChunkBurst burst = null;
            for (int i = 0; i < chunks.size(); ++i) {
                MetricProtos.FileChunk chunk = chunks.get(i);
                long chunkLen = chunk.getEnd() - chunk.getBegin();
                boolean mature = chunkLen >= matureSize;
                boolean overflows = burst != null && burst.length + chunkLen > maxSize;
                if (mature || overflows) {
                    // Close the open run; this chunk is deliberately left out of every run.
                    if (burst != null) {
                        bursts.add(burst);
                        burst = null;
                    }
                    continue;
                }
                if (burst == null) {
                    burst = new ChunkBurst();
                    burst.begin = i;
                }
                burst.length += chunkLen;
                burst.end = i + 1;
                burst.count = burst.end - burst.begin;
            }
            if (burst != null) {
                bursts.add(burst);
            }
            for (ChunkBurst b : bursts) {
                if (b.length >= matureSize) {
                    return b;
                }
            }
            for (ChunkBurst b : bursts) {
                // NOTE(review): runs reaching the end of the index are skipped here,
                // presumably because the tail is still growing — confirm.
                if (b.count >= 8 && b.end < chunks.size()) {
                    return b;
                }
            }
            // With no runs at all (e.g. every chunk is mature) lastElement() would throw
            // NoSuchElementException, so check for emptiness first.
            if (!bursts.isEmpty()) {
                ChunkBurst last = bursts.lastElement();
                if (last.count >= 128) {
                    return last;
                }
            }
            return null;
        }
    }
}

