package org.apache.tez.dag.history.recovery;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;

/* loaded from: input_file:org/apache/tez/dag/history/recovery/RecoveryService.class */
public class RecoveryService extends AbstractService {
    private static final Log LOG = LogFactory.getLog(RecoveryService.class);
    private final AppContext appContext;
    public static final String RECOVERY_FATAL_OCCURRED_DIR = "RecoveryFatalErrorOccurred";
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Set<TezDAGID> completedDAGs;
    private Set<TezDAGID> skippedDAGs;
    private Thread eventHandlingThread;
    private AtomicBoolean stopped;
    private AtomicBoolean started;
    private int eventCounter;
    private int eventsProcessed;
    private final Object lock;
    private FileSystem recoveryDirFS;
    Path recoveryPath;
    Map<TezDAGID, FSDataOutputStream> outputStreamMap;
    private int bufferSize;
    private FSDataOutputStream summaryStream;
    private int unflushedEventsCount;
    private long lastFlushTime;
    private int maxUnflushedEvents;
    private int flushInterval;
    private AtomicBoolean recoveryFatalErrorOccurred;

    public RecoveryService(AppContext appContext) {
        super(RecoveryService.class.getName());
        this.eventQueue = new LinkedBlockingQueue<>();
        this.completedDAGs = new HashSet();
        this.skippedDAGs = new HashSet();
        this.stopped = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.eventCounter = 0;
        this.eventsProcessed = 0;
        this.lock = new Object();
        this.outputStreamMap = new HashMap();
        this.unflushedEventsCount = 0;
        this.lastFlushTime = -1L;
        this.recoveryFatalErrorOccurred = new AtomicBoolean(false);
        this.appContext = appContext;
    }

    public void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Initializing RecoveryService");
        this.recoveryPath = this.appContext.getCurrentRecoveryDir();
        this.recoveryDirFS = FileSystem.get(this.recoveryPath.toUri(), configuration);
        this.bufferSize = configuration.getInt("tez.dag.recovery.io.buffer.size", 8192);
        this.flushInterval = configuration.getInt("tez.dag.recovery.flush.interval.secs", 30);
        this.maxUnflushedEvents = configuration.getInt("tez.dag.recovery.max.unflushed.events", 100);
    }

    public void serviceStart() {
        LOG.info("Starting RecoveryService");
        this.lastFlushTime = this.appContext.getClock().getTime();
        this.eventHandlingThread = new Thread(new Runnable() { // from class: org.apache.tez.dag.history.recovery.RecoveryService.1
            @Override // java.lang.Runnable
            public void run() {
                while (!RecoveryService.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    if (RecoveryService.this.recoveryFatalErrorOccurred.get()) {
                        RecoveryService.LOG.error("Recovery failure occurred. Stopping recovery thread. Current eventQueueSize=" + RecoveryService.this.eventQueue.size());
                        RecoveryService.this.eventQueue.clear();
                        return;
                    }
                    if (RecoveryService.this.eventCounter == 0 || RecoveryService.this.eventCounter % 1000 != 0) {
                        RecoveryService.access$404(RecoveryService.this);
                    } else {
                        RecoveryService.LOG.info("Event queue stats, eventsProcessedSinceLastUpdate=" + RecoveryService.this.eventsProcessed + ", eventQueueSize=" + RecoveryService.this.eventQueue.size());
                        RecoveryService.this.eventCounter = 0;
                        RecoveryService.this.eventsProcessed = 0;
                    }
                    try {
                        DAGHistoryEvent dAGHistoryEvent = (DAGHistoryEvent) RecoveryService.this.eventQueue.take();
                        synchronized (RecoveryService.this.lock) {
                            try {
                                RecoveryService.access$504(RecoveryService.this);
                                RecoveryService.this.handleRecoveryEvent(dAGHistoryEvent);
                            } catch (Exception e) {
                                RecoveryService.LOG.warn("Error handling recovery event", e);
                            }
                        }
                    } catch (InterruptedException e2) {
                        RecoveryService.LOG.info("EventQueue take interrupted. Returning");
                        return;
                    }
                }
            }
        }, "RecoveryEventHandlingThread");
        this.eventHandlingThread.start();
        this.started.set(true);
    }

    public void serviceStop() {
        LOG.info("Stopping RecoveryService");
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.summaryStream != null) {
            try {
                LOG.info("Closing Summary Stream");
                this.summaryStream.hsync();
                this.summaryStream.close();
            } catch (IOException e) {
                LOG.warn("Error when closing summary stream", e);
            }
        }
        for (Map.Entry<TezDAGID, FSDataOutputStream> entry : this.outputStreamMap.entrySet()) {
            try {
                LOG.info("Closing Output Stream for DAG " + entry.getKey());
                entry.getValue().hsync();
                entry.getValue().close();
            } catch (IOException e2) {
                LOG.warn("Error when closing output stream", e2);
            }
        }
    }

    public void handle(DAGHistoryEvent dAGHistoryEvent) throws IOException {
        String dAGName;
        if (this.stopped.get()) {
            LOG.warn("Igoring event as service stopped, eventType" + dAGHistoryEvent.getHistoryEvent().getEventType());
            return;
        }
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        if (this.recoveryFatalErrorOccurred.get()) {
            return;
        }
        if (!this.started.get()) {
            LOG.warn("Adding event of type " + eventType + " to queue as service not started");
            this.eventQueue.add(dAGHistoryEvent);
            return;
        }
        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (eventType.equals(HistoryEventType.DAG_SUBMITTED) && (dAGName = ((DAGSubmittedEvent) dAGHistoryEvent.getHistoryEvent()).getDAGName()) != null && dAGName.startsWith("TezPreWarmDAG")) {
            this.skippedDAGs.add(dagID);
            return;
        }
        if (dagID == null || this.skippedDAGs.contains(dagID)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping event for DAG, eventType=" + eventType + ", dagId=" + (dagID == null ? "null" : dagID.toString()) + ", isSkippedDAG=" + (dagID == null ? "null" : Boolean.valueOf(this.skippedDAGs.contains(dagID))));
                return;
            }
            return;
        }
        if (!(dAGHistoryEvent.getHistoryEvent() instanceof SummaryEvent)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
            }
            this.eventQueue.add(dAGHistoryEvent);
            return;
        }
        synchronized (this.lock) {
            try {
                SummaryEvent summaryEvent = (SummaryEvent) dAGHistoryEvent.getHistoryEvent();
                handleSummaryEvent(dagID, eventType, summaryEvent);
                this.summaryStream.hsync();
                if (summaryEvent.writeToRecoveryImmediately()) {
                    handleRecoveryEvent(dAGHistoryEvent);
                    doFlush(this.outputStreamMap.get(dAGHistoryEvent.getDagID()), this.appContext.getClock().getTime(), true);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Queueing Non-immediate Summary/Recovery event of type" + eventType.name());
                    }
                    this.eventQueue.add(dAGHistoryEvent);
                }
                if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
                    LOG.info("DAG completed, dagId=" + dAGHistoryEvent.getDagID() + ", queueSize=" + this.eventQueue.size());
                    this.completedDAGs.add(dagID);
                    if (this.outputStreamMap.containsKey(dagID)) {
                        try {
                            this.outputStreamMap.get(dagID).close();
                            this.outputStreamMap.remove(dagID);
                        } catch (IOException e) {
                            LOG.warn("Error when trying to flush/close recovery file for dag, dagId=" + dAGHistoryEvent.getDagID());
                        }
                    }
                }
            } catch (IOException e2) {
                LOG.error("Error handling summary event, eventType=" + dAGHistoryEvent.getHistoryEvent().getEventType(), e2);
                Path path = new Path(this.recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
                try {
                    LOG.error("Adding a flag to ensure next AM attempt does not start up, flagFile=" + path.toString());
                    this.recoveryFatalErrorOccurred.set(true);
                    this.recoveryDirFS.mkdirs(path);
                    if (!this.recoveryDirFS.exists(path)) {
                        throw e2;
                    }
                    LOG.error("Recovery failure occurred. Skipping all events");
                    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
                        throw e2;
                    }
                } catch (IOException e3) {
                    LOG.fatal("Failed to create fatal error flag dir " + path.toString(), e3);
                    throw e2;
                }
            }
        }
    }

    private void handleSummaryEvent(TezDAGID tezDAGID, HistoryEventType historyEventType, SummaryEvent summaryEvent) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling summary event, dagID=" + tezDAGID + ", eventType=" + historyEventType);
        }
        if (this.summaryStream == null) {
            Path path = new Path(this.recoveryPath, this.appContext.getApplicationID() + ".summary");
            if (this.recoveryDirFS.exists(path)) {
                this.summaryStream = this.recoveryDirFS.append(path, this.bufferSize);
            } else {
                this.summaryStream = this.recoveryDirFS.create(path, false, this.bufferSize);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing recovery event to summary stream, dagId=" + tezDAGID + ", eventType=" + historyEventType);
        }
        summaryEvent.toSummaryProtoStream(this.summaryStream);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleRecoveryEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
        FSDataOutputStream create;
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling recovery event of type " + dAGHistoryEvent.getHistoryEvent().getEventType());
        }
        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (this.completedDAGs.contains(dagID)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping Recovery Event as DAG completed, dagId=" + dagID + ", completed=" + this.completedDAGs.contains(dagID) + ", skipped=" + this.skippedDAGs.contains(dagID) + ", eventType=" + eventType);
                return;
            }
            return;
        }
        if (!this.outputStreamMap.containsKey(dagID)) {
            Path path = new Path(this.recoveryPath, dagID.toString() + ".recovery");
            if (this.recoveryDirFS.exists(path)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening DAG recovery file in append mode, filePath=" + path);
                }
                create = this.recoveryDirFS.append(path, this.bufferSize);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening DAG recovery file in create mode, filePath=" + path);
                }
                create = this.recoveryDirFS.create(path, false, this.bufferSize);
            }
            this.outputStreamMap.put(dagID, create);
        }
        FSDataOutputStream fSDataOutputStream = this.outputStreamMap.get(dagID);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing recovery event to output stream, dagId=" + dagID + ", eventType=" + eventType);
        }
        this.unflushedEventsCount++;
        fSDataOutputStream.writeInt(dAGHistoryEvent.getHistoryEvent().getEventType().ordinal());
        dAGHistoryEvent.getHistoryEvent().toProtoStream(fSDataOutputStream);
        if (EnumSet.of(HistoryEventType.DAG_SUBMITTED, HistoryEventType.DAG_FINISHED).contains(eventType)) {
            return;
        }
        maybeFlush(fSDataOutputStream);
    }

    private void maybeFlush(FSDataOutputStream fSDataOutputStream) throws IOException {
        long time = this.appContext.getClock().getTime();
        boolean z = false;
        if (this.maxUnflushedEvents >= 0 && this.unflushedEventsCount >= this.maxUnflushedEvents) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Max unflushed events count reached. Flushing recovery data, unflushedEventsCount=" + this.unflushedEventsCount + ", maxUnflushedEvents=" + this.maxUnflushedEvents);
            }
            z = true;
        } else if (this.flushInterval >= 0 && time - this.lastFlushTime >= this.flushInterval * 1000) {
            LOG.debug("Flush interval time period elapsed. Flushing recovery data, lastTimeSinceFLush=" + this.lastFlushTime + ", timeSinceLastFlush=" + (time - this.lastFlushTime));
            z = true;
        }
        if (z) {
            doFlush(fSDataOutputStream, time, false);
        }
    }

    private void doFlush(FSDataOutputStream fSDataOutputStream, long j, boolean z) throws IOException {
        if (z) {
            fSDataOutputStream.hsync();
        } else {
            fSDataOutputStream.hflush();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flushing output stream, lastTimeSinceFLush=" + this.lastFlushTime + ", timeSinceLastFlush=" + (j - this.lastFlushTime) + ", unflushedEventsCount=" + this.unflushedEventsCount + ", maxUnflushedEvents=" + this.maxUnflushedEvents);
        }
        this.unflushedEventsCount = 0;
        this.lastFlushTime = j;
    }

    public boolean hasRecoveryFailed() {
        return this.recoveryFatalErrorOccurred.get();
    }

    static /* synthetic */ int access$404(RecoveryService recoveryService) {
        int i = recoveryService.eventCounter + 1;
        recoveryService.eventCounter = i;
        return i;
    }

    static /* synthetic */ int access$504(RecoveryService recoveryService) {
        int i = recoveryService.eventsProcessed + 1;
        recoveryService.eventsProcessed = i;
        return i;
    }
}
