package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:amazon/emr/metrics/ChunkDetector.class */
/**
 * Detects newly appended byte ranges ("chunks") in the files of an input directory.
 *
 * <p>The detector keeps an in-memory snapshot that maps each file path to the most recent
 * {@link MetricProtos.FileChunk} recorded for it. A file is reported when it has no snapshot
 * entry yet, or when its current length is greater than the end offset of its snapshot entry.
 * The snapshot is seeded from the chunk history file at construction time, and
 * {@link #commitChunks(Vector)} appends chunks to that same history file.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ChunkDetector {
    static final Logger logger = LoggerFactory.getLogger(ChunkDetector.class);

    /** Directory (local or HDFS, as classified by {@code MetricsUtil.getFileType}) that is scanned. */
    private final String inputDir;

    /** Path of the file holding the delimited {@code FileChunk} history. */
    private final String chunkHistoryFile;

    /** Open local history sink; non-null only while a commit is in progress. */
    private FileOutputStream fileOutput = null;

    /** Open HDFS history sink; non-null only while a commit is in progress. */
    private FSDataOutputStream hdfsOutput = null;

    /** Latest known chunk per file path. */
    private final HashMap<String, MetricProtos.FileChunk> snapshot = new HashMap<>();

    /**
     * Creates a detector for {@code metricsConfig.rawDir} and seeds the snapshot from
     * {@code metricsConfig.rawChunkIndexFile} when that file exists.
     *
     * @param metricsConfig source of the input directory and chunk history file paths
     * @throws Exception if the chunk history file cannot be read
     */
    public ChunkDetector(MetricsConfig metricsConfig) throws Exception {
        this.inputDir = metricsConfig.rawDir;
        this.chunkHistoryFile = metricsConfig.rawChunkIndexFile;
        LoadChunkMap(this.chunkHistoryFile);
    }

    /**
     * Scans the input directory and returns one chunk per file that is new or has grown.
     * Every returned chunk is also stored in the in-memory snapshot and logged.
     *
     * @return the newly detected chunks; empty when nothing changed
     * @throws Exception if the directory cannot be listed
     */
    public Vector<MetricProtos.FileChunk> DetectNewChunks() throws Exception {
        Vector<MetricProtos.FileChunk> chunks;
        if (MetricsUtil.getFileType(this.inputDir) == FileType.HDFS) {
            chunks = DetectNewChunksHDFS();
        } else {
            chunks = DetectNewChunksLocal();
        }
        ShowChunks(chunks);
        return chunks;
    }

    /**
     * Appends the given chunks to the chunk history file as delimited messages.
     * Does nothing when {@code vector} is empty.
     *
     * @param vector chunks to persist
     * @throws Exception if the history file cannot be opened, written or closed
     */
    public void commitChunks(Vector<MetricProtos.FileChunk> vector) throws Exception {
        if (vector.isEmpty()) {
            return;
        }
        logger.info("Commit {} mature chunks age {}ms", vector.size(), MetricsUtil.getAgeMs(vector.firstElement().getTime()));
        OpenOutputStream();
        try {
            for (MetricProtos.FileChunk chunk : vector) {
                if (this.hdfsOutput != null) {
                    chunk.writeDelimitedTo(this.hdfsOutput);
                } else {
                    chunk.writeDelimitedTo(this.fileOutput);
                }
            }
        } finally {
            // Always release the sink, even when a write fails; otherwise the handle leaks and
            // a stale non-null field could misroute the next commit.
            CloseOutputStream();
        }
    }

    /**
     * Reads every delimited {@code FileChunk} from {@code str} into the snapshot. Later entries
     * for the same file path replace earlier ones. Returns silently when the file does not exist.
     *
     * @param str path of the chunk history file
     * @throws IOException if the file cannot be opened or parsed
     */
    void LoadChunkMap(String str) throws IOException {
        logger.info("LoadChunkMap {}", str);
        if (!MetricsUtil.fileExists(str)) {
            return;
        }
        // try-with-resources guarantees the history stream is closed on both EOF and failure.
        try (InputStream in = openHistoryInput(str)) {
            MetricProtos.FileChunk chunk;
            // parseDelimitedFrom yields null once the stream is exhausted.
            while ((chunk = MetricProtos.FileChunk.parseDelimitedFrom(in)) != null) {
                AddChunkToMap(chunk);
            }
        }
    }

    /** Opens {@code path} for reading from HDFS or the local file system, depending on its type. */
    private static InputStream openHistoryInput(String path) throws IOException {
        if (MetricsUtil.getFileType(path) == FileType.HDFS) {
            return FileSystem.get(new Configuration()).open(new Path(path));
        }
        return new FileInputStream(path);
    }

    /**
     * Stores {@code fileChunk} in the snapshot under its file path, replacing any previous entry.
     *
     * @param fileChunk chunk to remember as the latest one for its file
     */
    void AddChunkToMap(MetricProtos.FileChunk fileChunk) {
        this.snapshot.put(fileChunk.getFilePath(), fileChunk);
    }

    /**
     * Builds a chunk for {@code str} covering {@code [j, j2)}, stamped with the current wall-clock time.
     *
     * @param str file path recorded in the chunk
     * @param j begin offset
     * @param j2 end offset
     * @return the new chunk
     */
    MetricProtos.FileChunk CreateFileChunk(String str, long j, long j2) {
        MetricProtos.FileChunk.Builder builder = MetricProtos.FileChunk.newBuilder();
        builder.setFilePath(str);
        builder.setBegin(j);
        builder.setEnd(j2);
        builder.setTime(System.currentTimeMillis());
        return builder.build();
    }

    /**
     * Detects new or grown regular files directly inside the local input directory.
     *
     * @return chunks for files that are new or longer than their snapshot entry
     * @throws Exception if the directory cannot be listed
     */
    Vector<MetricProtos.FileChunk> DetectNewChunksLocal() throws Exception {
        File[] entries = new File(this.inputDir).listFiles();
        if (entries == null) {
            // File.listFiles() returns null when the path is not a directory or an I/O error occurs.
            throw new IOException("Cannot list input directory: " + this.inputDir);
        }
        Vector<MetricProtos.FileChunk> detected = new Vector<>();
        for (File entry : entries) {
            if (entry.isFile()) {
                // Chunks are stored under the absolute path, so the lookup must use the same key;
                // using getPath() here would never match when inputDir is relative.
                recordIfGrown(entry.getAbsolutePath(), entry.length(), detected);
            }
        }
        return detected;
    }

    /**
     * Detects new or grown non-directory entries directly inside the HDFS input directory.
     *
     * @return chunks for files that are new or longer than their snapshot entry
     * @throws Exception if the directory cannot be listed
     */
    Vector<MetricProtos.FileChunk> DetectNewChunksHDFS() throws Exception {
        FileStatus[] statuses = FileSystem.get(new Configuration()).listStatus(new Path(this.inputDir));
        if (statuses == null) {
            // NOTE(review): defensive guard; whether listStatus can return null depends on the
            // Hadoop version and FileSystem implementation in use. Confirm.
            throw new IOException("Cannot list input directory: " + this.inputDir);
        }
        Vector<MetricProtos.FileChunk> detected = new Vector<>();
        for (FileStatus status : statuses) {
            if (!status.isDir()) {
                recordIfGrown(MetricsUtil.getPathName(status), status.getLen(), detected);
            }
        }
        return detected;
    }

    /**
     * Emits a chunk for {@code path} when the file is unknown to the snapshot or its current
     * length exceeds the end of the snapshot entry. The new chunk starts where the previous
     * one ended (or at 0) and is stored in both the snapshot and {@code out}.
     */
    private void recordIfGrown(String path, long currentLength, Vector<MetricProtos.FileChunk> out) {
        MetricProtos.FileChunk previous = this.snapshot.get(path);
        if (previous != null && previous.getEnd() >= currentLength) {
            return;
        }
        long begin = previous == null ? 0L : previous.getEnd();
        MetricProtos.FileChunk chunk = CreateFileChunk(path, begin, currentLength);
        AddChunkToMap(chunk);
        out.add(chunk);
    }

    /**
     * Logs the detected chunks, one line per chunk, preceded by a summary line.
     * Logs nothing when {@code vector} is empty.
     *
     * @param vector chunks to log
     */
    void ShowChunks(Vector<MetricProtos.FileChunk> vector) {
        if (vector.isEmpty()) {
            return;
        }
        logger.info("Detected {} new chunks (total {}):", vector.size(), this.snapshot.size());
        for (MetricProtos.FileChunk chunk : vector) {
            // The path is passed as an argument rather than concatenated into the format string,
            // so a path containing "{}" cannot corrupt the message. The explicit Object[] keeps
            // the call valid for both the array and the varargs logger overloads.
            logger.info("{} [{}, {})", new Object[] {chunk.getFilePath(), chunk.getBegin(), chunk.getEnd()});
        }
    }

    /**
     * Opens the chunk history file for appending and stores the stream in {@code fileOutput}
     * (local) or {@code hdfsOutput} (HDFS). On HDFS the file is created when it does not exist.
     *
     * @throws Exception if the file cannot be opened
     */
    void OpenOutputStream() throws Exception {
        // NOTE(review): the sink type is chosen from inputDir, whereas LoadChunkMap chooses the
        // source type from the history file path itself. Confirm both always share a file system.
        if (MetricsUtil.getFileType(this.inputDir) != FileType.HDFS) {
            this.fileOutput = new FileOutputStream(this.chunkHistoryFile, true);
            return;
        }
        FileSystem fileSystem = FileSystem.get(new Configuration());
        Path historyPath = new Path(this.chunkHistoryFile);
        this.hdfsOutput = fileSystem.exists(historyPath) ? fileSystem.append(historyPath) : fileSystem.create(historyPath);
    }

    /**
     * Closes whichever history sink is open and clears both stream fields. The fields are
     * cleared before closing so that a failing {@code close()} cannot leave a stale stream behind.
     *
     * @throws Exception if closing a stream fails
     */
    void CloseOutputStream() throws Exception {
        FSDataOutputStream hdfs = this.hdfsOutput;
        FileOutputStream local = this.fileOutput;
        this.hdfsOutput = null;
        this.fileOutput = null;
        try {
            if (hdfs != null) {
                hdfs.close();
            }
        } finally {
            if (local != null) {
                local.close();
            }
        }
    }
}
