/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.FileType;
import amazon.emr.metrics.IntervalAggregator;
import amazon.emr.metrics.MetricRecordReader;
import amazon.emr.metrics.MetricsUtil;
import com.google.protobuf.GeneratedMessage;
import com.ibm.icu.text.DecimalFormat;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for reading the length-delimited protobuf files used by the
 * metrics pipeline (metric records, chunk indexes, engine state and S3 upload
 * state), from either HDFS or the local file system, plus a few console
 * dump utilities.
 *
 * <p>Whether a path is treated as HDFS or local is decided by
 * {@code MetricsUtil.getFileType(String)}.
 */
public class MetricsReader {
    static final Logger logger = LoggerFactory.getLogger(MetricsReader.class);

    /** Maximum number of trailing records retained by {@link #readEngineStateFile(String)}. */
    private static final int MAX_ENGINE_STATE_RECORDS = 100;

    /**
     * Reads the metric records stored in the byte range described by {@code chunk}.
     *
     * @param chunk descriptor providing the file path and the {@code [begin, end)} byte range
     * @return the records parsed from that range (possibly empty, never {@code null})
     * @throws Exception if the file cannot be opened or a record cannot be parsed
     */
    public static Vector<MetricProtos.EmrMetricRecord> readFileChunk(MetricProtos.FileChunk chunk) throws Exception {
        Vector<MetricProtos.EmrMetricRecord> records = MetricsUtil.getFileType(chunk.getFilePath()) == FileType.HDFS
                ? MetricsReader.readFileChunkHDFS(chunk)
                : MetricsReader.readFileChunkLocal(chunk);
        logger.info(String.format("Read %4d records from %s [%d,%d)", records.size(), chunk.getFilePath(), chunk.getBegin(), chunk.getEnd()));
        return records;
    }

    /**
     * HDFS variant of {@link #readFileChunk}: seeks to the chunk start and parses
     * delimited records until the stream position reaches the chunk end or the
     * stream is exhausted.
     */
    private static Vector<MetricProtos.EmrMetricRecord> readFileChunkHDFS(MetricProtos.FileChunk chunk) throws Exception {
        FileSystem hdfs = FileSystem.get(new Configuration());
        // The stream is opened in the resource declaration: a try-with-resources
        // variable is implicitly final and cannot be assigned inside the body.
        try (FSDataInputStream hdfsInput = hdfs.open(new Path(chunk.getFilePath()))) {
            hdfsInput.seek(chunk.getBegin());
            Vector<MetricProtos.EmrMetricRecord> records = new Vector<MetricProtos.EmrMetricRecord>();
            MetricProtos.EmrMetricRecord r;
            while (hdfsInput.getPos() < chunk.getEnd() && (r = MetricProtos.EmrMetricRecord.parseDelimitedFrom((InputStream)hdfsInput)) != null) {
                records.add(r);
            }
            return records;
        }
    }

    /**
     * Local-file variant of {@link #readFileChunk}: skips to the chunk start and
     * parses delimited records until the tracked position reaches the chunk end
     * or the stream is exhausted.
     */
    private static Vector<MetricProtos.EmrMetricRecord> readFileChunkLocal(MetricProtos.FileChunk chunk) throws Exception {
        try (FileInputStream fileInput = new FileInputStream(chunk.getFilePath());
             PositionInputStream ps = new PositionInputStream(fileInput)) {
            MetricsReader.skipFully(ps, chunk.getBegin());
            Vector<MetricProtos.EmrMetricRecord> records = new Vector<MetricProtos.EmrMetricRecord>();
            MetricProtos.EmrMetricRecord r;
            while (ps.getPosition() < chunk.getEnd() && (r = MetricProtos.EmrMetricRecord.parseDelimitedFrom(ps)) != null) {
                records.add(r);
            }
            return records;
        }
    }

    /**
     * Reads every chunk descriptor in an index file.
     *
     * @param filename path of the chunk index file
     * @return all descriptors in the file
     * @throws IOException if the file cannot be read
     */
    public static Vector<MetricProtos.FileChunk> readChunkIndex(String filename) throws IOException {
        return MetricsReader.readChunkIndex(filename, 0L, 0L);
    }

    /**
     * Reads chunk descriptors from a byte range of an index file.
     *
     * @param filePath    path of the chunk index file
     * @param beginOffset byte offset to start at; values {@code <= 0} start at the beginning
     * @param endOffset   exclusive byte offset to stop at; values {@code <= 0} read to end of file
     * @return the descriptors parsed from the range (possibly empty, never {@code null})
     * @throws IOException if the file cannot be read
     */
    public static Vector<MetricProtos.FileChunk> readChunkIndex(String filePath, long beginOffset, long endOffset) throws IOException {
        Vector<MetricProtos.FileChunk> records = MetricsUtil.getFileType(filePath) == FileType.HDFS
                ? MetricsReader.readChunkIndexHDFS(filePath, beginOffset, endOffset)
                : MetricsReader.readChunkIndexLocal(filePath, beginOffset, endOffset);
        logger.info(String.format("Read %4d records from %s [%d,%d)", records.size(), filePath, beginOffset, endOffset));
        return records;
    }

    /** HDFS variant of {@link #readChunkIndex(String, long, long)}. */
    private static Vector<MetricProtos.FileChunk> readChunkIndexHDFS(String filePath, long beginOffset, long endOffset) throws IOException {
        FileSystem hdfs = FileSystem.get(new Configuration());
        // Opened in the resource declaration; see readFileChunkHDFS for why.
        try (FSDataInputStream hdfsInput = hdfs.open(new Path(filePath))) {
            if (beginOffset > 0L) {
                hdfsInput.seek(beginOffset);
            }
            Vector<MetricProtos.FileChunk> records = new Vector<MetricProtos.FileChunk>();
            MetricProtos.FileChunk r;
            while ((endOffset <= 0L || hdfsInput.getPos() < endOffset) && (r = MetricProtos.FileChunk.parseDelimitedFrom((InputStream)hdfsInput)) != null) {
                records.add(r);
            }
            return records;
        }
    }

    /** Local-file variant of {@link #readChunkIndex(String, long, long)}. */
    private static Vector<MetricProtos.FileChunk> readChunkIndexLocal(String filePath, long beginOffset, long endOffset) throws IOException {
        try (FileInputStream fileInput = new FileInputStream(filePath);
             PositionInputStream ps = new PositionInputStream(fileInput)) {
            MetricsReader.skipFully(ps, beginOffset);
            Vector<MetricProtos.FileChunk> records = new Vector<MetricProtos.FileChunk>();
            MetricProtos.FileChunk r;
            while ((endOffset <= 0L || ps.getPosition() < endOffset) && (r = MetricProtos.FileChunk.parseDelimitedFrom(ps)) != null) {
                records.add(r);
            }
            return records;
        }
    }

    /**
     * Skips {@code count} bytes of {@code in}, retrying because a single
     * {@link InputStream#skip(long)} call may skip fewer bytes than requested.
     * Stops quietly at end of stream; does nothing when {@code count <= 0}.
     */
    private static void skipFully(InputStream in, long count) throws IOException {
        long remaining = count;
        while (remaining > 0L) {
            long skipped = in.skip(remaining);
            if (skipped > 0L) {
                remaining -= skipped;
            } else if (in.read() >= 0) {
                // skip() made no progress but the stream is not at EOF: consume one byte.
                --remaining;
            } else {
                return;
            }
        }
    }

    /**
     * Opens {@code filename} for reading from HDFS or the local file system.
     * The caller is responsible for closing the returned stream.
     *
     * @param filename path of the file to open
     * @return an open input stream positioned at the start of the file
     * @throws IOException if the file cannot be opened
     */
    public static InputStream getInputStream(String filename) throws IOException {
        if (MetricsUtil.getFileType(filename) != FileType.HDFS) {
            return new FileInputStream(filename);
        }
        FileSystem hdfs = FileSystem.get(new Configuration());
        return hdfs.open(new Path(filename));
    }

    /**
     * Reads an engine-state file and returns only its trailing records.
     *
     * @param filename path of the engine-state file
     * @return at most the last 100 records in file order; empty if the file does not exist
     * @throws Exception if the file cannot be read or parsed
     */
    public static Vector<MetricProtos.EngineState> readEngineStateFile(String filename) throws Exception {
        Vector<MetricProtos.EngineState> records = new Vector<MetricProtos.EngineState>();
        if (!MetricsUtil.fileExists(filename)) {
            return records;
        }
        try (InputStream input = MetricsReader.getInputStream(filename)) {
            MetricProtos.EngineState r;
            while ((r = MetricProtos.EngineState.parseDelimitedFrom(input)) != null) {
                records.add(r);
                // Sliding window: drop the oldest record once the cap is exceeded.
                if (records.size() > MAX_ENGINE_STATE_RECORDS) {
                    records.remove(0);
                }
            }
            logger.info("Read {} tail records from {}", (Object)records.size(), (Object)filename);
            return records;
        }
    }

    /**
     * Reads an S3 upload-state file and returns its last record.
     *
     * @param filename path of the upload-state file
     * @return the last record in the file, or {@code null} if the file does not
     *         exist or contains no records
     * @throws IOException if the file cannot be read
     */
    public static MetricProtos.S3UploadState readS3UploadStateFile(String filename) throws IOException {
        if (!MetricsUtil.fileExists(filename)) {
            return null;
        }
        try (InputStream input = MetricsReader.getInputStream(filename)) {
            MetricProtos.S3UploadState last = null;
            MetricProtos.S3UploadState r;
            while ((r = MetricProtos.S3UploadState.parseDelimitedFrom(input)) != null) {
                last = r;
            }
            return last;
        }
    }

    /**
     * Classifies a metrics file by substrings of its name.
     *
     * <p>The checks are order-sensitive: {@code "chunk_index"} must be tested
     * before {@code "chunk_"}, and the metric-record markers are tested before
     * {@code "upload_state"}.
     *
     * @param name file name or path
     * @return the detected type, or {@link MetricFileType#Unknown} if no marker matches
     */
    static MetricFileType getFileTypeFromName(String name) {
        if (name.contains("chunk_index")) {
            return MetricFileType.ChunkIndexFile;
        }
        if (name.contains("engine_state")) {
            return MetricFileType.EngineStateFile;
        }
        boolean isMetricRecord = name.contains("aggregated")
                || name.contains("_raw")
                || name.contains("chunk_")
                || name.contains("output_");
        if (isMetricRecord) {
            return MetricFileType.MetricRecordFile;
        }
        if (name.contains("upload_state")) {
            return MetricFileType.S3UploadStateFile;
        }
        return MetricFileType.Unknown;
    }

    /**
     * Dumps the contents of a metrics file to standard output, choosing the
     * layout from {@link #getFileTypeFromName(String)}.
     *
     * <p>When the file yields no records the method returns without printing
     * the trailing {@code current:/last:} summary line.
     *
     * @param file   path of the file to dump
     * @param filter optional substring; when non-null, chunk-index entries are
     *               filtered on their file path and metric records on their stream id
     * @throws Exception if the file cannot be read or parsed
     */
    static void readMetricsFile(String file, String filter) throws Exception {
        MetricFileType type = MetricsReader.getFileTypeFromName(file);
        DecimalFormat df = new DecimalFormat("0000.00");
        long lastTime = 0L;
        if (type == MetricFileType.ChunkIndexFile) {
            Vector<MetricProtos.FileChunk> chunks = MetricsReader.readChunkIndex(file);
            if (chunks.isEmpty()) {
                return;
            }
            for (int i = 0; i < chunks.size(); ++i) {
                if (lastTime == 0L) {
                    lastTime = chunks.firstElement().getTime();
                }
                MetricProtos.FileChunk r = chunks.get(i);
                if (filter != null && !r.getFilePath().contains(filter)) {
                    continue;
                }
                System.out.format("%4d %13d %s [%d,%d) %d\n", i, r.getMinTime(), r.getFilePath(), r.getBegin(), r.getEnd(), r.getTime() - lastTime);
                lastTime = r.getTime();
            }
        } else if (type == MetricFileType.S3UploadStateFile) {
            MetricProtos.S3UploadState record = MetricsReader.readS3UploadStateFile(file);
            if (record == null) {
                return;
            }
            // Fixed format string: the original used the C-style "%lld" (rejected by
            // java.util.Formatter) and a trailing "%s" with no matching argument.
            // NOTE(review): assumes both offsets are integral types - confirm.
            System.out.format("time:%s rawOffset:%d aggOffset:%d\n", MetricsUtil.getTimeStr(record.getTime()), record.getRawOffset(), record.getAggOffset());
        } else if (type == MetricFileType.EngineStateFile) {
            Vector<MetricProtos.EngineState> records = MetricsReader.readEngineStateFile(file);
            if (records.isEmpty()) {
                return;
            }
            // Only the most recent engine state is reported.
            MetricProtos.EngineState r = records.get(records.size() - 1);
            logger.info("engine_state time {} curosr: {}", (Object)MetricsUtil.getTimeStr(r.getTime()), (Object)MetricsUtil.getTimeStr(r.getCursor()));
            for (MetricProtos.ProcessorState s : r.getProcessorsList()) {
                System.out.format("  %14d %14d %s %s\n", s.getInputCursor(), s.getOutputCursor(), MetricsUtil.getTimeStr(s.getOutputCursor()), MetricsUtil.getStreamId(s.getKey()));
            }
        } else if (type == MetricFileType.MetricRecordFile) {
            MetricRecordReader reader = new MetricRecordReader(file);
            Vector<MetricProtos.EmrMetricRecord> records = new Vector<MetricProtos.EmrMetricRecord>();
            // try/finally so the reader is released even if reading or formatting throws.
            try {
                MetricProtos.EmrMetricRecord r;
                int index = -1;
                while ((r = reader.read()) != null) {
                    StringBuilder sb;
                    // NOTE(review): the decompiler widened this local to GeneratedMessage;
                    // the accessors used below belong to the concrete protobuf value
                    // types, which must be restored for this method to compile.
                    GeneratedMessage v;
                    int j;
                    String id = MetricsUtil.getStreamId(r.getKey());
                    if (filter != null && !id.contains(filter)) {
                        continue;
                    }
                    records.add(r);
                    ++index;
                    // Aggregated ("extended") values.
                    for (j = 0; j < r.getValuesExCount(); ++j) {
                        v = r.getValuesEx(j);
                        if (lastTime == 0L) {
                            lastTime = v.getStart();
                        }
                        sb = new StringBuilder(256);
                        sb.append(String.format("%4d:%-3d %d %s ", index, j, v.getStart(), id));
                        if (v.getCount() > 0) {
                            sb.append(String.format("ct:%3d avg:%s tp90:%s sum:%s ", v.getCount(), df.format(v.getAverage()), df.format(v.getTp90()), df.format(v.getSum())));
                        }
                        if (v.getErrorsCount() > 0) {
                            sb.append(String.format("er:%d ", v.getNumError()));
                            // Only the first error item is printed.
                            for (int k = 0; k < Math.min(v.getErrorsCount(), 1); ++k) {
                                MetricProtos.EmrMetricErrorItem e = v.getErrors(k);
                                sb.append(String.format("%s %d ", e.getError(), e.getCount()));
                            }
                        }
                        sb.append(String.format("%d\n", v.getStart() - lastTime));
                        System.out.print(sb.toString());
                        lastTime = v.getStart();
                    }
                    // Plain point-in-time values.
                    for (j = 0; j < r.getValuesCount(); ++j) {
                        v = r.getValues(j);
                        if (lastTime == 0L) {
                            lastTime = v.getTime();
                        }
                        sb = new StringBuilder(256);
                        sb.append(String.format("%4d:%-3d %d %s ", index, j, v.getTime(), id));
                        if (v.hasValue()) {
                            sb.append(String.format("v:%d ", v.getValue()));
                        }
                        if (v.hasError()) {
                            sb.append(String.format("e:%s ", v.getError()));
                        }
                        sb.append(String.format("%d\n", v.getTime() - lastTime));
                        System.out.print(sb.toString());
                        lastTime = v.getTime();
                    }
                }
            } finally {
                reader.close();
            }
            if (records.isEmpty()) {
                return;
            }
            MetricProtos.EmrMetricAggregatedValue sv = IntervalAggregator.aggregateRecords(records);
            System.out.format("Total duration:%d avg:%s ct:%d t90:%s sum:%s er:%d\n", sv.getStop() - sv.getStart(), df.format(sv.getAverage()), sv.getCount(), df.format(sv.getTp90()), df.format(sv.getSum()), sv.getNumError());
        } else {
            logger.info("Could not determine file type");
        }
        long current = System.currentTimeMillis();
        System.out.format("current:%d %s last:%d %s\n", current, MetricsUtil.getTimeStr(current), lastTime, MetricsUtil.getTimeStr(lastTime));
    }

    /**
     * Prints the size and path of {@code dir} if it is a plain file, or of every
     * file beneath it (recursively) if it is a directory.
     *
     * @param dir HDFS or local path of a file or directory
     * @throws Exception if the path cannot be inspected
     */
    public static void showDirFileRecursive(String dir) throws Exception {
        if (MetricsUtil.getFileType(dir) == FileType.HDFS) {
            FileSystem hdfs = FileSystem.get(new Configuration());
            FileStatus me = hdfs.getFileStatus(new Path(dir));
            if (!me.isDir()) {
                System.out.format("%9d %s\n", me.getLen(), MetricsUtil.getPathName(me));
                return;
            }
            for (FileStatus child : hdfs.listStatus(new Path(dir))) {
                if (child.isDir()) {
                    MetricsReader.showDirFileRecursive(MetricsUtil.getPathName(child));
                } else {
                    System.out.format("%9d %s\n", child.getLen(), MetricsUtil.getPathName(child));
                }
            }
            return;
        }
        File folder = new File(dir);
        if (!folder.isDirectory()) {
            System.out.format("%9d %s\n", folder.length(), folder.getPath());
            return;
        }
        File[] files = folder.listFiles();
        if (files == null) {
            // listFiles() returns null when the directory cannot be listed
            // (I/O error or it vanished after the isDirectory() check).
            return;
        }
        for (File entry : files) {
            if (entry.isDirectory()) {
                MetricsReader.showDirFileRecursive(entry.getPath());
            } else {
                System.out.format("%9d %s\n", entry.length(), entry.getPath());
            }
        }
    }

    /**
     * Input-stream wrapper that counts how many bytes have been consumed
     * (read or skipped) from the underlying stream.
     *
     * <p>The counter starts at zero when the wrapper is created; it is restored
     * to the marked value on {@link #reset()}.
     */
    public static final class PositionInputStream
    extends FilterInputStream {
        /** Bytes consumed through this wrapper so far. */
        private long pos = 0L;
        /** Value of {@link #pos} at the most recent {@link #mark(int)} call. */
        private long mark = 0L;

        /**
         * @param in the stream to wrap
         */
        public PositionInputStream(InputStream in) {
            super(in);
        }

        /**
         * @return the number of bytes read or skipped through this wrapper
         */
        public synchronized long getPosition() {
            return this.pos;
        }

        @Override
        public synchronized int read() throws IOException {
            int b = super.read();
            // -1 signals end of stream and must not advance the position.
            if (b >= 0) {
                ++this.pos;
            }
            return b;
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            int n = super.read(b, off, len);
            if (n > 0) {
                this.pos += (long)n;
            }
            return n;
        }

        @Override
        public synchronized long skip(long skip) throws IOException {
            long n = super.skip(skip);
            if (n > 0L) {
                this.pos += n;
            }
            return n;
        }

        @Override
        public synchronized void mark(int readlimit) {
            super.mark(readlimit);
            this.mark = this.pos;
        }

        @Override
        public synchronized void reset() throws IOException {
            if (!this.markSupported()) {
                throw new IOException("Mark not supported.");
            }
            super.reset();
            this.pos = this.mark;
        }
    }

    /** Kinds of metrics file recognised by {@link #getFileTypeFromName(String)}. */
    public static enum MetricFileType {
        Unknown,
        MetricRecordFile,
        ChunkIndexFile,
        EngineStateFile,
        S3UploadStateFile;

    }
}

