/*
 * Decompiled with CFR 0.152.
 */
package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.ChunkDetector;
import amazon.emr.metrics.ChunkUtil;
import amazon.emr.metrics.InstanceProcessor;
import amazon.emr.metrics.IntervalAggregator;
import amazon.emr.metrics.MetricRecordReader;
import amazon.emr.metrics.MetricsConfig;
import amazon.emr.metrics.MetricsProcessorBase;
import amazon.emr.metrics.MetricsReader;
import amazon.emr.metrics.MetricsUtil;
import amazon.emr.metrics.OutputChunkWriter;
import amazon.emr.metrics.RecordWriter;
import amazon.emr.metrics.S3ChunkOutputStream;
import amazon.emr.metrics.S3Path;
import amazon.emr.metrics.SessionS3Client;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MetricsEngine
implements Runnable {
    /** Logger shared by the engine and its nested task classes. */
    static final Logger logger = LoggerFactory.getLogger(MetricsEngine.class);
    /** Configuration supplied at construction; source of the directory and file paths used by the engine. */
    public final MetricsConfig config;
    /** Per-instance processors keyed by instance id; populated by {@code ensureInstanceProcessor}. */
    HashMap<String, InstanceProcessor> instances;
    /** Processor registered under the instance id {@code "global"}. */
    InstanceProcessor globalProcessor;
    /** Source of newly detected raw file chunks; also receives the chunks committed after each cycle. */
    ChunkDetector chunkDetector;
    /** Worker pool on which the chunk and process tasks are executed. */
    ExecutorService executorService;
    /** Single-threaded scheduler (thread name prefix "e") that drives {@link #run()}. */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new WorkerThreadFactory("e"));
    /** True while {@link #run()} is executing a cycle. */
    private boolean runPending;
    /** Number of times {@link #run()} has been entered. */
    private int cycle;
    /** Most recently restored or persisted engine state snapshot. */
    MetricProtos.EngineState engineState;
    /** Current engine cursor; compared against {@code System.currentTimeMillis()} values, so presumably epoch milliseconds. */
    long engineCursor;
    /** Count of consecutive cycles that ended with an exception; reset to 0 after a successful cycle. */
    int numFailedCycles;
    /** Registered stream processors keyed by processor id; accessed while synchronizing on the map itself. */
    HashMap<String, MetricsProcessorBase> processors;
    /** Writer for the aggregated records produced by each cycle. */
    OutputChunkWriter outputWriter;
    /** Appends engine state snapshots to {@code config.engineStateFile}. */
    RecordWriter<MetricProtos.EngineState> engineStateWriter;
    /** Uploads raw and aggregated chunk streams to S3. */
    ChunkStreamUploader uploader;
    /**
     * 0x4000000 bytes (64 MiB).
     * NOTE(review): the identifier is presumably a typo for "MaxEngineStateFileLength".
     */
    private static final long MaxExgineStateFileLength = 0x4000000L;
    /** 120000; presumably milliseconds (two minutes) allowed for late-arriving input -- TODO confirm. */
    public static final int LateArrivalGracePeriod = 120000;
    /** 20; presumably the period in seconds between engine cycles -- TODO confirm. */
    private static final long EngineCycleSec = 20L;

    /**
     * Builds the engine: ensures the root and raw directories exist, creates the global
     * processor, chunk detector, writers, uploader and a 10-thread worker pool, restores any
     * persisted state, and (unless {@code MetricsConfig.disableEngineCycle} is set) schedules
     * {@link #run()} at a fixed rate of {@code EngineCycleSec} seconds with no initial delay.
     *
     * <p>If any step throws, the scheduler and the worker pool are shut down before the
     * failure propagates; otherwise their threads would outlive the half-built engine with
     * nobody holding a reference to stop them.
     *
     * @param config engine configuration
     * @throws Exception if directory creation, component construction or state restore fails
     */
    protected MetricsEngine(MetricsConfig config) throws Exception {
        this.config = config;
        boolean started = false;
        try {
            MetricsUtil.ensureDir(config.rootDir);
            MetricsUtil.ensureDir(config.rawDir);
            // instances must exist before the first ensureInstanceProcessor call below.
            this.instances = new HashMap<String, InstanceProcessor>();
            this.globalProcessor = this.ensureInstanceProcessor("global");
            this.chunkDetector = new ChunkDetector(config);
            this.processors = new HashMap<String, MetricsProcessorBase>();
            this.outputWriter = new OutputChunkWriter(config.aggDir, config.aggChunkIndexFile);
            this.engineStateWriter = new RecordWriter(config.engineStateFile);
            this.cycle = 0;
            this.runPending = false;
            this.numFailedCycles = 0;
            this.uploader = new ChunkStreamUploader();
            this.executorService = Executors.newFixedThreadPool(10, new WorkerThreadFactory("w"));
            this.restoreState();
            boolean readonly = MetricsConfig.disableEngineCycle;
            if (!readonly) {
                this.scheduler.scheduleAtFixedRate(this, 0L, EngineCycleSec, TimeUnit.SECONDS);
            }
            logger.info("Started metrics engine{}", (Object)(readonly ? " in read-only mode" : ""));
            started = true;
        }
        finally {
            if (!started) {
                // Construction failed: release pool threads so they do not leak.
                if (this.executorService != null) {
                    this.executorService.shutdownNow();
                }
                this.scheduler.shutdownNow();
            }
        }
    }

    /**
     * Executes one engine cycle; invoked by the scheduler.
     *
     * <p>Increments the cycle counter, skips the cycle if a previous one is still marked
     * pending, otherwise runs {@link #doCycleWork(int)}, triggers an S3 upload on every tenth
     * cycle, and requests a garbage collection. A failed cycle increments
     * {@code numFailedCycles}; after more than 10 consecutive failures the JVM is terminated
     * with exit status 1.
     *
     * <p>The pending flag is cleared in a {@code finally} block so that it cannot stay set
     * when something other than an {@link Exception} is thrown. Failures are logged with the
     * throwable itself so the stack trace is preserved.
     */
    @Override
    public void run() {
        ++this.cycle;
        logger.info("enter cycle {}", (Object)this.cycle);
        if (this.runPending) {
            logger.info("run already pending, skip cycle {}", (Object)this.cycle);
            return;
        }
        this.runPending = true;
        try {
            this.doCycleWork(this.cycle);
            this.numFailedCycles = 0;
            if (this.cycle % 10 == 0) {
                this.uploader.uploadNoThrow();
            }
            System.gc();
            logger.info("completed cycle {}", (Object)this.cycle);
        }
        catch (Exception e) {
            ++this.numFailedCycles;
            // Same rendered message as before, but the throwable is passed separately
            // so the logging backend can print its stack trace.
            logger.error("doCycleWork " + this.cycle + " error " + e, (Throwable)e);
            if (this.numFailedCycles > 10) {
                logger.error("Exit after {} consecutive failed engine cycles", (Object)this.numFailedCycles);
                System.exit(1);
            }
        }
        finally {
            this.runPending = false;
        }
    }

    /**
     * Returns the engine cursor.
     *
     * @return the current value of {@code engineCursor}
     */
    long cursor() {
        final long current = this.engineCursor;
        return current;
    }

    /**
     * Returns the output cache duration for an aggregation interval, depending on
     * {@code MetricsConfig.outputCacheLevel}.
     *
     * <p>Intervals below 10, and any cache level other than HIGH, MEDIUM or LOW, always
     * yield 600000. Otherwise the value is chosen by cache level and by whether the interval
     * is at least 300, at least 60, or at least 10.
     *
     * @param interval aggregation interval (compared against 10, 60 and 300)
     * @return the cache duration for that interval and the configured cache level
     */
    public static long getOutputCacheDuration(int interval) {
        final long fallback = 600000L;
        if (interval < 10) {
            // Every cache level uses the same value below the smallest threshold.
            return fallback;
        }
        final boolean atLeast300 = interval >= 300;
        final boolean atLeast60 = interval >= 60;
        if (MetricsConfig.outputCacheLevel == MetricsConfig.CACHE_LEVEL.HIGH) {
            if (atLeast300) {
                return 2592000000L;
            }
            return atLeast60 ? 172800000L : 36000000L;
        }
        if (MetricsConfig.outputCacheLevel == MetricsConfig.CACHE_LEVEL.MEDIUM) {
            // 300 and 60 share the same duration at this level.
            return atLeast60 ? 3600000L : 1800000L;
        }
        if (MetricsConfig.outputCacheLevel == MetricsConfig.CACHE_LEVEL.LOW) {
            if (atLeast300) {
                return 3600000L;
            }
            return atLeast60 ? 1800000L : 1200000L;
        }
        return fallback;
    }

    /**
     * Returns the input cache duration for an aggregation interval.
     *
     * @param interval aggregation interval
     * @return 500000 when {@code interval} is at most 10, otherwise 0
     */
    public static long getInputCacheDuration(int interval) {
        if (interval > 10) {
            return 0L;
        }
        return 500000L;
    }

    /**
     * Requests an orderly shutdown of the worker pool and then of the cycle scheduler.
     * Does not wait for either to terminate.
     */
    public void shutdown() {
        logger.info("shutdown metrics engine");
        final ExecutorService workers = this.executorService;
        workers.shutdown();
        final ScheduledExecutorService cycleScheduler = this.scheduler;
        cycleScheduler.shutdown();
    }

    /**
     * Adds a processor to the registry under its {@code id}, holding the registry's monitor.
     *
     * @param p processor to register
     * @throws RuntimeException if a processor with the same id is already registered
     */
    public void register(MetricsProcessorBase p) {
        final HashMap<String, MetricsProcessorBase> registry = this.processors;
        synchronized (registry) {
            final boolean alreadyRegistered = this.processors.containsKey(p.id);
            if (!alreadyRegistered) {
                this.processors.put(p.id, p);
                return;
            }
            throw new RuntimeException("Duplicate processor " + p.id);
        }
    }

    /**
     * Looks up the registered processor for a metric key, holding the registry's monitor.
     *
     * @param key metric key; its stream id (via {@code MetricsUtil.getStreamId}) is the lookup key
     * @return the registered processor, or {@code null} if none is registered for that stream id
     */
    public MetricsProcessorBase getProcessor(MetricProtos.EmrMetricKey key) {
        final String streamId = MetricsUtil.getStreamId(key);
        final HashMap<String, MetricsProcessorBase> registry = this.processors;
        final MetricsProcessorBase found;
        synchronized (registry) {
            found = this.processors.get(streamId);
        }
        return found;
    }

    /**
     * Performs the work of one engine cycle.
     *
     * <ol>
     *   <li>Detects new raw chunks and reads them in parallel on the worker pool.</li>
     *   <li>Commits the chunks, annotated with the min/max times observed while reading,
     *       and advances the engine cursor from the largest observed max time.</li>
     *   <li>Runs interval-10 processing for every instance in parallel, then interval-60 and
     *       interval-300 processing sequentially, feeding the 10 and 60 outputs back in as
     *       input before the next coarser interval runs.</li>
     *   <li>Writes all produced records, then persists and optionally prints the engine state.</li>
     * </ol>
     *
     * @param cycle cycle number, used for logging and to decide when to print the state
     * @throws Exception if chunk detection, committing, processing or writing fails
     */
    public void doCycleWork(int cycle) throws Exception {
        Vector<MetricProtos.FileChunk> chunks = this.chunkDetector.DetectNewChunks();
        logger.info("Cycle {} detected {} new file chunks", (Object)cycle, (Object)chunks.size());
        Vector<RawChunkTask> tasks = new Vector<RawChunkTask>();
        for (MetricProtos.FileChunk chunk : chunks) {
            tasks.add(new RawChunkTask(chunk));
        }
        this.executorService.invokeAll(tasks);

        Vector<MetricProtos.FileChunk> chunksNew = new Vector<MetricProtos.FileChunk>();
        long maxInputTime = 0L;
        for (RawChunkTask task : tasks) {
            MetricProtos.FileChunk.Builder b = MetricProtos.FileChunk.newBuilder();
            b.mergeFrom(task.chunk);
            b.setMinTime(task.minTime);
            b.setMaxTime(task.maxTime);
            chunksNew.add(b.build());
            if (maxInputTime == 0L || maxInputTime < task.maxTime) {
                maxInputTime = task.maxTime;
            }
        }
        this.chunkDetector.commitChunks(chunksNew);
        this.engineCursor = this.computeNewEngineCursor(maxInputTime);

        // Interval 10 runs in parallel, one task per instance processor.
        Vector<ProcessTask> tasks10 = new Vector<ProcessTask>();
        for (InstanceProcessor p : this.instances.values()) {
            tasks10.add(new ProcessTask(p, 10));
        }
        this.executorService.invokeAll(tasks10);
        Vector<MetricProtos.EmrMetricRecord> output10 = new Vector<MetricProtos.EmrMetricRecord>();
        for (ProcessTask t : tasks10) {
            output10.addAll(t.output);
        }
        logger.info("interval10  produced {} records", (Object)output10.size());
        this.dispatchToInstances(output10);

        Vector<MetricProtos.EmrMetricRecord> output60 = this.processAllInstances(60);
        logger.info("interval60  produced {} records", (Object)output60.size());
        this.dispatchToInstances(output60);

        Vector<MetricProtos.EmrMetricRecord> output300 = this.processAllInstances(300);
        logger.info("interval300 produced {} records", (Object)output300.size());

        Vector<MetricProtos.EmrMetricRecord> output = new Vector<MetricProtos.EmrMetricRecord>();
        output.addAll(output10);
        output.addAll(output60);
        output.addAll(output300);
        this.outputWriter.write(output);

        this.engineState = this.getState();
        MetricsUtil.backupFileAndRecreate(this.config.engineStateFile, MaxExgineStateFileLength);
        this.engineStateWriter.appendAndFlush(this.engineState);
        if (cycle % 10 == 1) {
            MetricsEngine.showState(this.engineState, null);
        }
        if (MetricsConfig.traceids != null) {
            MetricsEngine.showState(this.engineState, MetricsConfig.traceids);
        }
    }

    /** Feeds each record to the processor of the instance named in its key, creating that processor if needed. */
    private void dispatchToInstances(Vector<MetricProtos.EmrMetricRecord> records) throws Exception {
        for (MetricProtos.EmrMetricRecord r : records) {
            InstanceProcessor p = this.ensureInstanceProcessor(r.getKey().getInstanceId());
            p.dispatchInput(r);
        }
    }

    /** Runs processing for the given interval on every instance processor, sequentially, and returns the collected output. */
    private Vector<MetricProtos.EmrMetricRecord> processAllInstances(int interval) throws Exception {
        Vector<MetricProtos.EmrMetricRecord> produced = new Vector<MetricProtos.EmrMetricRecord>();
        for (InstanceProcessor p : this.instances.values()) {
            p.process(this.engineCursor, interval, produced);
        }
        return produced;
    }

    /**
     * Derives the engine cursor for this cycle.
     *
     * <p>With no input time (0) the cursor is the current time minus
     * {@code LateArrivalGracePeriod}. Otherwise the input time is used if it is positive and
     * not in the future (else the current time), the grace period is subtracted, and the
     * result is returned only if it is ahead of the existing cursor.
     *
     * @param maxInputTime largest input time seen this cycle, or 0 if none
     * @return the new engine cursor
     */
    private long computeNewEngineCursor(long maxInputTime) {
        if (maxInputTime == 0L) {
            return System.currentTimeMillis() - LateArrivalGracePeriod;
        }
        long reference;
        if (maxInputTime > 0L && maxInputTime <= System.currentTimeMillis()) {
            reference = maxInputTime;
        } else {
            reference = System.currentTimeMillis();
        }
        final long candidate = reference - LateArrivalGracePeriod;
        if (candidate > this.engineCursor) {
            return candidate;
        }
        return this.engineCursor;
    }

    /**
     * Logs the engine cursor and prints one line per processor state to standard output.
     *
     * @param es  engine state to display
     * @param ids stream ids to include, or {@code null} to include every processor state
     */
    public static void showState(MetricProtos.EngineState es, List<String> ids) {
        logger.info("engine_state cursor {}", (Object)MetricsUtil.getTimeStr(es.getCursor()));
        for (MetricProtos.ProcessorState ps : es.getProcessorsList()) {
            final String streamId = MetricsUtil.getStreamId(ps.getKey());
            final boolean selected = ids == null || ids.contains(streamId);
            if (selected) {
                System.out.format("  ic:%14d oc:%14d %s %s\n", ps.getInputCursor(), ps.getOutputCursor(), MetricsUtil.getTimeStr(ps.getOutputCursor()), streamId);
            }
        }
    }

    /**
     * Initializes {@code engineState} and {@code engineCursor} from the state file.
     *
     * <p>When a persisted state exists, the cursor is taken from it and the raw and
     * aggregated streams are replayed. Otherwise a fresh state with cursor 0 and the current
     * time is created.
     *
     * @throws Exception if the state file cannot be read or stream restore fails
     */
    void restoreState() throws Exception {
        this.engineState = this.loadLastStateFromFile();
        if (this.engineState != null) {
            this.engineCursor = this.engineState.getCursor();
            logger.info("Try to restore metrics engine to {} cursor:{}", (Object)MetricsUtil.getTimeStr(this.engineState.getTime()), (Object)this.engineState.getCursor());
            MetricsEngine.showState(this.engineState, null);
            this.restoreRawStreams();
            this.restoreAggregatedStreams();
            logger.info("Metrics engine restored to cursor {}", (Object)MetricsUtil.getTimeStr(this.engineState.getCursor()));
            return;
        }
        logger.info("No engine state found, create from scratch");
        MetricProtos.EngineState.Builder fresh = MetricProtos.EngineState.newBuilder();
        fresh.setTime(System.currentTimeMillis());
        fresh.setCursor(0L);
        this.engineState = fresh.build();
        this.engineCursor = 0L;
    }

    /**
     * Reads every length-delimited {@code EngineState} record from the state file and
     * returns the last one.
     *
     * <p>The stream obtained from {@link #openStateFile()} is always closed before this
     * method returns, including when parsing fails.
     *
     * @return the last state in the file, or {@code null} if the file does not exist or
     *         contains no records
     * @throws Exception if the file cannot be opened, parsed or closed
     */
    MetricProtos.EngineState loadLastStateFromFile() throws Exception {
        InputStream input = this.openStateFile();
        if (input == null) {
            return null;
        }
        try {
            MetricProtos.EngineState last = null;
            MetricProtos.EngineState next;
            // parseDelimitedFrom yields null at end of stream; keep only the final record.
            while ((next = MetricProtos.EngineState.parseDelimitedFrom(input)) != null) {
                last = next;
            }
            return last;
        }
        finally {
            input.close();
        }
    }

    /**
     * Opens the engine state file for reading, from HDFS when {@code config.hdfs} is set and
     * from the local file system otherwise.
     *
     * @return an open stream, or {@code null} if the state file does not exist
     * @throws Exception if the existence check or the open fails
     */
    public InputStream openStateFile() throws Exception {
        final boolean present = MetricsUtil.fileExists(this.config.engineStateFile);
        if (!present) {
            logger.info("Engine state file does NOT exist {}", (Object)this.config.engineStateFile);
            return null;
        }
        if (!this.config.hdfs) {
            return new FileInputStream(this.config.engineStateFile);
        }
        Configuration hadoopConf = new Configuration();
        FileSystem fs = FileSystem.get((Configuration)hadoopConf);
        Path statePath = new Path(this.config.engineStateFile);
        return fs.open(statePath);
    }

    /**
     * Builds a snapshot of the engine: current time, engine cursor, and every processor
     * state collected from the instance processors that {@link #stateIsDefault} rejects.
     *
     * @return the freshly built engine state
     */
    MetricProtos.EngineState getState() {
        MetricProtos.EngineState.Builder snapshot = MetricProtos.EngineState.newBuilder();
        snapshot.setTime(System.currentTimeMillis());
        snapshot.setCursor(this.engineCursor);
        Vector<MetricProtos.ProcessorState> collected = new Vector<MetricProtos.ProcessorState>();
        for (InstanceProcessor processor : this.instances.values()) {
            processor.getProcessorStates(collected);
        }
        for (MetricProtos.ProcessorState state : collected) {
            if (!this.stateIsDefault(state)) {
                snapshot.addProcessors(state);
            }
        }
        return snapshot.build();
    }

    /**
     * Returns the processor state recorded in the current engine state for a key, or a
     * default state when none is recorded.
     *
     * @param key metric key to look up
     * @return the first recorded state whose key equals {@code key}; otherwise a new state
     *         carrying the default input and output cursors for the key's interval
     */
    public MetricProtos.ProcessorState getProcessorState(MetricProtos.EmrMetricKey key) {
        for (MetricProtos.ProcessorState recorded : this.engineState.getProcessorsList()) {
            if (recorded.getKey().equals(key)) {
                return recorded;
            }
        }
        MetricProtos.ProcessorState.Builder fallback = MetricProtos.ProcessorState.newBuilder();
        fallback.setKey(key);
        fallback.setInputCursor(this.getDefaultInputCursor(key.getInterval()));
        fallback.setOutputCursor(this.getDefaultOutputCursor(key.getInterval()));
        return fallback.build();
    }

    /**
     * Returns the default input cursor for an interval: the interval start computed by
     * {@code IntervalAggregator.TimeInterval.getIntervalStart} for the engine cursor.
     *
     * @param interval aggregation interval
     * @return the default input cursor
     */
    long getDefaultInputCursor(int interval) {
        final long cursorNow = this.engineCursor;
        return IntervalAggregator.TimeInterval.getIntervalStart(cursorNow, interval);
    }

    /**
     * Returns the default output cursor for an interval: the engine cursor minus the output
     * cache duration of that interval, floored at 0.
     *
     * @param interval aggregation interval
     * @return the default output cursor, never negative
     */
    long getDefaultOutputCursor(int interval) {
        final long candidate = this.engineCursor - MetricsEngine.getOutputCacheDuration(interval);
        return candidate < 0L ? 0L : candidate;
    }

    /**
     * Decides whether a processor state matches the defaults derived from the engine cursor.
     *
     * <p>A state is non-default when its input cursor differs from the default input cursor
     * (this case is logged), or when its output cursor lies more than half an output cache
     * duration below the default output cursor.
     *
     * @param s processor state to examine
     * @return {@code true} if the state is considered default
     */
    boolean stateIsDefault(MetricProtos.ProcessorState s) {
        final long defaultInput = this.getDefaultInputCursor(s.getKey().getInterval());
        if (s.getInputCursor() != defaultInput) {
            logger.info(String.format("non-default inputCursor %d %d %s", s.getInputCursor(), s.getInputCursor() - defaultInput, MetricsUtil.getStreamId(s.getKey())));
            return false;
        }
        final long cacheDuration = MetricsEngine.getOutputCacheDuration(s.getKey().getInterval());
        final long defaultOutput = this.getDefaultOutputCursor(s.getKey().getInterval());
        final double lowestDefault = (double)defaultOutput - 0.5 * (double)cacheDuration;
        return (double)s.getOutputCursor() >= lowestDefault;
    }

    /**
     * Returns the ids of all known instance processors.
     *
     * <p>The result is a snapshot copied while holding the engine's monitor, the same lock
     * {@code ensureInstanceProcessor} holds when it adds entries. Returning the live key set
     * would let callers iterate it while worker threads insert new instances.
     *
     * @return a new set containing the current instance ids
     */
    public Set<String> getInstanceIds() {
        synchronized (this) {
            return new HashSet<String>(this.instances.keySet());
        }
    }

    /**
     * Looks up the processor for an instance id without creating one.
     *
     * <p>The lookup holds the engine's monitor, the same lock
     * {@code ensureInstanceProcessor} holds when it inserts into the map, so a read cannot
     * race with a concurrent insert.
     *
     * @param instance instance id
     * @return the processor, or {@code null} if none exists for that id
     */
    public InstanceProcessor getInstanceProcessor(String instance) {
        synchronized (this) {
            return this.instances.get(instance);
        }
    }

    /**
     * Returns the output records held by the processor registered for a key.
     *
     * @param key metric key identifying the stream
     * @return the processor's output records, or an empty vector if no processor is registered
     */
    public Vector<MetricProtos.EmrMetricRecord> getOutputRecords(MetricProtos.EmrMetricKey key) {
        MetricsProcessorBase processor = this.getProcessor(key);
        if (processor != null) {
            return processor.getOutputRecords();
        }
        return new Vector<MetricProtos.EmrMetricRecord>();
    }

    /**
     * Returns the input records held by the processor registered for a key.
     *
     * @param key metric key identifying the stream
     * @return the processor's input records, or an empty vector if no processor is registered
     */
    public Vector<MetricProtos.EmrMetricRecord> getInputRecords(MetricProtos.EmrMetricKey key) {
        MetricsProcessorBase processor = this.getProcessor(key);
        if (processor != null) {
            return processor.getInputRecords();
        }
        return new Vector<MetricProtos.EmrMetricRecord>();
    }

    /**
     * Replays raw chunks listed in the raw chunk index on the worker pool.
     *
     * <p>Chunks are merged via {@code ChunkUtil.mergeChunks}; a merged chunk is skipped when
     * it has a non-zero max time earlier than the minimum raw stream cursor. Nothing happens
     * if the index file does not exist.
     *
     * @throws Exception if the index cannot be read or task submission is interrupted
     */
    void restoreRawStreams() throws Exception {
        final String indexFile = this.config.rawChunkIndexFile;
        if (!MetricsUtil.fileExists(indexFile)) {
            logger.info("No raw chunk index file: {}", (Object)indexFile);
            return;
        }
        Vector<MetricProtos.FileChunk> indexed = MetricsReader.readChunkIndex(this.config.rawChunkIndexFile);
        Vector<RawChunkTask> replayTasks = new Vector<RawChunkTask>();
        long threshold = this.getMinRawStreamCursor();
        logger.info("minRawInputTime {} {}", (Object)threshold, (Object)MetricsUtil.getTimeStr(threshold));
        Vector<MetricProtos.FileChunk> merged = ChunkUtil.mergeChunks(indexed, threshold);
        int skipped = 0;
        for (MetricProtos.FileChunk candidate : merged) {
            final boolean entirelyOld = candidate.getMaxTime() != 0L && candidate.getMaxTime() < threshold;
            if (entirelyOld) {
                ++skipped;
            } else {
                replayTasks.add(new RawChunkTask(candidate));
            }
        }
        logger.info("Skipped {} raw chunks before input cursor {}", (Object)skipped, (Object)threshold);
        this.executorService.invokeAll(replayTasks);
    }

    /**
     * Replays aggregated chunks listed in the aggregated chunk index on the worker pool and
     * logs how many records were read and how many tasks succeeded.
     *
     * <p>A merged chunk is skipped when it has a non-zero max time earlier than the minimum
     * aggregated stream cursor. Nothing happens if the index file does not exist.
     *
     * @throws Exception if the index cannot be read or task submission is interrupted
     */
    void restoreAggregatedStreams() throws Exception {
        final String indexFile = this.config.aggChunkIndexFile;
        if (!MetricsUtil.fileExists(indexFile)) {
            logger.info("No agg chunk index file: {}", (Object)indexFile);
            return;
        }
        Vector<MetricProtos.FileChunk> indexed = MetricsReader.readChunkIndex(this.config.aggChunkIndexFile);
        Vector<AggChunkTask> replayTasks = new Vector<AggChunkTask>();
        long threshold = this.getMinAggregatedStreamCursor();
        logger.info("minOutputCursor {} {}", (Object)threshold, (Object)MetricsUtil.getTimeStr(threshold));
        Vector<MetricProtos.FileChunk> merged = ChunkUtil.mergeChunks(indexed, threshold);
        for (MetricProtos.FileChunk candidate : merged) {
            final boolean entirelyOld = candidate.getMaxTime() != 0L && candidate.getMaxTime() < threshold;
            if (!entirelyOld) {
                replayTasks.add(new AggChunkTask(candidate));
            }
        }
        this.executorService.invokeAll(replayTasks);
        int recordCount = 0;
        int succeeded = 0;
        for (AggChunkTask finished : replayTasks) {
            recordCount += finished.numRecords;
            if (finished.success) {
                ++succeeded;
            }
        }
        logger.info(String.format("restored %d aggregated records, %d/%d tasks succeeded", recordCount, succeeded, replayTasks.size()));
    }

    /**
     * Computes the earliest time from which raw input must be replayed: the smaller of the
     * persisted engine cursor and, for every processor state with interval at most 10, its
     * input cursor minus the interval-10 input cache duration.
     *
     * @return the minimum raw stream cursor
     */
    long getMinRawStreamCursor() {
        long earliest = this.engineState.getCursor();
        for (MetricProtos.ProcessorState state : this.engineState.getProcessorsList()) {
            if (state.getKey().getInterval() <= 10) {
                final long needed = state.getInputCursor() - MetricsEngine.getInputCacheDuration(10);
                if (needed < earliest) {
                    earliest = needed;
                }
            }
        }
        return earliest;
    }

    /**
     * Computes the earliest time from which aggregated output must be replayed: starts at
     * the persisted engine cursor minus the interval-300 output cache duration and is
     * lowered, for every processor state with interval above 10, to that state's input
     * cursor minus the output cache duration of its interval.
     *
     * @return the minimum aggregated stream cursor
     */
    long getMinAggregatedStreamCursor() {
        long earliest = this.engineState.getCursor() - MetricsEngine.getOutputCacheDuration(300);
        for (MetricProtos.ProcessorState state : this.engineState.getProcessorsList()) {
            if (state.getKey().getInterval() > 10) {
                final long needed = state.getInputCursor() - MetricsEngine.getOutputCacheDuration(state.getKey().getInterval());
                if (needed < earliest) {
                    earliest = needed;
                }
            }
        }
        return earliest;
    }

    /**
     * Returns the processor for an instance id, creating and registering it first if the id
     * is not yet present. The check and insert run while holding the engine's monitor.
     *
     * @param instanceId instance id
     * @return the existing or newly created processor
     */
    InstanceProcessor ensureInstanceProcessor(String instanceId) {
        synchronized (this) {
            if (this.instances.containsKey(instanceId)) {
                return this.instances.get(instanceId);
            }
            InstanceProcessor created = new InstanceProcessor(this, instanceId);
            this.instances.put(instanceId, created);
            return created;
        }
    }

    /**
     * Incrementally uploads the chunks listed in the raw and aggregated chunk index files to
     * S3, remembering how far each index has been uploaded in
     * {@code config.s3UploadStateFile}.
     */
    class ChunkStreamUploader {
        /** Lazily created S3 session; see {@link #ensureSession()}. */
        SessionS3Client session = null;
        /** S3 path of the "metrics/" root under the session's bucket and prefix. */
        String s3root;
        /** S3 path of the "raw" directory under {@link #s3root}. */
        String s3rawDir;
        /** S3 path of the "agg" directory under {@link #s3root}. */
        String s3aggDir;
        /** Output stream for raw chunks; recreated after {@link #initialize()}. */
        S3ChunkOutputStream rawStream = null;
        /** Output stream for aggregated chunks; recreated after {@link #initialize()}. */
        S3ChunkOutputStream aggStream = null;

        ChunkStreamUploader() {
        }

        /**
         * Uploads any chunks appended to the raw or aggregated index since the last
         * committed offsets, then commits the new offsets for whichever stream reported
         * success.
         *
         * <p>If no upload state exists, or either index file is now shorter than its
         * committed offset, the S3 streams are deleted and the upload restarts from offset 0.
         * Every {@link Exception} is logged and swallowed. The offset sanity check throws
         * {@link IllegalStateException} rather than a JUnit assertion error, so a violation
         * is handled by the same catch block instead of escaping as an {@link Error}.
         */
        public void uploadNoThrow() {
            try {
                this.ensureSession();
                long rawIndexLength = MetricsUtil.getFileLength(MetricsEngine.this.config.rawChunkIndexFile);
                long aggIndexLength = MetricsUtil.getFileLength(MetricsEngine.this.config.aggChunkIndexFile);
                MetricProtos.S3UploadState state = MetricsReader.readS3UploadStateFile(MetricsEngine.this.config.s3UploadStateFile);
                if (state == null || rawIndexLength < state.getRawOffset() || aggIndexLength < state.getAggOffset()) {
                    state = this.initialize();
                }
                // NOTE(review): this writer is created on every call and never closed here;
                // confirm whether RecordWriter holds a resource that needs releasing.
                RecordWriter<MetricProtos.S3UploadState> swriter = new RecordWriter<MetricProtos.S3UploadState>(MetricsEngine.this.config.s3UploadStateFile);
                if (this.rawStream == null) {
                    this.rawStream = new S3ChunkOutputStream(this.s3rawDir, this.session);
                }
                if (this.aggStream == null) {
                    this.aggStream = new S3ChunkOutputStream(this.s3aggDir, this.session);
                }
                if (rawIndexLength < state.getRawOffset() || aggIndexLength < state.getAggOffset()) {
                    throw new IllegalStateException("chunk index shorter than committed upload offset: raw " + rawIndexLength + " vs " + state.getRawOffset() + ", agg " + aggIndexLength + " vs " + state.getAggOffset());
                }
                boolean okRaw = false;
                boolean okAgg = false;
                if (rawIndexLength > state.getRawOffset()) {
                    Vector<MetricProtos.FileChunk> rawChunks = MetricsReader.readChunkIndex(MetricsEngine.this.config.rawChunkIndexFile, state.getRawOffset(), rawIndexLength);
                    okRaw = this.rawStream.write(rawChunks);
                }
                if (aggIndexLength > state.getAggOffset()) {
                    Vector<MetricProtos.FileChunk> aggChunks = MetricsReader.readChunkIndex(MetricsEngine.this.config.aggChunkIndexFile, state.getAggOffset(), aggIndexLength);
                    okAgg = this.aggStream.write(aggChunks);
                }
                if (okRaw || okAgg) {
                    MetricProtos.S3UploadState.Builder sb = MetricProtos.S3UploadState.newBuilder(state);
                    sb.setTime(System.currentTimeMillis());
                    // Advance only the offsets of streams whose write reported success.
                    if (okRaw) {
                        sb.setRawOffset(rawIndexLength);
                    }
                    if (okAgg) {
                        sb.setAggOffset(aggIndexLength);
                    }
                    state = sb.build();
                    logger.info(String.format("commit %s rawOffset:%d aggOffet:%d", MetricsEngine.this.config.s3UploadStateFile, state.getRawOffset(), state.getAggOffset()));
                    swriter.appendAndFlush(state);
                }
            }
            catch (Exception e) {
                logger.error("Failed to upload to S3 ", (Throwable)e);
            }
        }

        /**
         * Creates the S3 session and derives the root, raw and agg paths on first use; does
         * nothing if a session already exists. On a desktop machine (per
         * {@code MetricsUtil.isDesktopMachine()}) a fixed credentials file, bucket and a
         * host-name prefix are used; otherwise the default session is created.
         *
         * @throws RuntimeException if session creation fails with an unchecked exception
         * @throws IOException if session creation or host-name lookup fails
         */
        void ensureSession() throws RuntimeException, IOException {
            if (this.session != null) {
                return;
            }
            if (MetricsUtil.isDesktopMachine()) {
                String bucket = "danzhi-test";
                String prefix = MetricsUtil.getHostName() + "/";
                this.session = new SessionS3Client("/home/danzhi/emr/credentials.json", bucket, prefix);
            } else {
                this.session = new SessionS3Client();
            }
            this.s3root = S3Path.getPath(this.session.getBucket(), this.session.getPrefix() + "metrics/");
            this.s3rawDir = S3Path.combine(this.s3root, "raw");
            this.s3aggDir = S3Path.combine(this.s3root, "agg");
        }

        /**
         * Deletes the raw and aggregated chunk streams on S3, drops the local stream
         * objects so they are recreated, and returns a fresh upload state with both offsets
         * at 0.
         *
         * @return a new upload state stamped with the current time
         * @throws IOException if deleting either S3 stream fails
         */
        MetricProtos.S3UploadState initialize() throws IOException {
            logger.info("(Re)Initialize raw and agg chunk streams");
            S3ChunkOutputStream.delete(this.s3rawDir, this.session);
            S3ChunkOutputStream.delete(this.s3aggDir, this.session);
            this.rawStream = null;
            this.aggStream = null;
            MetricProtos.S3UploadState.Builder sb = MetricProtos.S3UploadState.newBuilder();
            sb.setTime(System.currentTimeMillis());
            sb.setRawOffset(0L);
            sb.setAggOffset(0L);
            return sb.build();
        }
    }

    /**
     * Worker task that reads one aggregated chunk and feeds every record to the processor
     * of the instance named in the record's key, as both input and output.
     */
    class AggChunkTask
    implements Callable<Boolean> {
        /** Chunk to read. */
        public MetricProtos.FileChunk chunk;
        /** Set to true only after the whole chunk was read and the reader closed. */
        public boolean success;
        /** Number of records read so far. */
        public int numRecords;

        AggChunkTask(MetricProtos.FileChunk chunk) {
            this.chunk = chunk;
            this.success = false;
            this.numRecords = 0;
        }

        /**
         * Reads the chunk to the end, dispatching each record.
         *
         * <p>The reader is closed on every path: on success as before, and also when
         * reading or dispatching throws, so a failing chunk does not leave it open.
         *
         * @return {@code true} on success; {@code false} if any exception occurred (it is logged)
         */
        @Override
        public Boolean call() {
            MetricRecordReader reader = null;
            boolean closeAttempted = false;
            try {
                MetricProtos.EmrMetricRecord r;
                reader = new MetricRecordReader(this.chunk);
                while ((r = reader.read()) != null) {
                    ++this.numRecords;
                    if (this.numRecords % 100000 == 0) {
                        logger.info("Read {} records {}", (Object)this.numRecords, (Object)this.chunk.getFilePath());
                    }
                    InstanceProcessor p = MetricsEngine.this.ensureInstanceProcessor(r.getKey().getInstanceId());
                    p.dispatchInput(r);
                    p.dispatchOutput(r);
                }
                // Mark before closing so a failing close() is not retried in finally.
                closeAttempted = true;
                reader.close();
                this.success = true;
                return true;
            }
            catch (Exception e) {
                logger.info("ChunkCallable {} exception {}", (Object)this.chunk.getFilePath(), (Object)e);
                return false;
            }
            finally {
                if (reader != null && !closeAttempted) {
                    try {
                        reader.close();
                    }
                    catch (Exception closeError) {
                        logger.info("ChunkCallable {} close exception {}", (Object)this.chunk.getFilePath(), (Object)closeError);
                    }
                }
            }
        }
    }

    /**
     * Worker task that runs one instance processor for one interval and collects the
     * records it produces.
     */
    class ProcessTask
    implements Callable<Boolean> {
        /** Processor to run. */
        InstanceProcessor p;
        /** Interval passed to the processor. */
        int interval;
        /** Records produced by the processor during {@link #call()}. */
        public final Vector<MetricProtos.EmrMetricRecord> output;

        ProcessTask(InstanceProcessor p, int interval) {
            this.output = new Vector<MetricProtos.EmrMetricRecord>();
            this.interval = interval;
            this.p = p;
        }

        /**
         * Runs the processor at the engine's current cursor.
         *
         * @return {@code true} on success; {@code false} if processing threw (the exception is logged)
         */
        @Override
        public Boolean call() {
            boolean ok;
            try {
                this.p.process(MetricsEngine.this.engineCursor, this.interval, this.output);
                ok = true;
            }
            catch (Exception e) {
                logger.info("{} process() exception {}", (Object)this.p.instanceId, (Object)e);
                ok = false;
            }
            return ok;
        }
    }

    /**
     * Worker task that reads one raw chunk, feeds every record to an instance processor and
     * to the global processor, and tracks the smallest and largest timestamps seen.
     */
    class RawChunkTask
    implements Callable<Boolean> {
        /** Chunk to read. */
        public MetricProtos.FileChunk chunk;
        /** Smallest record min-timestamp seen, or 0 if no record was read. */
        public long minTime;
        /** Largest record max-timestamp seen, or 0 if no record was read. */
        public long maxTime;
        /** Set to true only after the whole chunk was read and the reader closed. */
        public boolean success;

        RawChunkTask(MetricProtos.FileChunk chunk) {
            this.chunk = chunk;
            this.minTime = 0L;
            this.maxTime = 0L;
            this.success = false;
        }

        /**
         * Reads the chunk to the end, dispatching each record.
         *
         * <p>The reader is closed on every path: on success as before, and also when
         * reading or dispatching throws, so a failing chunk does not leave it open.
         *
         * @return {@code true} on success; {@code false} if any exception occurred (it is logged)
         */
        @Override
        public Boolean call() {
            MetricRecordReader reader = null;
            boolean closeAttempted = false;
            try {
                MetricProtos.EmrMetricRecord r;
                reader = new MetricRecordReader(this.chunk);
                InstanceProcessor p = null;
                while ((r = reader.read()) != null) {
                    // The processor is resolved from the first record only; presumably a
                    // raw chunk holds records of a single instance -- TODO confirm.
                    if (p == null) {
                        p = MetricsEngine.this.ensureInstanceProcessor(r.getKey().getInstanceId());
                    }
                    this.maxTime = Math.max(this.maxTime, MetricsUtil.getMaxTimeStamp(r));
                    this.minTime = this.minTime == 0L ? MetricsUtil.getMinTimeStamp(r) : Math.min(this.minTime, MetricsUtil.getMinTimeStamp(r));
                    p.dispatchInput(r);
                    MetricsEngine.this.globalProcessor.dispatchInput(r);
                }
                // Mark before closing so a failing close() is not retried in finally.
                closeAttempted = true;
                reader.close();
                this.success = true;
                return true;
            }
            catch (Exception e) {
                logger.info("ChunkCallable {} exception {}", (Object)this.chunk.getFilePath(), (Object)e);
                return false;
            }
            finally {
                if (reader != null && !closeAttempted) {
                    try {
                        reader.close();
                    }
                    catch (Exception closeError) {
                        logger.info("ChunkCallable {} close exception {}", (Object)this.chunk.getFilePath(), (Object)closeError);
                    }
                }
            }
        }
    }

    /**
     * Thread factory that names threads {@code <prefix>-NN}, where NN is a counter starting
     * at 01 and formatted with at least two digits.
     */
    public static class WorkerThreadFactory
    implements ThreadFactory {
        /** Number of threads created so far. */
        private int counter;
        /** Name prefix for created threads. */
        private String prefix;

        /**
         * @param prefix name prefix for threads created by this factory
         */
        public WorkerThreadFactory(String prefix) {
            this.counter = 0;
            this.prefix = prefix;
        }

        /**
         * Creates a new thread for the runnable, named with the prefix and the next counter value.
         *
         * @param r task the thread will run
         * @return the new, unstarted thread
         */
        @Override
        public Thread newThread(Runnable r) {
            this.counter += 1;
            final String threadName = String.format("%s-%02d", this.prefix, this.counter);
            return new Thread(r, threadName);
        }
    }
}

