/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.ChunkUtil;
import amazon.emr.metrics.MetricsReader;
import amazon.emr.metrics.MetricsUtil;
import amazon.emr.metrics.S3Path;
import amazon.emr.metrics.SessionS3Client;
import java.io.File;
import java.io.IOException;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads a chunked stream whose chunks and chunk index live under an S3 directory.
 *
 * <p>The index file ({@code chunk_index.bin}) lists {@link MetricProtos.FileChunk} entries whose
 * {@code begin}/{@code end} values are offsets in the logical stream. The index and the chunk
 * files are mirrored into a local directory below {@link #s3inputRoot}; {@link #read()} returns
 * chunk descriptors that point at those local copies.
 */
public class S3ChunkInputStream {
    static final Logger logger = LoggerFactory.getLogger(S3ChunkInputStream.class);
    static final String s3inputRoot = "/mnt/var/lib/s3input";
    static final String s3ChunkPrefix = "chunk_";
    /** File name of the chunk index, used both for the S3 object and for its local copy. */
    private static final String CHUNK_INDEX_FILE_NAME = "chunk_index.bin";
    final String s3bucket;
    final String s3dir;
    final String s3dirkey;
    /** Local directory that mirrors the S3 directory (index file plus chunk files). */
    public final String localDir;
    final String s3ChunkIndexFile;
    final String s3ChunkIndexFileLocal;
    /** End offset of the last indexed chunk, or 0 when the index is empty. */
    long streamLength;
    SessionS3Client session;
    /** Chunks listed by the most recently synced index. */
    Vector<MetricProtos.FileChunk> s3Chunks;
    /** Current read offset in the logical stream; never greater than {@link #streamLength}. */
    long position;

    /**
     * Creates a stream over the given S3 directory, ensures the local mirror directory exists
     * and loads the chunk index once.
     *
     * @param s3dir   S3 directory holding the chunk index and the chunks
     * @param session provider of the S3 client used for all downloads
     * @throws IOException if syncing the chunk index fails
     */
    public S3ChunkInputStream(String s3dir, SessionS3Client session) throws IOException {
        S3Path p = S3Path.parse(s3dir);
        this.s3bucket = p.s3bucket;
        this.s3dirkey = p.s3key;
        this.s3dir = s3dir;
        this.session = session;
        this.s3ChunkIndexFile = S3Path.combine(s3dir, CHUNK_INDEX_FILE_NAME);
        String s3bucketkey = MetricsUtil.combinePath(this.s3bucket, this.s3dirkey);
        this.localDir = MetricsUtil.combinePath(s3inputRoot, s3bucketkey);
        logger.info("Construct S3ChunkInputStream {}", s3dir);
        this.s3ChunkIndexFileLocal = MetricsUtil.combinePath(this.localDir, CHUNK_INDEX_FILE_NAME);
        MetricsUtil.ensureDir(this.localDir);
        this.position = 0L;
        this.syncIndex();
        this.showIndex();
        logger.info("S3ChunkInputStream {} chunks:{} length:{}", s3dir, this.s3Chunks.size(), this.streamLength);
    }

    /**
     * Moves the read offset.
     *
     * @param position requested offset; values outside {@code [0, length()]} are clamped into
     *                 that range
     */
    void seek(long position) {
        this.position = Math.max(0L, Math.min(this.streamLength, position));
    }

    /** @return the current read offset in the logical stream */
    long position() {
        return this.position;
    }

    /** @return the stream length as of the last index sync */
    long length() {
        return this.streamLength;
    }

    /**
     * @param s3key object key inside this stream's bucket
     * @return the S3 path built from this stream's bucket and {@code s3key}
     */
    String getS3Path(String s3key) {
        return S3Path.getPath(this.s3bucket, s3key);
    }

    /**
     * Downloads the chunk index and reloads {@link #s3Chunks} and {@link #streamLength} from the
     * local copy. If no local index file exists afterwards the stream is treated as empty. The
     * read offset is pulled back if it now lies beyond the end of the stream.
     *
     * @throws IOException if downloading or reading the index fails
     */
    void syncIndex() throws IOException {
        ChunkUtil.download(this.session.s3Client(), this.s3ChunkIndexFile, this.s3ChunkIndexFileLocal);
        if (MetricsUtil.fileExists(this.s3ChunkIndexFileLocal)) {
            this.s3Chunks = MetricsReader.readChunkIndex(this.s3ChunkIndexFileLocal);
            logger.info("{} indexed chunks from {}", this.s3Chunks.size(), this.s3ChunkIndexFile);
        } else {
            this.s3Chunks = new Vector<MetricProtos.FileChunk>();
        }
        this.streamLength = this.s3Chunks.isEmpty() ? 0L : this.s3Chunks.lastElement().getEnd();
        this.position = Math.min(this.streamLength, this.position);
    }

    /**
     * Downloads every indexed chunk that is missing locally or whose local size differs from
     * {@code end - begin}, then removes local files that are no longer indexed.
     *
     * @return {@code true} if all indexed chunks are present locally afterwards, {@code false}
     *         if at least one download task reported failure
     * @throws IOException          if a download or the orphan cleanup fails
     * @throws InterruptedException if the download is interrupted
     */
    boolean syncChunks() throws IOException, InterruptedException {
        Vector<MetricProtos.FileChunk> download = new Vector<MetricProtos.FileChunk>();
        for (MetricProtos.FileChunk chunk : this.s3Chunks) {
            String localPath = this.getLocalPath(chunk);
            // Check existence first so the length is only queried for files that are there.
            boolean upToDate = MetricsUtil.fileExists(localPath)
                    && MetricsUtil.getFileLength(localPath) == chunk.getEnd() - chunk.getBegin();
            if (!upToDate) {
                download.add(chunk);
            }
        }
        if (download.isEmpty()) {
            return true;
        }
        Vector<ChunkUtil.DownloadChunkTask> tasks = ChunkUtil.downloadChunks(this.session.s3Client(), download, this.localDir);
        for (ChunkUtil.DownloadChunkTask task : tasks) {
            if (!task.success) {
                return false;
            }
        }
        this.deleteOrphanedFiles();
        return true;
    }

    /**
     * @param s3chunk an indexed chunk
     * @return path of the chunk's local copy: {@link #localDir} combined with the base name of
     *         the chunk's S3 file path
     */
    String getLocalPath(MetricProtos.FileChunk s3chunk) {
        S3Path p = S3Path.parse(s3chunk.getFilePath());
        return MetricsUtil.combinePath(this.localDir, p.basename);
    }

    /**
     * Deletes regular files in {@link #localDir} that are neither the chunk index nor the local
     * copy of an indexed chunk. Sub-directories are left alone. If the directory cannot be
     * listed the cleanup is skipped with a warning.
     *
     * @throws IOException if deleting a file fails
     */
    void deleteOrphanedFiles() throws IOException {
        File[] files = new File(this.localDir).listFiles();
        if (files == null) {
            // listFiles() yields null when the path is not a directory or an I/O error occurs.
            logger.warn("Cannot list {}; skipping orphaned file cleanup", this.localDir);
            return;
        }
        for (File file : files) {
            if (file.isDirectory() || CHUNK_INDEX_FILE_NAME.equals(file.getName())) {
                continue;
            }
            if (!this.isIndexedChunkFile(file.getName())) {
                MetricsUtil.deleteFile(file.getPath());
            }
        }
    }

    /**
     * Tells whether {@code fileName} is exactly the local file name of some indexed chunk.
     * An exact comparison is used on purpose: a substring test would keep a stale
     * {@code chunk_1} alive merely because {@code chunk_10} is indexed.
     */
    private boolean isIndexedChunkFile(String fileName) {
        for (MetricProtos.FileChunk chunk : this.s3Chunks) {
            if (new File(this.getLocalPath(chunk)).getName().equals(fileName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns descriptors for all data between the current read offset and the end of the
     * stream, then advances the read offset to the end.
     *
     * <p>Each returned chunk points at the local copy of an indexed chunk; its {@code begin} and
     * {@code end} are rewritten to offsets within that local file ({@code begin} is non-zero only
     * for the chunk that contains the current read offset).
     *
     * @return the unread chunks in index order; empty if nothing new is available
     * @throws IOException          if syncing the index or the chunks fails
     * @throws InterruptedException if a chunk download is interrupted
     */
    public Vector<MetricProtos.FileChunk> read() throws IOException, InterruptedException {
        Vector<MetricProtos.FileChunk> output = new Vector<MetricProtos.FileChunk>();
        // Re-sync the index before every attempt; stop early when there is nothing unread.
        // NOTE(review): a failing chunk sync is retried immediately, without any back-off.
        do {
            this.syncIndex();
            if (this.position >= this.streamLength) {
                return output;
            }
        } while (!this.syncChunks());
        for (MetricProtos.FileChunk s3chunk : this.s3Chunks) {
            if (s3chunk.getEnd() <= this.position) {
                // Entirely before the read offset: already consumed.
                continue;
            }
            MetricProtos.FileChunk.Builder cb = MetricProtos.FileChunk.newBuilder(s3chunk);
            // Skip the already-consumed prefix of the chunk that contains the read offset.
            cb.setBegin(Math.max(0L, this.position - s3chunk.getBegin()));
            cb.setEnd(s3chunk.getEnd() - s3chunk.getBegin());
            cb.setFilePath(this.getLocalPath(s3chunk));
            output.add(cb.build());
        }
        ChunkUtil.showChunks(output);
        this.position = this.streamLength;
        return output;
    }

    /** Passes the currently indexed chunks to {@code ChunkUtil.showChunks}. */
    public void showIndex() {
        ChunkUtil.showChunks(this.s3Chunks);
    }
}

