/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker;
import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
import org.apache.hadoop.hive.llap.daemon.impl.QueryInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.logging.slf4j.Log4jMarker;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;

public class QueryTracker
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
    private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker((org.apache.logging.log4j.Marker)new Log4jQueryCompleteMarker());
    private final ScheduledExecutorService executorService;
    private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap();
    private final String[] localDirsBase;
    private final FileSystem localFs;
    private final String clusterId;
    private final long defaultDeleteDelaySeconds;
    private final boolean routeBasedLoggingEnabled;
    private final Set<QueryIdentifier> completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Lock lock = new ReentrantLock();
    private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<QueryIdentifier, ReentrantReadWriteLock>();
    private final ConcurrentMap<QueryIdentifier, ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<QueryIdentifier, ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto>>();
    private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId = new ConcurrentHashMap();

    public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId) {
        super("QueryTracker");
        this.localDirsBase = localDirsBase;
        this.clusterId = clusterId;
        try {
            this.localFs = FileSystem.getLocal((Configuration)conf);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to setup local filesystem instance", e);
        }
        this.defaultDeleteDelaySeconds = HiveConf.getTimeVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, (TimeUnit)TimeUnit.SECONDS);
        int numCleanerThreads = HiveConf.getIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
        this.executorService = Executors.newScheduledThreadPool(numCleanerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryCompletionThread %d").build());
        String logger = HiveConf.getVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_LOGGER);
        this.routeBasedLoggingEnabled = logger != null && logger.equalsIgnoreCase("query-routing");
        LOG.info("QueryTracker setup with numCleanerThreads={}, defaultCleanupDelay(s)={}, routeBasedLogging={}", new Object[]{numCleanerThreads, this.defaultDeleteDelaySeconds, this.routeBasedLoggingEnabled});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, LlapDaemonProtocolProtos.SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, String fragmentIdString, LlapTokenChecker.LlapTokenInfo tokenInfo, LlapNodeId amNodeId) throws IOException {
        ReentrantReadWriteLock dagLock = this.getDagLock(queryIdentifier);
        dagLock.readLock().lock();
        try {
            if (this.completedDagMap.contains(queryIdentifier)) {
                this.dagSpecificLocks.remove(queryIdentifier);
                String message = "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]";
                LOG.info(message);
                throw new RuntimeException(message);
            }
            if (tokenInfo == null) {
                tokenInfo = LlapTokenChecker.getTokenInfo(this.clusterId);
            }
            boolean isExistingQueryInfo = true;
            QueryInfo queryInfo = this.queryInfoMap.get(queryIdentifier);
            if (queryInfo == null) {
                QueryInfo old;
                if (UserGroupInformation.isSecurityEnabled()) {
                    Preconditions.checkNotNull((Object)tokenInfo.userName);
                }
                if ((old = this.queryInfoMap.putIfAbsent(queryIdentifier, queryInfo = new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString, dagIdentifier, user, this.getSourceCompletionMap(queryIdentifier), this.localDirsBase, this.localFs, tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken, vertex.getIsExternalSubmission()))) != null) {
                    queryInfo = old;
                } else {
                    isExistingQueryInfo = false;
                }
            }
            if (isExistingQueryInfo) {
                LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(), queryInfo.getTokenAppId(), (Object)queryInfo.getQueryIdentifier());
            }
            this.queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering request for {} with the ShuffleHandler", (Object)queryIdentifier);
            }
            if (!vertex.getIsExternalSubmission()) {
                ShuffleHandler.get().registerDag(appIdString, dagIdentifier, appToken, user, queryInfo.getLocalDirs());
            }
            QueryFragmentInfo queryFragmentInfo = queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString);
            return queryFragmentInfo;
        }
        finally {
            dagLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerDag(String applicationId, int dagId, String user, Credentials credentials) {
        Token jobToken = TokenCache.getSessionToken((Credentials)credentials);
        QueryIdentifier queryIdentifier = new QueryIdentifier(applicationId, dagId);
        ReentrantReadWriteLock dagLock = this.getDagLock(queryIdentifier);
        dagLock.readLock().lock();
        try {
            ShuffleHandler.get().registerDag(applicationId, dagId, (Token<JobTokenIdentifier>)jobToken, user, null);
        }
        finally {
            dagLock.readLock().unlock();
        }
    }

    void fragmentComplete(QueryFragmentInfo fragmentInfo) {
        QueryIdentifier qId = fragmentInfo.getQueryInfo().getQueryIdentifier();
        QueryInfo queryInfo = this.queryInfoMap.get(qId);
        if (queryInfo == null) {
            LOG.info("Ignoring fragmentComplete message for unknown query: {}", (Object)qId);
        } else {
            queryInfo.unregisterFragment(fragmentInfo);
            this.handleFragmentCompleteExternalQuery(queryInfo);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<QueryFragmentInfo> getRegisteredFragments(QueryIdentifier queryIdentifier) {
        ReentrantReadWriteLock dagLock = this.getDagLock(queryIdentifier);
        dagLock.readLock().lock();
        try {
            QueryInfo queryInfo = this.queryInfoMap.get(queryIdentifier);
            if (queryInfo == null) {
                LOG.warn("Unknown query: Returning an empty list of fragments");
                List<QueryFragmentInfo> list = Collections.emptyList();
                return list;
            }
            List<QueryFragmentInfo> list = queryInfo.getRegisteredFragments();
            return list;
        }
        finally {
            dagLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, boolean isExternalQuery) throws IOException {
        if (deleteDelay == -1L) {
            deleteDelay = this.defaultDeleteDelaySeconds;
        }
        ReentrantReadWriteLock dagLock = this.getDagLock(queryIdentifier);
        dagLock.writeLock().lock();
        try {
            QueryInfo queryInfo;
            QueryInfo queryInfo2 = queryInfo = isExternalQuery ? this.queryInfoMap.get(queryIdentifier) : this.checkPermissionsAndGetQuery(queryIdentifier);
            if (queryInfo == null) {
                LOG.warn("Ignoring query complete for unknown dag: {}", (Object)queryIdentifier);
                QueryInfo queryInfo3 = null;
                return queryInfo3;
            }
            LOG.info("Processing queryComplete for queryIdentifier={}, isExternalQuery={}, with deleteDelay={} seconds", new Object[]{queryIdentifier, isExternalQuery, deleteDelay});
            this.queryInfoMap.remove(queryIdentifier);
            if (!isExternalQuery) {
                this.rememberCompletedDag(queryIdentifier);
                this.cleanupLocalDirs(queryInfo, deleteDelay);
                this.handleLogOnQueryCompletion(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString());
            } else if (queryInfo.getRegisteredFragments().size() == 0) {
                LOG.debug("Queueing future cleanup for external queryId: {}", (Object)queryInfo.getHiveQueryIdString());
                this.executorService.schedule(new ExternalQueryCleanerCallable(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString(), queryInfo.getQueryIdentifier()), 1L, TimeUnit.MINUTES);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("NumRegisterFragments={}, Not queuing cleanup for external queryId={}", (Object)queryInfo.getRegisteredFragments().size(), (Object)queryInfo.getHiveQueryIdString());
            }
            this.sourceCompletionMap.remove(queryIdentifier);
            String savedQueryId = this.queryIdentifierToHiveQueryId.remove(queryIdentifier);
            if (!isExternalQuery) {
                this.removeQuerySpecificLock(queryIdentifier);
            }
            if (savedQueryId != null) {
                ObjectCacheFactory.removeLlapQueryCache((String)savedQueryId);
            }
            QueryInfo queryInfo4 = queryInfo;
            return queryInfo4;
        }
        finally {
            dagLock.writeLock().unlock();
        }
    }

    private void cleanupLocalDirs(QueryInfo queryInfo, long deleteDelay) {
        String[] localDirs = queryInfo.getLocalDirsNoCreate();
        if (localDirs != null) {
            for (String localDir : localDirs) {
                this.cleanupDir(localDir, deleteDelay);
                ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
            }
        }
    }

    private void handleLogOnQueryCompletion(String queryIdString, String dagIdString) {
        if (this.routeBasedLoggingEnabled) {
            MDC.put((String)"dagId", (String)dagIdString);
            MDC.put((String)"queryId", (String)queryIdString);
            try {
                LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger. Query complete: " + queryIdString + ", " + dagIdString);
            }
            finally {
                MDC.clear();
            }
        }
    }

    private void removeQuerySpecificLock(QueryIdentifier queryIdentifier) {
        this.dagSpecificLocks.remove(queryIdentifier);
    }

    public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
        if (this.completedDagMap.add(queryIdentifier)) {
            this.executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1L, TimeUnit.HOURS);
        } else {
            LOG.warn("Couldn't add {} to completed dag set", (Object)queryIdentifier);
        }
    }

    void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, LlapDaemonProtocolProtos.SourceStateProto sourceState) throws IOException {
        this.getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
        QueryInfo queryInfo = this.checkPermissionsAndGetQuery(queryIdentifier);
        if (queryInfo != null) {
            queryInfo.sourceStateUpdated(sourceName);
        }
    }

    private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
        return (ReentrantReadWriteLock)this.dagSpecificLocks.get(queryIdentifier);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
        this.lock.lock();
        try {
            ReentrantReadWriteLock dagLock = (ReentrantReadWriteLock)this.dagSpecificLocks.get(queryIdentifier);
            if (dagLock == null) {
                dagLock = new ReentrantReadWriteLock();
                this.dagSpecificLocks.put(queryIdentifier, dagLock);
            }
            ReentrantReadWriteLock reentrantReadWriteLock = dagLock;
            return reentrantReadWriteLock;
        }
        finally {
            this.lock.unlock();
        }
    }

    private ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
        ConcurrentHashMap dagMap = (ConcurrentHashMap)this.sourceCompletionMap.get(queryIdentifier);
        if (dagMap == null) {
            dagMap = new ConcurrentHashMap();
            ConcurrentHashMap old = this.sourceCompletionMap.putIfAbsent(queryIdentifier, dagMap);
            dagMap = old != null ? old : dagMap;
        }
        return dagMap;
    }

    public void serviceStart() {
        LOG.info(this.getName() + " started");
    }

    public void serviceStop() {
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(10000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("cannot finish QueryTracker cleanup because of InterruptedException", (Throwable)e);
        }
        LOG.info(this.getName() + " stopped");
    }

    private void cleanupDir(String dir, long deleteDelay) {
        LOG.info("Scheduling deletion of {} after {} seconds", (Object)dir, (Object)deleteDelay);
        this.executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
    }

    private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
        QueryInfo queryInfo = this.queryInfoMap.get(queryId);
        if (queryInfo == null) {
            return null;
        }
        LlapTokenChecker.checkPermissions(this.clusterId, queryInfo.getTokenUserName(), queryInfo.getTokenAppId(), (Object)queryInfo.getQueryIdentifier());
        return queryInfo;
    }

    public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
        return this.checkPermissionsAndGetQuery(queryId) != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
        if (queryInfo.isExternalQuery()) {
            boolean locked;
            ReentrantReadWriteLock dagLock = this.getDagLock(queryInfo.getQueryIdentifier());
            if (dagLock == null) {
                LOG.warn("Ignoring fragment completion for unknown query: {}", (Object)queryInfo.getQueryIdentifier());
            }
            if (!(locked = dagLock.writeLock().tryLock())) {
                return;
            }
            try {
                if (queryInfo.getRegisteredFragments().size() == 0) {
                    this.queryComplete(queryInfo.getQueryIdentifier(), -1L, true);
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("Not invoking queryComplete on fragmentComplete for {}, since there are known fragments. count={}", (Object)queryInfo.getHiveQueryIdString(), (Object)queryInfo.getRegisteredFragments().size());
                }
            }
            catch (IOException e) {
                LOG.error("Failed to process query complete for external submission: {}", (Object)queryInfo.getQueryIdentifier());
            }
            finally {
                dagLock.writeLock().unlock();
            }
        }
    }

    private class ExternalQueryCleanerCallable
    extends CallableWithNdc<Void> {
        private final String queryIdString;
        private final String dagIdString;
        private final QueryIdentifier queryIdentifier;

        public ExternalQueryCleanerCallable(String queryIdString, String dagIdString, QueryIdentifier queryIdentifier) {
            this.queryIdString = queryIdString;
            this.dagIdString = dagIdString;
            this.queryIdentifier = queryIdentifier;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected Void callInternal() {
            LOG.info("External cleanup callable for {}", (Object)this.queryIdentifier);
            ReentrantReadWriteLock dagLock = QueryTracker.this.getDagLockNoCreate(this.queryIdentifier);
            if (dagLock == null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("null dagLock. No cleanup required at the moment for {}", (Object)this.queryIdString);
                }
                return null;
            }
            boolean locked = dagLock.writeLock().tryLock();
            if (!locked) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Lock not obtained. Skipping cleanup for {}", (Object)this.queryIdString);
                }
                return null;
            }
            try {
                QueryInfo queryInfo = (QueryInfo)QueryTracker.this.queryInfoMap.get(this.queryIdentifier);
                if (queryInfo != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.info("QueryInfo found for {}. Expecting future cleanup", (Object)this.queryIdString);
                    }
                    Void void_ = null;
                    return void_;
                }
                LOG.info("Processing cleanup for {}", (Object)this.queryIdString);
                QueryTracker.this.handleLogOnQueryCompletion(this.queryIdString, this.dagIdString);
                QueryTracker.this.removeQuerySpecificLock(this.queryIdentifier);
            }
            finally {
                dagLock.writeLock().unlock();
            }
            return null;
        }
    }

    private class DagMapCleanerCallable
    extends CallableWithNdc<Void> {
        private final QueryIdentifier queryIdentifier;

        private DagMapCleanerCallable(QueryIdentifier queryIdentifier) {
            this.queryIdentifier = queryIdentifier;
        }

        protected Void callInternal() {
            QueryTracker.this.completedDagMap.remove(this.queryIdentifier);
            return null;
        }
    }

    private class FileCleanerCallable
    extends CallableWithNdc<Void> {
        private final String dirToDelete;

        private FileCleanerCallable(String dirToDelete) {
            this.dirToDelete = dirToDelete;
        }

        protected Void callInternal() {
            Path pathToDelete = new Path(this.dirToDelete);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Deleting path: " + pathToDelete);
            }
            try {
                QueryTracker.this.localFs.delete(new Path(this.dirToDelete), true);
            }
            catch (IOException e) {
                LOG.warn("Ignoring exception while cleaning up path: " + pathToDelete, (Throwable)e);
            }
            return null;
        }
    }
}

