package amazon.emr.metrics;

import amazon.emr.MetricProtos;
import amazon.emr.metrics.IntervalAggregator;
import amazon.emr.metrics.MetricsConfig;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:amazon/emr/metrics/MetricsEngine.class */
public class MetricsEngine implements Runnable {
    static final Logger logger = LoggerFactory.getLogger(MetricsEngine.class);
    public final MetricsConfig config;
    HashMap<String, InstanceProcessor> instances;
    InstanceProcessor globalProcessor;
    ChunkDetector chunkDetector;
    ExecutorService executorService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new WorkerThreadFactory("e"));
    private boolean runPending;
    private int cycle;
    MetricProtos.EngineState engineState;
    long engineCursor;
    int numFailedCycles;
    HashMap<String, MetricsProcessorBase> processors;
    OutputChunkWriter outputWriter;
    RecordWriter<MetricProtos.EngineState> engineStateWriter;
    ChunkStreamUploader uploader;
    private static final long MaxExgineStateFileLength = 67108864;
    public static final int LateArrivalGracePeriod = 120000;
    private static final long EngineCycleSec = 20;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:amazon/emr/metrics/MetricsEngine$AggChunkTask.class */
    public class AggChunkTask implements Callable<Boolean> {
        public MetricProtos.FileChunk chunk;
        public boolean success = false;
        public int numRecords = 0;

        AggChunkTask(MetricProtos.FileChunk fileChunk) {
            this.chunk = fileChunk;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            try {
                MetricRecordReader metricRecordReader = new MetricRecordReader(this.chunk);
                while (true) {
                    MetricProtos.EmrMetricRecord read = metricRecordReader.read();
                    if (read == null) {
                        metricRecordReader.close();
                        this.success = true;
                        return true;
                    }
                    this.numRecords++;
                    if (this.numRecords % 100000 == 0) {
                        MetricsEngine.logger.info("Read {} records {}", Integer.valueOf(this.numRecords), this.chunk.getFilePath());
                    }
                    InstanceProcessor ensureInstanceProcessor = MetricsEngine.this.ensureInstanceProcessor(read.getKey().getInstanceId());
                    ensureInstanceProcessor.dispatchInput(read);
                    ensureInstanceProcessor.dispatchOutput(read);
                }
            } catch (Exception e) {
                MetricsEngine.logger.info("ChunkCallable {} exception {}", this.chunk.getFilePath(), e);
                return false;
            }
        }
    }

    /* loaded from: input_file:amazon/emr/metrics/MetricsEngine$ChunkStreamUploader.class */
    class ChunkStreamUploader {
        String s3root;
        String s3rawDir;
        String s3aggDir;
        SessionS3Client session = null;
        S3ChunkOutputStream rawStream = null;
        S3ChunkOutputStream aggStream = null;

        ChunkStreamUploader() {
        }

        public void uploadNoThrow() {
            try {
                ensureSession();
                long fileLength = MetricsUtil.getFileLength(MetricsEngine.this.config.rawChunkIndexFile);
                long fileLength2 = MetricsUtil.getFileLength(MetricsEngine.this.config.aggChunkIndexFile);
                MetricProtos.S3UploadState readS3UploadStateFile = MetricsReader.readS3UploadStateFile(MetricsEngine.this.config.s3UploadStateFile);
                if (readS3UploadStateFile == null || fileLength < readS3UploadStateFile.getRawOffset() || fileLength2 < readS3UploadStateFile.getAggOffset()) {
                    readS3UploadStateFile = initialize();
                }
                RecordWriter recordWriter = new RecordWriter(MetricsEngine.this.config.s3UploadStateFile);
                if (this.rawStream == null) {
                    this.rawStream = new S3ChunkOutputStream(this.s3rawDir, this.session);
                }
                if (this.aggStream == null) {
                    this.aggStream = new S3ChunkOutputStream(this.s3aggDir, this.session);
                }
                Assert.assertTrue(fileLength >= readS3UploadStateFile.getRawOffset());
                Assert.assertTrue(fileLength2 >= readS3UploadStateFile.getAggOffset());
                boolean z = false;
                boolean z2 = false;
                if (fileLength > readS3UploadStateFile.getRawOffset()) {
                    z = this.rawStream.write(MetricsReader.readChunkIndex(MetricsEngine.this.config.rawChunkIndexFile, readS3UploadStateFile.getRawOffset(), fileLength));
                }
                if (fileLength2 > readS3UploadStateFile.getAggOffset()) {
                    z2 = this.aggStream.write(MetricsReader.readChunkIndex(MetricsEngine.this.config.aggChunkIndexFile, readS3UploadStateFile.getAggOffset(), fileLength2));
                }
                if (z || z2) {
                    MetricProtos.S3UploadState.Builder newBuilder = MetricProtos.S3UploadState.newBuilder(readS3UploadStateFile);
                    newBuilder.setTime(System.currentTimeMillis());
                    if (z) {
                        newBuilder.setRawOffset(fileLength);
                    }
                    if (z2) {
                        newBuilder.setAggOffset(fileLength2);
                    }
                    MetricProtos.S3UploadState build = newBuilder.build();
                    MetricsEngine.logger.info(String.format("commit %s rawOffset:%d aggOffet:%d", MetricsEngine.this.config.s3UploadStateFile, Long.valueOf(build.getRawOffset()), Long.valueOf(build.getAggOffset())));
                    recordWriter.appendAndFlush((RecordWriter) newBuilder.build());
                }
            } catch (Exception e) {
                MetricsEngine.logger.error("Failed to upload to S3 ", e);
            }
        }

        void ensureSession() throws RuntimeException, IOException {
            if (this.session != null) {
                return;
            }
            if (MetricsUtil.isDesktopMachine()) {
                this.session = new SessionS3Client("/home/danzhi/emr/credentials.json", "danzhi-test", MetricsUtil.getHostName() + "/");
            } else {
                this.session = new SessionS3Client();
            }
            this.s3root = S3Path.getPath(this.session.getBucket(), this.session.getPrefix() + "metrics/");
            this.s3rawDir = S3Path.combine(this.s3root, "raw");
            this.s3aggDir = S3Path.combine(this.s3root, "agg");
        }

        MetricProtos.S3UploadState initialize() throws IOException {
            MetricsEngine.logger.info("(Re)Initialize raw and agg chunk streams");
            S3ChunkOutputStream.delete(this.s3rawDir, this.session);
            S3ChunkOutputStream.delete(this.s3aggDir, this.session);
            this.rawStream = null;
            this.aggStream = null;
            MetricProtos.S3UploadState.Builder newBuilder = MetricProtos.S3UploadState.newBuilder();
            newBuilder.setTime(System.currentTimeMillis());
            newBuilder.setRawOffset(0L);
            newBuilder.setAggOffset(0L);
            return newBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:amazon/emr/metrics/MetricsEngine$ProcessTask.class */
    public class ProcessTask implements Callable<Boolean> {
        InstanceProcessor p;
        int interval;
        public final Vector<MetricProtos.EmrMetricRecord> output = new Vector<>();

        ProcessTask(InstanceProcessor instanceProcessor, int i) {
            this.p = instanceProcessor;
            this.interval = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            try {
                this.p.process(MetricsEngine.this.engineCursor, this.interval, this.output);
                return true;
            } catch (Exception e) {
                MetricsEngine.logger.info("{} process() exception {}", this.p.instanceId, e);
                return false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:amazon/emr/metrics/MetricsEngine$RawChunkTask.class */
    public class RawChunkTask implements Callable<Boolean> {
        public MetricProtos.FileChunk chunk;
        public long minTime = 0;
        public long maxTime = 0;
        public boolean success = false;

        RawChunkTask(MetricProtos.FileChunk fileChunk) {
            this.chunk = fileChunk;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            try {
                MetricRecordReader metricRecordReader = new MetricRecordReader(this.chunk);
                InstanceProcessor instanceProcessor = null;
                while (true) {
                    MetricProtos.EmrMetricRecord read = metricRecordReader.read();
                    if (read == null) {
                        metricRecordReader.close();
                        this.success = true;
                        return true;
                    }
                    if (instanceProcessor == null) {
                        instanceProcessor = MetricsEngine.this.ensureInstanceProcessor(read.getKey().getInstanceId());
                    }
                    this.maxTime = Math.max(this.maxTime, MetricsUtil.getMaxTimeStamp(read));
                    if (this.minTime == 0) {
                        this.minTime = MetricsUtil.getMinTimeStamp(read);
                    } else {
                        this.minTime = Math.min(this.minTime, MetricsUtil.getMinTimeStamp(read));
                    }
                    instanceProcessor.dispatchInput(read);
                    MetricsEngine.this.globalProcessor.dispatchInput(read);
                }
            } catch (Exception e) {
                MetricsEngine.logger.info("ChunkCallable {} exception {}", this.chunk.getFilePath(), e);
                return false;
            }
        }
    }

    /* loaded from: input_file:amazon/emr/metrics/MetricsEngine$WorkerThreadFactory.class */
    public static class WorkerThreadFactory implements ThreadFactory {
        private int counter = 0;
        private String prefix;

        public WorkerThreadFactory(String str) {
            this.prefix = "w";
            this.prefix = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            int i = this.counter + 1;
            this.counter = i;
            return new Thread(runnable, String.format("%s-%02d", this.prefix, Integer.valueOf(i)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MetricsEngine(MetricsConfig metricsConfig) throws Exception {
        this.config = metricsConfig;
        MetricsUtil.ensureDir(metricsConfig.rootDir);
        MetricsUtil.ensureDir(metricsConfig.rawDir);
        this.instances = new HashMap<>();
        this.globalProcessor = ensureInstanceProcessor("global");
        this.chunkDetector = new ChunkDetector(metricsConfig);
        this.processors = new HashMap<>();
        this.outputWriter = new OutputChunkWriter(metricsConfig.aggDir, metricsConfig.aggChunkIndexFile);
        this.engineStateWriter = new RecordWriter<>(metricsConfig.engineStateFile);
        this.cycle = 0;
        this.runPending = false;
        this.numFailedCycles = 0;
        this.uploader = new ChunkStreamUploader();
        this.executorService = Executors.newFixedThreadPool(10, new WorkerThreadFactory("w"));
        restoreState();
        boolean z = MetricsConfig.disableEngineCycle;
        if (!z) {
            this.scheduler.scheduleAtFixedRate(this, 0L, EngineCycleSec, TimeUnit.SECONDS);
        }
        logger.info("Started metrics engine{}", z ? " in read-only mode" : "");
    }

    @Override // java.lang.Runnable
    public void run() {
        this.cycle++;
        logger.info("enter cycle {}", Integer.valueOf(this.cycle));
        if (this.runPending) {
            logger.info("run already pending, skip cycle {}", Integer.valueOf(this.cycle));
            return;
        }
        this.runPending = true;
        try {
            doCycleWork(this.cycle);
            this.numFailedCycles = 0;
            if (this.cycle % 10 == 0) {
                this.uploader.uploadNoThrow();
            }
            System.gc();
            logger.info("completed cycle {}", Integer.valueOf(this.cycle));
        } catch (Exception e) {
            this.numFailedCycles++;
            logger.error("doCycleWork {} error {}", Integer.valueOf(this.cycle), e);
            if (this.numFailedCycles > 10) {
                logger.error("Exit after {} consecutive failed engine cycles", Integer.valueOf(this.numFailedCycles));
                System.exit(1);
            }
        }
        this.runPending = false;
    }

    long cursor() {
        return this.engineCursor;
    }

    public static long getOutputCacheDuration(int i) {
        if (MetricsConfig.outputCacheLevel == MetricsConfig.CACHE_LEVEL.HIGH) {
            if (i >= 300) {
                return 2592000000L;
            }
            if (i >= 60) {
                return 172800000L;
            }
            return i >= 10 ? 36000000L : 600000L;
        }
        if (MetricsConfig.outputCacheLevel == MetricsConfig.CACHE_LEVEL.MEDIUM) {
            if (i < 300 && i < 60) {
                return i >= 10 ? 1800000L : 600000L;
            }
            return 3600000L;
        }
        if (MetricsConfig.outputCacheLevel != MetricsConfig.CACHE_LEVEL.LOW) {
            return 600000L;
        }
        if (i >= 300) {
            return 3600000L;
        }
        if (i >= 60) {
            return 1800000L;
        }
        return i >= 10 ? 1200000L : 600000L;
    }

    public static long getInputCacheDuration(int i) {
        return i <= 10 ? 500000L : 0L;
    }

    public void shutdown() {
        logger.info("shutdown metrics engine");
        this.executorService.shutdown();
        this.scheduler.shutdown();
    }

    public void register(MetricsProcessorBase metricsProcessorBase) {
        synchronized (this.processors) {
            if (this.processors.containsKey(metricsProcessorBase.id)) {
                throw new RuntimeException("Duplicate processor " + metricsProcessorBase.id);
            }
            this.processors.put(metricsProcessorBase.id, metricsProcessorBase);
        }
    }

    public MetricsProcessorBase getProcessor(MetricProtos.EmrMetricKey emrMetricKey) {
        MetricsProcessorBase metricsProcessorBase;
        String streamId = MetricsUtil.getStreamId(emrMetricKey);
        synchronized (this.processors) {
            metricsProcessorBase = this.processors.get(streamId);
        }
        return metricsProcessorBase;
    }

    public void doCycleWork(int i) throws Exception {
        Vector<MetricProtos.FileChunk> DetectNewChunks = this.chunkDetector.DetectNewChunks();
        logger.info("Cycle {} detected {} new file chunks", Integer.valueOf(i), Integer.valueOf(DetectNewChunks.size()));
        Vector vector = new Vector();
        Iterator<MetricProtos.FileChunk> it = DetectNewChunks.iterator();
        while (it.hasNext()) {
            vector.add(new RawChunkTask(it.next()));
        }
        this.executorService.invokeAll(vector);
        Vector<MetricProtos.FileChunk> vector2 = new Vector<>();
        long j = 0;
        long j2 = 0;
        Iterator it2 = vector.iterator();
        while (it2.hasNext()) {
            RawChunkTask rawChunkTask = (RawChunkTask) it2.next();
            MetricProtos.FileChunk.Builder newBuilder = MetricProtos.FileChunk.newBuilder();
            newBuilder.mergeFrom(rawChunkTask.chunk);
            newBuilder.setMinTime(rawChunkTask.minTime);
            newBuilder.setMaxTime(rawChunkTask.maxTime);
            vector2.add(newBuilder.build());
            if (j == 0 || j < rawChunkTask.minTime) {
                j = rawChunkTask.minTime;
            }
            if (j2 == 0 || j2 < rawChunkTask.maxTime) {
                j2 = rawChunkTask.maxTime;
            }
        }
        this.chunkDetector.commitChunks(vector2);
        this.engineCursor = computeNewEngineCursor(j2);
        Vector vector3 = new Vector();
        Iterator<InstanceProcessor> it3 = this.instances.values().iterator();
        while (it3.hasNext()) {
            vector3.add(new ProcessTask(it3.next(), 10));
        }
        this.executorService.invokeAll(vector3);
        Vector vector4 = new Vector();
        Iterator it4 = vector3.iterator();
        while (it4.hasNext()) {
            vector4.addAll(((ProcessTask) it4.next()).output);
        }
        logger.info("interval10  produced {} records", Integer.valueOf(vector4.size()));
        Iterator it5 = vector4.iterator();
        while (it5.hasNext()) {
            MetricProtos.EmrMetricRecord emrMetricRecord = (MetricProtos.EmrMetricRecord) it5.next();
            ensureInstanceProcessor(emrMetricRecord.getKey().getInstanceId()).dispatchInput(emrMetricRecord);
        }
        Vector<MetricProtos.EmrMetricRecord> vector5 = new Vector<>();
        Iterator<InstanceProcessor> it6 = this.instances.values().iterator();
        while (it6.hasNext()) {
            it6.next().process(this.engineCursor, 60, vector5);
        }
        logger.info("interval60  produced {} records", Integer.valueOf(vector5.size()));
        Iterator<MetricProtos.EmrMetricRecord> it7 = vector5.iterator();
        while (it7.hasNext()) {
            MetricProtos.EmrMetricRecord next = it7.next();
            ensureInstanceProcessor(next.getKey().getInstanceId()).dispatchInput(next);
        }
        Vector<MetricProtos.EmrMetricRecord> vector6 = new Vector<>();
        Iterator<InstanceProcessor> it8 = this.instances.values().iterator();
        while (it8.hasNext()) {
            it8.next().process(this.engineCursor, 300, vector6);
        }
        logger.info("interval300 produced {} records", Integer.valueOf(vector6.size()));
        Vector<MetricProtos.EmrMetricRecord> vector7 = new Vector<>();
        vector7.addAll(vector4);
        vector7.addAll(vector5);
        vector7.addAll(vector6);
        this.outputWriter.write(vector7);
        this.engineState = getState();
        MetricsUtil.backupFileAndRecreate(this.config.engineStateFile, MaxExgineStateFileLength);
        this.engineStateWriter.appendAndFlush((RecordWriter<MetricProtos.EngineState>) this.engineState);
        if (i % 10 == 1) {
            showState(this.engineState, null);
        }
        if (MetricsConfig.traceids != null) {
            showState(this.engineState, MetricsConfig.traceids);
        }
    }

    private long computeNewEngineCursor(long j) {
        if (j == 0) {
            return System.currentTimeMillis() - 120000;
        }
        long currentTimeMillis = (j <= 0 || j > System.currentTimeMillis()) ? System.currentTimeMillis() : j;
        return currentTimeMillis - 120000 > this.engineCursor ? currentTimeMillis - 120000 : this.engineCursor;
    }

    public static void showState(MetricProtos.EngineState engineState, List<String> list) {
        logger.info("engine_state cursor {}", MetricsUtil.getTimeStr(engineState.getCursor()));
        for (MetricProtos.ProcessorState processorState : engineState.getProcessorsList()) {
            if (list == null || list.contains(MetricsUtil.getStreamId(processorState.getKey()))) {
                System.out.format("  ic:%14d oc:%14d %s %s\n", Long.valueOf(processorState.getInputCursor()), Long.valueOf(processorState.getOutputCursor()), MetricsUtil.getTimeStr(processorState.getOutputCursor()), MetricsUtil.getStreamId(processorState.getKey()));
            }
        }
    }

    void restoreState() throws Exception {
        this.engineState = loadLastStateFromFile();
        if (this.engineState == null) {
            logger.info("No engine state found, create from scratch");
            MetricProtos.EngineState.Builder newBuilder = MetricProtos.EngineState.newBuilder();
            newBuilder.setTime(System.currentTimeMillis());
            newBuilder.setCursor(0L);
            this.engineState = newBuilder.build();
            this.engineCursor = 0L;
            return;
        }
        this.engineCursor = this.engineState.getCursor();
        logger.info("Try to restore metrics engine to {} cursor:{}", MetricsUtil.getTimeStr(this.engineState.getTime()), Long.valueOf(this.engineState.getCursor()));
        showState(this.engineState, null);
        restoreRawStreams();
        restoreAggregatedStreams();
        logger.info("Metrics engine restored to cursor {}", MetricsUtil.getTimeStr(this.engineState.getCursor()));
    }

    MetricProtos.EngineState loadLastStateFromFile() throws Exception {
        InputStream openStateFile = openStateFile();
        if (openStateFile == null) {
            return null;
        }
        MetricProtos.EngineState engineState = null;
        while (true) {
            MetricProtos.EngineState engineState2 = engineState;
            MetricProtos.EngineState parseDelimitedFrom = MetricProtos.EngineState.parseDelimitedFrom(openStateFile);
            if (parseDelimitedFrom == null) {
                return engineState2;
            }
            engineState = parseDelimitedFrom;
        }
    }

    public InputStream openStateFile() throws Exception {
        if (MetricsUtil.fileExists(this.config.engineStateFile)) {
            return this.config.hdfs ? FileSystem.get(new Configuration()).open(new Path(this.config.engineStateFile)) : new FileInputStream(this.config.engineStateFile);
        }
        logger.info("Engine state file does NOT exist {}", this.config.engineStateFile);
        return null;
    }

    MetricProtos.EngineState getState() {
        Vector<MetricProtos.ProcessorState> vector = new Vector<>();
        MetricProtos.EngineState.Builder newBuilder = MetricProtos.EngineState.newBuilder();
        newBuilder.setTime(System.currentTimeMillis());
        newBuilder.setCursor(this.engineCursor);
        Iterator<InstanceProcessor> it = this.instances.values().iterator();
        while (it.hasNext()) {
            it.next().getProcessorStates(vector);
        }
        Iterator<MetricProtos.ProcessorState> it2 = vector.iterator();
        while (it2.hasNext()) {
            MetricProtos.ProcessorState next = it2.next();
            if (!stateIsDefault(next)) {
                newBuilder.addProcessors(next);
            }
        }
        return newBuilder.build();
    }

    public MetricProtos.ProcessorState getProcessorState(MetricProtos.EmrMetricKey emrMetricKey) {
        for (MetricProtos.ProcessorState processorState : this.engineState.getProcessorsList()) {
            if (processorState.getKey().equals(emrMetricKey)) {
                return processorState;
            }
        }
        MetricProtos.ProcessorState.Builder newBuilder = MetricProtos.ProcessorState.newBuilder();
        newBuilder.setKey(emrMetricKey);
        newBuilder.setInputCursor(getDefaultInputCursor(emrMetricKey.getInterval()));
        newBuilder.setOutputCursor(getDefaultOutputCursor(emrMetricKey.getInterval()));
        return newBuilder.build();
    }

    long getDefaultInputCursor(int i) {
        return IntervalAggregator.TimeInterval.getIntervalStart(this.engineCursor, i);
    }

    long getDefaultOutputCursor(int i) {
        return Math.max(0L, this.engineCursor - getOutputCacheDuration(i));
    }

    boolean stateIsDefault(MetricProtos.ProcessorState processorState) {
        long defaultInputCursor = getDefaultInputCursor(processorState.getKey().getInterval());
        if (processorState.getInputCursor() == defaultInputCursor) {
            return ((double) processorState.getOutputCursor()) >= ((double) getDefaultOutputCursor(processorState.getKey().getInterval())) - (0.5d * ((double) getOutputCacheDuration(processorState.getKey().getInterval())));
        }
        logger.info(String.format("non-default inputCursor %d %d %s", Long.valueOf(processorState.getInputCursor()), Long.valueOf(processorState.getInputCursor() - defaultInputCursor), MetricsUtil.getStreamId(processorState.getKey())));
        return false;
    }

    public Set<String> getInstanceIds() {
        return this.instances.keySet();
    }

    public InstanceProcessor getInstanceProcessor(String str) {
        return this.instances.get(str);
    }

    public Vector<MetricProtos.EmrMetricRecord> getOutputRecords(MetricProtos.EmrMetricKey emrMetricKey) {
        MetricsProcessorBase processor = getProcessor(emrMetricKey);
        return processor == null ? new Vector<>() : processor.getOutputRecords();
    }

    public Vector<MetricProtos.EmrMetricRecord> getInputRecords(MetricProtos.EmrMetricKey emrMetricKey) {
        MetricsProcessorBase processor = getProcessor(emrMetricKey);
        return processor == null ? new Vector<>() : processor.getInputRecords();
    }

    void restoreRawStreams() throws Exception {
        if (!MetricsUtil.fileExists(this.config.rawChunkIndexFile)) {
            logger.info("No raw chunk index file: {}", this.config.rawChunkIndexFile);
            return;
        }
        Vector<MetricProtos.FileChunk> readChunkIndex = MetricsReader.readChunkIndex(this.config.rawChunkIndexFile);
        Vector vector = new Vector();
        long minRawStreamCursor = getMinRawStreamCursor();
        logger.info("minRawInputTime {} {}", Long.valueOf(minRawStreamCursor), MetricsUtil.getTimeStr(minRawStreamCursor));
        int i = 0;
        Iterator<MetricProtos.FileChunk> it = ChunkUtil.mergeChunks(readChunkIndex, minRawStreamCursor).iterator();
        while (it.hasNext()) {
            MetricProtos.FileChunk next = it.next();
            if (next.getMaxTime() == 0 || next.getMaxTime() >= minRawStreamCursor) {
                vector.add(new RawChunkTask(next));
            } else {
                i++;
            }
        }
        logger.info("Skipped {} raw chunks before input cursor {}", Integer.valueOf(i), Long.valueOf(minRawStreamCursor));
        this.executorService.invokeAll(vector);
    }

    void restoreAggregatedStreams() throws Exception {
        if (!MetricsUtil.fileExists(this.config.aggChunkIndexFile)) {
            logger.info("No agg chunk index file: {}", this.config.aggChunkIndexFile);
            return;
        }
        Vector<MetricProtos.FileChunk> readChunkIndex = MetricsReader.readChunkIndex(this.config.aggChunkIndexFile);
        Vector vector = new Vector();
        long minAggregatedStreamCursor = getMinAggregatedStreamCursor();
        logger.info("minOutputCursor {} {}", Long.valueOf(minAggregatedStreamCursor), MetricsUtil.getTimeStr(minAggregatedStreamCursor));
        Iterator<MetricProtos.FileChunk> it = ChunkUtil.mergeChunks(readChunkIndex, minAggregatedStreamCursor).iterator();
        while (it.hasNext()) {
            MetricProtos.FileChunk next = it.next();
            if (next.getMaxTime() == 0 || next.getMaxTime() >= minAggregatedStreamCursor) {
                vector.add(new AggChunkTask(next));
            }
        }
        this.executorService.invokeAll(vector);
        int i = 0;
        int i2 = 0;
        Iterator it2 = vector.iterator();
        while (it2.hasNext()) {
            AggChunkTask aggChunkTask = (AggChunkTask) it2.next();
            i += aggChunkTask.numRecords;
            if (aggChunkTask.success) {
                i2++;
            }
        }
        logger.info(String.format("restored %d aggregated records, %d/%d tasks succeeded", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(vector.size())));
    }

    long getMinRawStreamCursor() {
        long cursor = this.engineState.getCursor();
        for (MetricProtos.ProcessorState processorState : this.engineState.getProcessorsList()) {
            if (processorState.getKey().getInterval() <= 10) {
                cursor = Math.min(cursor, processorState.getInputCursor() - getInputCacheDuration(10));
            }
        }
        return cursor;
    }

    long getMinAggregatedStreamCursor() {
        long cursor = this.engineState.getCursor() - getOutputCacheDuration(300);
        for (MetricProtos.ProcessorState processorState : this.engineState.getProcessorsList()) {
            if (processorState.getKey().getInterval() > 10) {
                cursor = Math.min(cursor, processorState.getInputCursor() - getOutputCacheDuration(processorState.getKey().getInterval()));
            }
        }
        return cursor;
    }

    InstanceProcessor ensureInstanceProcessor(String str) {
        InstanceProcessor instanceProcessor;
        synchronized (this) {
            if (!this.instances.containsKey(str)) {
                this.instances.put(str, new InstanceProcessor(this, str));
            }
            instanceProcessor = this.instances.get(str);
        }
        return instanceProcessor;
    }
}
