package org.apache.tez.dag.history.ats;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;

/* loaded from: input_file:org/apache/tez/dag/history/ats/ATSService.class */
/**
 * Service that forwards {@link DAGHistoryEvent}s to the YARN timeline server
 * through a {@link TimelineClient}.
 *
 * <p>Events passed to {@link #handle(DAGHistoryEvent)} are placed on an
 * in-memory queue and published by a single background thread. When the
 * service is stopped, the remaining backlog is flushed for at most
 * {@code tez.yarn.ats.event.flush.timeout.millis} milliseconds.
 *
 * <p>Every call to {@link #handleEvent(DAGHistoryEvent)} made by this class
 * happens while holding {@code lock}, so the background thread and the
 * shutdown flush never publish concurrently.
 */
public class ATSService extends AbstractService {
    private static final Log LOG = LogFactory.getLog(ATSService.class);

    /** Number of loop iterations between two "event queue stats" log lines. */
    private static final int STATS_LOG_INTERVAL = 1000;

    /** Configuration key for the maximum time spent flushing the queue on stop. */
    private static final String FLUSH_TIMEOUT_KEY = "tez.yarn.ats.event.flush.timeout.millis";

    /** Flush timeout used when {@link #FLUSH_TIMEOUT_KEY} is not configured. */
    private static final long FLUSH_TIMEOUT_DEFAULT_MILLIS = 3000L;

    /** DAGs whose name starts with this prefix are never published. */
    private static final String PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";

    private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Thread eventHandlingThread;
    private final AtomicBoolean stopped;
    /** Loop iterations since the last stats log line; only touched by the handling thread. */
    private int eventCounter;
    /** Events handled since the last stats log line; only touched by the handling thread. */
    private int eventsProcessed;
    /** Serialises event publishing between the handling thread and {@link #serviceStop()}. */
    private final Object lock;

    @VisibleForTesting
    TimelineClient timelineClient;
    /** DAGs whose events are dropped; populated and cleared in {@link #handleEvent}. */
    private final HashSet<TezDAGID> skippedDAGs;
    private final AppContext appContext;
    private long maxTimeToWaitOnShutdown;

    /**
     * Creates the service.
     *
     * @param appContext application context; its clock is used to bound the
     *                   shutdown flush
     */
    public ATSService(AppContext appContext) {
        super(ATSService.class.getName());
        this.eventQueue = new LinkedBlockingQueue<>();
        this.stopped = new AtomicBoolean(false);
        this.eventCounter = 0;
        this.eventsProcessed = 0;
        this.lock = new Object();
        this.skippedDAGs = new HashSet<>();
        this.appContext = appContext;
    }

    /**
     * Creates and initialises the timeline client and reads the shutdown
     * flush timeout from the configuration.
     *
     * @param configuration configuration used to initialise the timeline client
     * @throws Exception declared for compatibility with existing callers
     */
    public void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Initializing ATSService");
        this.timelineClient = TimelineClient.createTimelineClient();
        this.timelineClient.init(configuration);
        this.maxTimeToWaitOnShutdown =
                configuration.getLong(FLUSH_TIMEOUT_KEY, FLUSH_TIMEOUT_DEFAULT_MILLIS);
    }

    /**
     * Starts the timeline client and the background thread that drains the
     * event queue.
     */
    public void serviceStart() {
        LOG.info("Starting ATSService");
        this.timelineClient.start();
        this.eventHandlingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                processEvents();
            }
        }, "HistoryEventHandlingThread");
        this.eventHandlingThread.start();
    }

    /**
     * Stops the background thread, flushes as much of the remaining backlog
     * as the configured timeout allows, and stops the timeline client.
     *
     * <p>Safe to call when {@link #serviceInit(Configuration)} never ran or
     * failed before the timeline client was created.
     */
    public void serviceStop() {
        LOG.info("Stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        synchronized (this.lock) {
            if (!this.eventQueue.isEmpty()) {
                LOG.warn("ATSService being stopped, eventQueueBacklog=" + this.eventQueue.size() + ", maxTimeLeftToFlush=" + this.maxTimeToWaitOnShutdown);
                // A non-positive timeout means no flush is attempted at all.
                if (this.maxTimeToWaitOnShutdown > 0) {
                    flushUntil(this.appContext.getClock().getTime() + this.maxTimeToWaitOnShutdown);
                }
            }
        }
        if (!this.eventQueue.isEmpty()) {
            LOG.warn("Did not finish flushing eventQueue before stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
        }
        // The client is only created in serviceInit; guard so that stopping a
        // service that was never (fully) initialised does not throw.
        if (this.timelineClient != null) {
            this.timelineClient.stop();
        }
    }

    /**
     * Queues an event for asynchronous publishing.
     *
     * @param dAGHistoryEvent the event to publish
     */
    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        this.eventQueue.add(dAGHistoryEvent);
    }

    /**
     * Publishes a single event through the timeline client unless it belongs
     * to a skipped DAG.
     *
     * <p>A {@code DAG_SUBMITTED} event whose DAG name starts with
     * {@code "TezPreWarmDAG"} marks the DAG as skipped; all its events are
     * dropped until its {@code DAG_FINISHED} event removes the mark. Failures
     * while publishing are logged and not propagated.
     *
     * <p>Kept {@code public} for compatibility with existing callers.
     *
     * @param dAGHistoryEvent the event to publish
     */
    public void handleEvent(DAGHistoryEvent dAGHistoryEvent) {
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        TezDAGID dagID = dAGHistoryEvent.getDagID();

        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            String dagName = ((DAGSubmittedEvent) dAGHistoryEvent.getHistoryEvent()).getDAGName();
            if (dagName != null && dagName.startsWith(PREWARM_DAG_NAME_PREFIX)) {
                this.skippedDAGs.add(dagID);
                return;
            }
        }
        // The finish event of a skipped DAG is itself dropped, and un-skips the DAG.
        if (eventType.equals(HistoryEventType.DAG_FINISHED) && this.skippedDAGs.remove(dagID)) {
            return;
        }
        if (dagID != null && this.skippedDAGs.contains(dagID)) {
            return;
        }

        try {
            TimelinePutResponse response = this.timelineClient.putEntities(
                    new TimelineEntity[]{dAGHistoryEvent.getHistoryEvent().convertToTimelineEntity()});
            if (response != null && !response.getErrors().isEmpty()) {
                // Only the first reported error is inspected.
                TimelinePutResponse.TimelinePutError firstError = response.getErrors().get(0);
                if (firstError.getErrorCode() != 0) {
                    LOG.warn("Could not post history event to ATS, eventType=" + eventType + ", atsPutError=" + firstError.getErrorCode());
                }
            }
        } catch (Exception e) {
            LOG.warn("Could not handle history event, eventType=" + eventType, e);
        }
    }

    /**
     * Body of the background thread: takes events off the queue and publishes
     * them until the service is stopped or the thread is interrupted.
     */
    private void processEvents() {
        while (!this.stopped.get() && !Thread.currentThread().isInterrupted()) {
            updateQueueStats();
            DAGHistoryEvent event;
            try {
                event = this.eventQueue.take();
            } catch (InterruptedException e) {
                LOG.info("EventQueue take interrupted. Returning");
                // Restore the flag before exiting so the interruption stays observable.
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (this.lock) {
                ++this.eventsProcessed;
                try {
                    handleEvent(event);
                } catch (Exception e) {
                    LOG.warn("Error handling event", e);
                }
            }
        }
    }

    /**
     * Advances the iteration counter, and on every
     * {@link #STATS_LOG_INTERVAL}-th iteration logs the queue statistics and
     * resets both counters instead.
     */
    private void updateQueueStats() {
        if (this.eventCounter != 0 && this.eventCounter % STATS_LOG_INTERVAL == 0) {
            LOG.info("Event queue stats, eventsProcessedSinceLastUpdate=" + this.eventsProcessed + ", eventQueueSize=" + this.eventQueue.size());
            this.eventCounter = 0;
            this.eventsProcessed = 0;
        } else {
            ++this.eventCounter;
        }
    }

    /**
     * Publishes queued events until the queue is empty or the application
     * clock passes {@code deadline}. Must be called while holding {@code lock}.
     *
     * @param deadline latest clock time (inclusive) at which another event may
     *                 still be taken from the queue
     */
    private void flushUntil(long deadline) {
        while (deadline >= this.appContext.getClock().getTime()) {
            DAGHistoryEvent event = this.eventQueue.poll();
            if (event == null) {
                return;
            }
            try {
                handleEvent(event);
            } catch (Exception e) {
                LOG.warn("Error handling event", e);
            }
        }
    }
}
