/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.FileType;
import amazon.emr.metrics.MetricsConfig;
import amazon.emr.metrics.MetricsUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChunkDetector {
    static final Logger logger = LoggerFactory.getLogger(ChunkDetector.class);
    private HashMap<String, MetricProtos.FileChunk> snapshot;
    private String inputDir;
    private String chunkHistoryFile;
    private FileOutputStream fileOutput;
    private FSDataOutputStream hdfsOutput;

    public ChunkDetector(MetricsConfig config) throws Exception {
        this.inputDir = config.rawDir;
        this.chunkHistoryFile = config.rawChunkIndexFile;
        this.fileOutput = null;
        this.hdfsOutput = null;
        this.snapshot = new HashMap();
        this.LoadChunkMap(this.chunkHistoryFile);
    }

    public Vector<MetricProtos.FileChunk> DetectNewChunks() throws Exception {
        Vector<MetricProtos.FileChunk> chunks = null;
        chunks = MetricsUtil.getFileType(this.inputDir) == FileType.HDFS ? this.DetectNewChunksHDFS() : this.DetectNewChunksLocal();
        this.ShowChunks(chunks);
        return chunks;
    }

    public void commitChunks(Vector<MetricProtos.FileChunk> chunks) throws Exception {
        if (chunks.size() == 0) {
            return;
        }
        long age = MetricsUtil.getAgeMs(chunks.firstElement().getTime());
        logger.info("Commit {} mature chunks age {}ms", (Object)chunks.size(), (Object)age);
        this.OpenOutputStream();
        for (MetricProtos.FileChunk chunk : chunks) {
            if (this.hdfsOutput != null) {
                chunk.writeDelimitedTo((OutputStream)this.hdfsOutput);
                continue;
            }
            chunk.writeDelimitedTo(this.fileOutput);
        }
        this.CloseOutputStream();
    }

    void LoadChunkMap(String chunkFile) throws IOException {
        MetricProtos.FileChunk r;
        logger.info("LoadChunkMap " + chunkFile);
        if (!MetricsUtil.fileExists(chunkFile)) {
            return;
        }
        FileType type = MetricsUtil.getFileType(chunkFile);
        FileInputStream fileInput = null;
        FileInputStream hdfsInput = null;
        if (type == FileType.HDFS) {
            Configuration config = new Configuration();
            FileSystem hdfs = FileSystem.get((Configuration)config);
            hdfsInput = hdfs.open(new Path(chunkFile));
        } else {
            fileInput = new FileInputStream(chunkFile);
        }
        while ((r = MetricProtos.FileChunk.parseDelimitedFrom(type == FileType.HDFS ? hdfsInput : fileInput)) != null) {
            this.AddChunkToMap(r);
        }
    }

    void AddChunkToMap(MetricProtos.FileChunk r) {
        if (this.snapshot.containsKey(r.getFilePath())) {
            this.snapshot.remove(r.getFilePath());
        }
        this.snapshot.put(r.getFilePath(), r);
    }

    MetricProtos.FileChunk CreateFileChunk(String file, long begin, long end) {
        MetricProtos.FileChunk.Builder b = MetricProtos.FileChunk.newBuilder();
        b.setFilePath(file);
        b.setBegin(begin);
        b.setEnd(end);
        b.setTime(System.currentTimeMillis());
        return b.build();
    }

    Vector<MetricProtos.FileChunk> DetectNewChunksLocal() throws Exception {
        File dir = new File(this.inputDir);
        File[] files = dir.listFiles();
        Vector<MetricProtos.FileChunk> chunks = new Vector<MetricProtos.FileChunk>();
        for (File child : files) {
            String filepath;
            MetricProtos.FileChunk chunk;
            if (!child.isFile() || (chunk = this.snapshot.get(filepath = child.getPath().toString())) != null && chunk.getEnd() >= child.length()) continue;
            MetricProtos.FileChunk chunkNew = this.CreateFileChunk(child.getAbsolutePath(), chunk == null ? 0L : chunk.getEnd(), child.length());
            this.AddChunkToMap(chunkNew);
            chunks.add(chunkNew);
        }
        return chunks;
    }

    Vector<MetricProtos.FileChunk> DetectNewChunksHDFS() throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get((Configuration)config);
        FileStatus[] children = hdfs.listStatus(new Path(this.inputDir));
        Vector<MetricProtos.FileChunk> chunks = new Vector<MetricProtos.FileChunk>();
        for (FileStatus child : children) {
            String filepath;
            MetricProtos.FileChunk chunk;
            if (child.isDir() || (chunk = this.snapshot.get(filepath = MetricsUtil.getPathName(child))) != null && chunk.getEnd() >= child.getLen()) continue;
            MetricProtos.FileChunk chunkNew = this.CreateFileChunk(filepath, chunk == null ? 0L : chunk.getEnd(), child.getLen());
            this.AddChunkToMap(chunkNew);
            chunks.add(chunkNew);
        }
        return chunks;
    }

    void ShowChunks(Vector<MetricProtos.FileChunk> chunks) {
        if (chunks.size() > 0) {
            logger.info("Detected {} new chunks (total {}):", (Object)chunks.size(), (Object)this.snapshot.size());
            for (MetricProtos.FileChunk chunk : chunks) {
                logger.info(chunk.getFilePath() + " [{}, {})", (Object)chunk.getBegin(), (Object)chunk.getEnd());
            }
        }
    }

    void OpenOutputStream() throws Exception {
        if (MetricsUtil.getFileType(this.inputDir) == FileType.HDFS) {
            Path path;
            Configuration config = new Configuration();
            FileSystem hdfs = FileSystem.get((Configuration)config);
            this.hdfsOutput = hdfs.exists(path = new Path(this.chunkHistoryFile)) ? hdfs.append(path) : hdfs.create(path);
        } else {
            this.fileOutput = new FileOutputStream(this.chunkHistoryFile, true);
        }
    }

    void CloseOutputStream() throws Exception {
        if (this.hdfsOutput != null) {
            this.hdfsOutput.close();
            this.hdfsOutput = null;
        } else if (this.fileOutput != null) {
            this.fileOutput.close();
            this.fileOutput = null;
        }
    }
}

