package org.apache.tez.dag.history.logging.ats;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

/* loaded from: input_file:org/apache/tez/dag/history/logging/ats/EntityFileLoggingService.class */
public class EntityFileLoggingService extends HistoryLoggingService {
    private static final Log LOG = LogFactory.getLog(EntityFileLoggingService.class);
    private static final short APP_LOG_DIR_PERMISSIONS = 504;
    private static final short FILE_LOG_PERMISSIONS = 416;
    private static final String TEZ_ENTITY_FILE_HISTORY_FLUSH_INTERVAL_SECS = "tez.entity-file-history.flush-interval-secs";
    private static final long TEZ_ENTITY_FILE_HISTORY_FLUSH_INTERVAL_SECS_DEFAULT = 10;
    public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR = "yarn.timeline-service.entity-file-store.active-dir";
    public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT = "/tmp/entity-file-history/active";
    public static final String TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES = "yarn.timeline-service.entity-file-store.summary-entity-types";
    private static final String ACL_MANAGER_CLASS_NAME = "org.apache.tez.dag.history.ats.acls.EntityFileHistoryACLPolicyManager";
    public static final String SUMMARY_LOG_PREFIX = "summarylog-";
    public static final String ENTITY_LOG_PREFIX = "entitylog-";
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Thread eventHandlerThread;
    private AtomicBoolean stopped;
    private int eventCounter;
    private HistoryACLPolicyManager aclPolicyManager;
    private HashSet<TezDAGID> skippedDAGs;
    private Map<TezDAGID, String> dagDomainIdMap;
    private String sessionDomainId;
    private EntityLog summaryLog;
    private EntityLog entityLog;
    private ObjectMapper objMapper;
    private Set<String> summaryEntityTypes;
    private volatile Exception exception;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/history/logging/ats/EntityFileLoggingService$EntityLog.class */
    public static class EntityLog implements Closeable, Flushable {
        private FSDataOutputStream stream;
        private Timer flushTimer;
        private FlushTimerTask flushTimerTask;
        private ObjectMapper objMapper;
        private JsonGenerator jsonGenerator;
        private volatile IOException exception = null;

        /* loaded from: input_file:org/apache/tez/dag/history/logging/ats/EntityFileLoggingService$EntityLog$FlushTimerTask.class */
        private class FlushTimerTask extends TimerTask {
            private FlushTimerTask() {
            }

            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    if (EntityLog.this.exception == null) {
                        EntityLog.this.flush();
                    }
                } catch (IOException e) {
                    EntityLog.this.exception = e;
                }
            }
        }

        public EntityLog(Configuration configuration, FileSystem fileSystem, Path path, ObjectMapper objectMapper) throws IOException {
            EntityFileLoggingService.LOG.info("Writing history to " + path);
            this.objMapper = objectMapper;
            this.stream = fileSystem.create(path, false);
            fileSystem.setPermission(path, new FsPermission((short) 416));
            this.jsonGenerator = new JsonFactory().createJsonGenerator(this.stream);
            this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
            this.flushTimer = new Timer(EntityLog.class.getSimpleName() + "FlushTimer", true);
            this.flushTimerTask = new FlushTimerTask();
            long j = configuration.getLong(EntityFileLoggingService.TEZ_ENTITY_FILE_HISTORY_FLUSH_INTERVAL_SECS, EntityFileLoggingService.TEZ_ENTITY_FILE_HISTORY_FLUSH_INTERVAL_SECS_DEFAULT) * 1000;
            this.flushTimer.schedule(this.flushTimerTask, j, j);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() throws IOException {
            cancelFlushTimer();
            if (this.stream != null) {
                this.jsonGenerator.close();
                this.stream.close();
                this.stream = null;
            }
            if (this.exception != null) {
                throw new IOException("Error detected at close", this.exception);
            }
        }

        @Override // java.io.Flushable
        public synchronized void flush() throws IOException {
            if (this.stream == null || this.exception != null) {
                return;
            }
            this.stream.hflush();
        }

        public synchronized void writeEntity(TimelineEntity timelineEntity) throws IOException {
            if (this.exception != null) {
                throw new IOException("Error writing entity", this.exception);
            }
            if (this.stream != null) {
                this.objMapper.writeValue(this.jsonGenerator, timelineEntity);
            }
        }

        public synchronized void cancelFlushTimer() {
            this.flushTimer.cancel();
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/history/logging/ats/EntityFileLoggingService$EventProcessor.class */
    private class EventProcessor implements Runnable {
        private EventProcessor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            while (!EntityFileLoggingService.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                EntityFileLoggingService.access$204(EntityFileLoggingService.this);
                if (EntityFileLoggingService.this.eventCounter % 1000 == 0) {
                    EntityFileLoggingService.LOG.info("Event queue size=" + EntityFileLoggingService.this.eventQueue.size());
                }
                try {
                    DAGHistoryEvent dAGHistoryEvent = (DAGHistoryEvent) EntityFileLoggingService.this.eventQueue.take();
                    if (!z) {
                        try {
                            EntityFileLoggingService.this.handleEvent(dAGHistoryEvent);
                        } catch (Exception e) {
                            EntityFileLoggingService.this.exception = e;
                            z = true;
                            EntityFileLoggingService.LOG.error("Error writing entity log", e);
                        }
                    }
                } catch (InterruptedException e2) {
                    EntityFileLoggingService.LOG.info("EventQueue take interrupted. Returning");
                    return;
                }
            }
        }
    }

    public EntityFileLoggingService() {
        super(EntityFileLoggingService.class.getSimpleName());
        this.eventQueue = new LinkedBlockingQueue<>();
        this.stopped = new AtomicBoolean(false);
        this.eventCounter = 0;
        this.skippedDAGs = new HashSet<>();
        this.dagDomainIdMap = new HashMap();
        this.exception = null;
    }

    private static Path setupLogDir(Configuration configuration, ApplicationId applicationId) throws IOException {
        Path path = new Path(configuration.get(TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR, TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT));
        FileSystem fileSystem = path.getFileSystem(configuration);
        if (!fileSystem.exists(path)) {
            throw new IOException(path + " does not exist");
        }
        Path path2 = new Path(path, applicationId.toString());
        if (!fileSystem.exists(path2)) {
            FileSystem.mkdirs(fileSystem, path2, new FsPermission((short) 504));
        }
        return path2;
    }

    public void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Initializing " + EntityFileLoggingService.class.getSimpleName());
        this.sessionDomainId = configuration.get("tez.yarn.ats.acl.session.domain.id");
        LOG.info("Using org.apache.tez.dag.history.ats.acls.EntityFileHistoryACLPolicyManager to manage Timeline ACLs");
        try {
            this.aclPolicyManager = (HistoryACLPolicyManager) ReflectionUtils.createClazzInstance(ACL_MANAGER_CLASS_NAME);
            this.aclPolicyManager.setConf(configuration);
        } catch (TezUncheckedException e) {
            LOG.warn("Could not instantiate object for org.apache.tez.dag.history.ats.acls.EntityFileHistoryACLPolicyManager. ACLs cannot be enforced correctly for history data in Timeline", e);
            if (!configuration.getBoolean("tez.allow.disabled.timeline-domains", false)) {
                throw e;
            }
            this.aclPolicyManager = null;
        }
        Collection stringCollection = configuration.getStringCollection(TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES);
        if (stringCollection.isEmpty()) {
            throw new IllegalArgumentException("yarn.timeline-service.entity-file-store.summary-entity-types is not set");
        }
        LOG.info("Entity types for summary store: " + stringCollection);
        this.summaryEntityTypes = new HashSet(stringCollection);
    }

    public void serviceStart() {
        LOG.info("Starting " + EntityFileLoggingService.class.getSimpleName());
        Configuration config = getConfig();
        this.objMapper = createObjectMapper();
        try {
            ApplicationAttemptId applicationAttemptId = this.appContext.getApplicationAttemptId();
            Path path = setupLogDir(config, applicationAttemptId.getApplicationId());
            FileSystem fileSystem = path.getFileSystem(config);
            this.summaryLog = new EntityLog(config, fileSystem, new Path(path, SUMMARY_LOG_PREFIX + applicationAttemptId.toString()), this.objMapper);
            this.entityLog = new EntityLog(config, fileSystem, new Path(path, ENTITY_LOG_PREFIX + applicationAttemptId.toString()), this.objMapper);
            this.eventHandlerThread = new Thread(new EventProcessor(), EntityFileLoggingService.class.getSimpleName() + "EventHandler");
            this.eventHandlerThread.start();
        } catch (IOException e) {
            throw new TezUncheckedException("Error initializing entity logs", e);
        }
    }

    private ObjectMapper createObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
        objectMapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
        objectMapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
        return objectMapper;
    }

    public void serviceStop() {
        LOG.info("Stopping " + EntityFileLoggingService.class.getSimpleName() + ", eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        try {
            if (this.eventHandlerThread != null) {
                this.eventHandlerThread.interrupt();
                LOG.debug("Waiting for event handler thread to complete");
                this.eventHandlerThread.join();
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted Exception while stopping", e);
        }
        if (!this.eventQueue.isEmpty()) {
            LOG.info("Writing the remaining " + this.eventQueue.size() + " events");
            Iterator<DAGHistoryEvent> it = this.eventQueue.iterator();
            while (it.hasNext()) {
                try {
                    handleEvent(it.next());
                } catch (IOException e2) {
                    throw new YarnRuntimeException("Error writing log", e2);
                }
            }
        }
        IOUtils.cleanup(LOG, new Closeable[]{this.summaryLog});
        IOUtils.cleanup(LOG, new Closeable[]{this.entityLog});
    }

    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        if (this.exception != null) {
            throw new YarnRuntimeException("Error writing log", this.exception);
        }
        this.eventQueue.add(dAGHistoryEvent);
    }

    private boolean isValidEvent(DAGHistoryEvent dAGHistoryEvent) {
        String str;
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (dagID == null) {
            return false;
        }
        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            DAGSubmittedEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
            String dAGName = historyEvent.getDAGName();
            if (dAGName != null && dAGName.startsWith("TezPreWarmDAG")) {
                this.skippedDAGs.add(dagID);
                return false;
            }
            if (this.aclPolicyManager != null && (str = historyEvent.getConf().get("tez.yarn.ats.acl.dag.domain.id")) != null) {
                this.dagDomainIdMap.put(dagID, str);
            }
        }
        if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
            DAGRecoveredEvent historyEvent2 = dAGHistoryEvent.getHistoryEvent();
            if (!historyEvent2.isHistoryLoggingEnabled()) {
                this.skippedDAGs.add(historyEvent2.getDagID());
                return false;
            }
        }
        if (eventType.equals(HistoryEventType.DAG_FINISHED) && this.skippedDAGs.remove(dagID)) {
            return false;
        }
        return dagID == null || !this.skippedDAGs.contains(dagID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
        String str;
        if (isValidEvent(dAGHistoryEvent)) {
            String str2 = this.sessionDomainId;
            TezDAGID dagID = dAGHistoryEvent.getDagID();
            if (this.aclPolicyManager != null && dagID != null && (str = this.dagDomainIdMap.get(dagID)) != null) {
                str2 = str;
            }
            TimelineEntity convertToTimelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(dAGHistoryEvent.getHistoryEvent());
            if (this.aclPolicyManager != null && str2 != null && !str2.isEmpty()) {
                this.aclPolicyManager.updateTimelineEntityDomain(convertToTimelineEntity, str2);
            }
            EntityLog entityLog = this.entityLog;
            if (this.summaryEntityTypes.contains(convertToTimelineEntity.getEntityType())) {
                entityLog = this.summaryLog;
            }
            entityLog.writeEntity(convertToTimelineEntity);
        }
    }

    static /* synthetic */ int access$204(EntityFileLoggingService entityFileLoggingService) {
        int i = entityFileLoggingService.eventCounter + 1;
        entityFileLoggingService.eventCounter = i;
        return i;
    }
}
