package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl;
import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.logging.slf4j.Log4jMarker;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;

/* loaded from: input_file:org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.class */
public class QueryTracker extends AbstractService {
    /** Class-wide logger; also used by the inner cleanup callables. */
    private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
    /** Marker attached to the log line emitted by {@code handleLogOnQueryCompletion}. */
    private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker(new Log4jQueryCompleteMarker());
    /** Scheduled pool that runs the delayed cleanup callables (directory deletion, completed-dag eviction, external-query cleanup). */
    private final ScheduledExecutorService executorService;
    /** QueryInfo per tracked query; populated in {@code registerFragment}, removed in {@code queryComplete}. */
    private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap;
    /** Base local directories handed to every QueryInfo created by this tracker. */
    private final String[] localDirsBase;
    /** Local filesystem used by QueryInfo and by {@code FileCleanerCallable} to delete directories. */
    private final FileSystem localFs;
    /** Cluster id passed to LlapTokenChecker for token lookup and permission checks. */
    private final String clusterId;
    /** Delete delay (seconds) applied when {@code queryComplete} is called with a delay of -1. */
    private final long defaultDeleteDelaySeconds;
    /** True when the configured LLAP daemon logger equals "query-routing" (case-insensitive). */
    private final boolean routeBasedLoggingEnabled;
    /** Identifiers of completed queries; {@code registerFragment} rejects fragments for these. Entries are evicted one hour after being added. */
    private final Set<QueryIdentifier> completedDagMap;
    /** Guards lookup/creation of entries in {@link #dagSpecificLocks} inside {@code getDagLock}. */
    private final Lock lock;
    /** Per-query read/write locks. */
    private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks;
    /** Per-query map of source state, keyed by the string passed to {@code registerSourceStateChange} (presumably the source name). */
    private final ConcurrentMap<QueryIdentifier, ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto>> sourceCompletionMap;
    /** Maps a query identifier to the id string later passed to {@code ObjectCacheFactory.removeLlapQueryCache} (presumably the Hive query id). */
    private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/daemon/impl/QueryTracker$DagMapCleanerCallable.class */
    /**
     * Scheduled task that forgets a completed query: it removes the query identifier
     * from the enclosing tracker's completed-dag set.
     */
    public class DagMapCleanerCallable extends CallableWithNdc<Void> {
        /** Identifier to evict from the completed-dag set. */
        private final QueryIdentifier queryIdentifier;

        private DagMapCleanerCallable(QueryIdentifier queryIdentifier) {
            this.queryIdentifier = queryIdentifier;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        /**
         * Evicts the identifier from the completed-dag set.
         *
         * @return always {@code null}
         */
        public Void m54callInternal() {
            final Set<QueryIdentifier> completed = QueryTracker.this.completedDagMap;
            completed.remove(queryIdentifier);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/daemon/impl/QueryTracker$ExternalQueryCleanerCallable.class */
    /**
     * Scheduled task that finishes cleanup for an external query once no new work
     * has arrived for it: it emits the query-complete log line and drops the
     * query-specific lock.
     *
     * <p>The task is best-effort. It does nothing when the query has no lock entry,
     * when the write lock cannot be acquired immediately, or when a QueryInfo is
     * registered for the query again.
     */
    public class ExternalQueryCleanerCallable extends CallableWithNdc<Void> {
        private final String queryIdString;
        private final String dagIdString;
        private final QueryIdentifier queryIdentifier;

        /**
         * @param str             query id string, used in log messages and the completion log line
         * @param str2            dag id string, used in the completion log line
         * @param queryIdentifier identifier used to look up the lock and the QueryInfo
         */
        public ExternalQueryCleanerCallable(String str, String str2, QueryIdentifier queryIdentifier) {
            this.queryIdString = str;
            this.dagIdString = str2;
            this.queryIdentifier = queryIdentifier;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        /**
         * Runs the cleanup if it is still applicable.
         *
         * @return always {@code null}
         */
        public Void m55callInternal() {
            QueryTracker.LOG.info("External cleanup callable for {}", this.queryIdentifier);
            ReentrantReadWriteLock dagLockNoCreate = QueryTracker.this.getDagLockNoCreate(this.queryIdentifier);
            if (dagLockNoCreate == null) {
                // No lock entry: nothing to clean up for this query right now.
                if (QueryTracker.LOG.isTraceEnabled()) {
                    QueryTracker.LOG.trace("null dagLock. No cleanup required at the moment for {}", this.queryIdString);
                }
                return null;
            }
            if (!dagLockNoCreate.writeLock().tryLock()) {
                // Another operation holds the lock; skip rather than block the cleaner thread.
                if (QueryTracker.LOG.isTraceEnabled()) {
                    QueryTracker.LOG.trace("Lock not obtained. Skipping cleanup for {}", this.queryIdString);
                }
                return null;
            }
            try {
                if (QueryTracker.this.queryInfoMap.get(this.queryIdentifier) != null) {
                    // A QueryInfo is registered again, so a later completion will schedule cleanup.
                    // NOTE(review): guarded by isTraceEnabled() but emitted at INFO - confirm intended level.
                    if (QueryTracker.LOG.isTraceEnabled()) {
                        QueryTracker.LOG.info("QueryInfo found for {}. Expecting future cleanup", this.queryIdString);
                    }
                    return null;
                }
                QueryTracker.LOG.info("Processing cleanup for {}", this.queryIdString);
                QueryTracker.this.handleLogOnQueryCompletion(this.queryIdString, this.dagIdString);
                QueryTracker.this.removeQuerySpecificLock(this.queryIdentifier);
                return null;
            } finally {
                // Single release point: the lock is acquired exactly once above, so it must be
                // released exactly once. A second unlock would throw IllegalMonitorStateException.
                dagLockNoCreate.writeLock().unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/daemon/impl/QueryTracker$FileCleanerCallable.class */
    /**
     * Scheduled task that recursively deletes one directory on the tracker's local
     * filesystem. I/O failures are logged and otherwise ignored.
     */
    public class FileCleanerCallable extends CallableWithNdc<Void> {
        /** Directory path (as a string) to delete. */
        private final String dirToDelete;

        private FileCleanerCallable(String str) {
            this.dirToDelete = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        /**
         * Deletes the directory recursively.
         *
         * @return always {@code null}, whether or not the delete succeeded
         */
        public Void m56callInternal() {
            final Path target = new Path(dirToDelete);
            if (QueryTracker.LOG.isDebugEnabled()) {
                QueryTracker.LOG.debug("Deleting path: " + target);
            }
            try {
                // 'true' requests a recursive delete.
                QueryTracker.this.localFs.delete(new Path(dirToDelete), true);
            } catch (IOException ioe) {
                // Best-effort cleanup: report and carry on.
                QueryTracker.LOG.warn("Ignoring exception while cleaning up path: " + target, ioe);
            }
            return null;
        }
    }

    /**
     * Creates the tracker, its bookkeeping maps and the scheduled cleanup pool.
     *
     * @param configuration source of the local filesystem and of the cleanup-delay,
     *                      cleaner-thread-count and daemon-logger settings
     * @param strArr        base local directories handed to each QueryInfo
     * @param str           cluster id used for token checks
     * @throws RuntimeException if the local filesystem cannot be obtained
     */
    public QueryTracker(Configuration configuration, String[] strArr, String str) {
        super("QueryTracker");
        this.queryInfoMap = new ConcurrentHashMap<>();
        this.completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap<QueryIdentifier, Boolean>());
        this.lock = new ReentrantLock();
        this.dagSpecificLocks = new ConcurrentHashMap<>();
        this.sourceCompletionMap = new ConcurrentHashMap<>();
        this.queryIdentifierToHiveQueryId = new ConcurrentHashMap<>();
        this.localDirsBase = strArr;
        this.clusterId = str;
        try {
            this.localFs = FileSystem.getLocal(configuration);
            this.defaultDeleteDelaySeconds = HiveConf.getTimeVar(
                    configuration, HiveConf.ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);

            final int numCleanerThreads =
                    HiveConf.getIntVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
            this.executorService = Executors.newScheduledThreadPool(
                    numCleanerThreads,
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryCompletionThread %d").build());

            // Route-based logging is on only when the daemon logger is "query-routing".
            final String daemonLogger = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_LOGGER);
            this.routeBasedLoggingEnabled = "query-routing".equalsIgnoreCase(daemonLogger);

            LOG.info("QueryTracker setup with numCleanerThreads={}, defaultCleanupDelay(s)={}, routeBasedLogging={}",
                    numCleanerThreads, this.defaultDeleteDelaySeconds, this.routeBasedLoggingEnabled);
        } catch (IOException e) {
            throw new RuntimeException("Failed to setup local filesystem instance", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a fragment for a query, creating the query's QueryInfo on first use.
     *
     * <p>Runs under the query's read lock. Fragments for a query already recorded as
     * complete are rejected. When the QueryInfo already existed (or another thread
     * won the race to create it), the caller's token is checked against the one
     * stored in that QueryInfo. Non-external submissions are also registered with
     * the ShuffleHandler.
     *
     * @param queryIdentifier    query the fragment belongs to
     * @param str3               dag name, used in the rejection message
     * @param str4               id stored in the queryIdentifier-to-hive-query-id map
     * @param str5               first fragment descriptor shown in the rejection message
     * @param i2                 second fragment descriptor shown in the rejection message
     * @param i3                 third fragment descriptor shown in the rejection message
     * @param signableVertexSpec vertex spec; supplies the token identifier and the external-submission flag
     * @param llapTokenInfo      caller token info; looked up from the cluster id when {@code null}
     * @return the fragment info produced by the QueryInfo
     * @throws RuntimeException if the query is already complete
     * @throws IOException      propagated from token lookup, permission check or registration
     */
    public QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String str, String str2, String str3, String str4, int i, String str5, int i2, int i3, String str6, LlapDaemonProtocolProtos.SignableVertexSpec signableVertexSpec, Token<JobTokenIdentifier> token, String str7, LlapTokenChecker.LlapTokenInfo llapTokenInfo, LlapNodeId llapNodeId, ContainerRunnerImpl.UgiPool ugiPool) throws IOException {
        final ReentrantReadWriteLock dagLock = getDagLock(queryIdentifier);
        dagLock.readLock().lock();
        try {
            if (completedDagMap.contains(queryIdentifier)) {
                // Drop the lock entry that getDagLock above may just have created for a finished query.
                dagSpecificLocks.remove(queryIdentifier);
                final String message = "Dag " + str3 + " already complete. Rejecting fragment [" + str5 + ", " + i2 + ", " + i3 + "]";
                LOG.info(message);
                throw new RuntimeException(message);
            }

            final LlapTokenChecker.LlapTokenInfo tokenInfo =
                    llapTokenInfo != null ? llapTokenInfo : LlapTokenChecker.getTokenInfo(clusterId);

            boolean queryInfoPreExisting = true;
            QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
            if (queryInfo == null) {
                if (UserGroupInformation.isSecurityEnabled()) {
                    Preconditions.checkNotNull(tokenInfo.userName);
                }
                final QueryInfo created = new QueryInfo(queryIdentifier, str, str2, str3, str4, i, str6,
                        getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
                        tokenInfo.userName, tokenInfo.appId, llapNodeId,
                        signableVertexSpec.getTokenIdentifier(), token,
                        signableVertexSpec.getIsExternalSubmission(), ugiPool);
                final QueryInfo raced = queryInfoMap.putIfAbsent(queryIdentifier, created);
                if (raced == null) {
                    // This thread created the entry from its own token; no cross-check needed.
                    queryInfo = created;
                    queryInfoPreExisting = false;
                } else {
                    queryInfo = raced;
                }
            }

            if (queryInfoPreExisting) {
                LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(),
                        queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
            }

            queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, str4);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
            }
            if (!signableVertexSpec.getIsExternalSubmission()) {
                ShuffleHandler.get().registerDag(str, i, token, str6,
                        ShuffleHandler.get().isDirWatcherEnabled() ? queryInfo.getLocalDirs() : null);
            }

            return queryInfo.registerFragment(str5, i2, i3, signableVertexSpec, str7);
        } finally {
            dagLock.readLock().unlock();
        }
    }

    /**
     * Registers a dag with the ShuffleHandler using the session token found in the
     * given credentials. Runs under the read lock of the query identified by
     * {@code (str, i)}.
     *
     * @param str         first component of the query identifier, forwarded to the ShuffleHandler
     * @param i           second component of the query identifier, forwarded to the ShuffleHandler
     * @param str2        forwarded unchanged to the ShuffleHandler
     * @param credentials credentials holding the session token
     */
    public void registerDag(String str, int i, String str2, Credentials credentials) {
        final Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(credentials);
        final QueryIdentifier identifier = new QueryIdentifier(str, i);
        final ReentrantReadWriteLock dagLock = getDagLock(identifier);
        dagLock.readLock().lock();
        try {
            // No local directories are supplied on this path.
            ShuffleHandler.get().registerDag(str, i, sessionToken, str2, null);
        } finally {
            dagLock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles completion of a fragment: unregisters it from its tracked QueryInfo
     * and then runs the external-query completion check. Fragments whose query is
     * no longer tracked are ignored.
     *
     * @param queryFragmentInfo the fragment that finished
     */
    public void fragmentComplete(QueryFragmentInfo queryFragmentInfo) {
        final QueryIdentifier identifier = queryFragmentInfo.getQueryInfo().getQueryIdentifier();
        final QueryInfo tracked = this.queryInfoMap.get(identifier);
        if (tracked == null) {
            LOG.info("Ignoring fragmentComplete message for unknown query: {}", identifier);
            return;
        }
        tracked.unregisterFragment(queryFragmentInfo);
        handleFragmentCompleteExternalQuery(tracked);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the fragments currently registered for a query, read under the
     * query's read lock.
     *
     * @param queryIdentifier query to look up
     * @return the QueryInfo's registered fragments, or an empty list when the query is not tracked
     */
    public List<QueryFragmentInfo> getRegisteredFragments(QueryIdentifier queryIdentifier) {
        final ReentrantReadWriteLock dagLock = getDagLock(queryIdentifier);
        dagLock.readLock().lock();
        try {
            final QueryInfo tracked = this.queryInfoMap.get(queryIdentifier);
            if (tracked == null) {
                LOG.warn("Unknown query: Returning an empty list of fragments");
                return Collections.emptyList();
            }
            return tracked.getRegisteredFragments();
        } finally {
            dagLock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks a query as complete and releases the state this tracker holds for it.
     * Runs under the query's write lock.
     *
     * <p>For regular queries ({@code z == false}) the caller's permissions are
     * checked, the query is remembered as completed, its local directories are
     * scheduled for deletion, the completion log line is emitted and the
     * query-specific lock is removed. For external queries ({@code z == true}) no
     * permission check is made, and a delayed {@code ExternalQueryCleanerCallable}
     * is queued only when no fragments remain registered.
     *
     * @param queryIdentifier query that completed
     * @param j               delete delay in seconds; {@code -1} selects the configured default
     * @param z               whether this is an external query
     * @return the QueryInfo that was removed, or {@code null} when the query is not tracked
     * @throws IOException propagated from the permission check
     */
    public QueryInfo queryComplete(QueryIdentifier queryIdentifier, long j, boolean z) throws IOException {
        if (j == -1) {
            j = this.defaultDeleteDelaySeconds;
        }
        ReentrantReadWriteLock dagLock = getDagLock(queryIdentifier);
        dagLock.writeLock().lock();
        try {
            QueryInfo queryInfo = z ? this.queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
            if (queryInfo == null) {
                // The lock is released by the finally block below. Unlocking here as well would
                // release it twice: IllegalMonitorStateException, or, when the caller already
                // holds the write lock reentrantly, silently dropping the caller's hold.
                LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
                return null;
            }
            LOG.info("Processing queryComplete for queryIdentifier={}, isExternalQuery={}, with deleteDelay={} seconds", new Object[]{queryIdentifier, Boolean.valueOf(z), Long.valueOf(j)});
            this.queryInfoMap.remove(queryIdentifier);
            if (!z) {
                rememberCompletedDag(queryIdentifier);
                cleanupLocalDirs(queryInfo, j);
                handleLogOnQueryCompletion(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString());
            } else if (queryInfo.getRegisteredFragments().size() == 0) {
                // External query with nothing left registered: defer the final cleanup by a minute.
                LOG.debug("Queueing future cleanup for external queryId: {}", queryInfo.getHiveQueryIdString());
                this.executorService.schedule((Callable) new ExternalQueryCleanerCallable(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString(), queryInfo.getQueryIdentifier()), 1L, TimeUnit.MINUTES);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("NumRegisterFragments={}, Not queuing cleanup for external queryId={}", Integer.valueOf(queryInfo.getRegisteredFragments().size()), queryInfo.getHiveQueryIdString());
            }
            this.sourceCompletionMap.remove(queryIdentifier);
            String hiveQueryId = this.queryIdentifierToHiveQueryId.remove(queryIdentifier);
            if (!z) {
                // External queries keep their lock entry; the delayed cleaner removes it.
                removeQuerySpecificLock(queryIdentifier);
            }
            if (hiveQueryId != null) {
                ObjectCacheFactory.removeLlapQueryCache(hiveQueryId);
            }
            return queryInfo;
        } finally {
            dagLock.writeLock().unlock();
        }
    }

    /**
     * Schedules deletion of each local directory already known for the query and
     * unregisters that directory's dag from the ShuffleHandler. Does nothing when
     * the QueryInfo reports no local directories.
     *
     * @param queryInfo query whose directories are cleaned up
     * @param j         delete delay in seconds
     */
    private void cleanupLocalDirs(QueryInfo queryInfo, long j) {
        final String[] dirs = queryInfo.getLocalDirsNoCreate();
        if (dirs == null) {
            return;
        }
        for (int idx = 0; idx < dirs.length; idx++) {
            final String dir = dirs[idx];
            cleanupDir(dir, j);
            ShuffleHandler.get().unregisterDag(dir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits the marker log line that signals query completion to the logging
     * backend. Only active when route-based logging is enabled. The dag and query
     * ids are placed in the MDC for the log call and the MDC is cleared afterwards.
     *
     * @param str  query id, stored under MDC key "queryId"
     * @param str2 dag id, stored under MDC key "dagId"
     */
    public void handleLogOnQueryCompletion(String str, String str2) {
        if (!this.routeBasedLoggingEnabled) {
            return;
        }
        MDC.put("dagId", str2);
        MDC.put("queryId", str);
        try {
            final String message = "Ignore this. Log line to interact with logger. Query complete: " + str + ", " + str2;
            LOG.error(QUERY_COMPLETE_MARKER, message);
        } finally {
            // Clears the entire MDC, not only the two keys set above.
            MDC.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the per-query read/write lock entry for the given query, if any.
     *
     * @param queryIdentifier query whose lock entry is dropped
     */
    public void removeQuerySpecificLock(QueryIdentifier queryIdentifier) {
        dagSpecificLocks.remove(queryIdentifier);
    }

    /**
     * Records a query as completed and schedules removal of that record one hour
     * later. Logs a warning when the query was already recorded.
     *
     * @param queryIdentifier query to remember as completed
     */
    public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
        final boolean newlyAdded = this.completedDagMap.add(queryIdentifier);
        if (!newlyAdded) {
            LOG.warn("Couldn't add {} to completed dag set", queryIdentifier);
            return;
        }
        final DagMapCleanerCallable evictor = new DagMapCleanerCallable(queryIdentifier);
        this.executorService.schedule((Callable) evictor, 1L, TimeUnit.HOURS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a new state for a source of the given query and, when the query is
     * tracked and the caller passes the permission check, notifies its QueryInfo.
     * The state is recorded before the permission check runs.
     *
     * @param queryIdentifier  query the source belongs to
     * @param str              key under which the state is stored (presumably the source name)
     * @param sourceStateProto new state
     * @throws IOException propagated from the permission check
     */
    public void registerSourceStateChange(QueryIdentifier queryIdentifier, String str, LlapDaemonProtocolProtos.SourceStateProto sourceStateProto) throws IOException {
        final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> states = getSourceCompletionMap(queryIdentifier);
        states.put(str, sourceStateProto);
        final QueryInfo tracked = checkPermissionsAndGetQuery(queryIdentifier);
        if (tracked == null) {
            return;
        }
        tracked.sourceStateUpdated(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Looks up the per-query lock without creating one.
     *
     * @param queryIdentifier query to look up
     * @return the existing lock, or {@code null} when the query has no lock entry
     */
    public ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
        final ReentrantReadWriteLock existing = dagSpecificLocks.get(queryIdentifier);
        return existing;
    }

    /**
     * Returns the per-query read/write lock, creating and registering it when
     * absent. Lookup and creation happen under the tracker-wide {@code lock}.
     *
     * @param queryIdentifier query whose lock is wanted
     * @return the existing or newly created lock; never {@code null}
     */
    private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
        lock.lock();
        try {
            final ReentrantReadWriteLock existing = dagSpecificLocks.get(queryIdentifier);
            if (existing != null) {
                return existing;
            }
            final ReentrantReadWriteLock created = new ReentrantReadWriteLock();
            dagSpecificLocks.put(queryIdentifier, created);
            return created;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the source-state map for a query, creating it on first access. When
     * two threads race to create it, both get the instance that was stored first.
     *
     * @param queryIdentifier query whose source-state map is wanted
     * @return the shared map for that query; never {@code null}
     */
    private ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
        final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> existing = this.sourceCompletionMap.get(queryIdentifier);
        if (existing != null) {
            return existing;
        }
        final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> fresh = new ConcurrentHashMap<>();
        final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> raced = this.sourceCompletionMap.putIfAbsent(queryIdentifier, fresh);
        return raced == null ? fresh : raced;
    }

    /** Service start hook; only logs that the service has started. */
    public void serviceStart() {
        final String startedMessage = getName() + " started";
        LOG.info(startedMessage);
    }

    /** Service stop hook; shuts the cleanup executor down immediately, then logs the stop. */
    public void serviceStop() {
        executorService.shutdownNow();
        final String stoppedMessage = getName() + " stopped";
        LOG.info(stoppedMessage);
    }

    /**
     * Schedules recursive deletion of a directory after the given delay.
     *
     * @param str directory to delete
     * @param j   delay in seconds before the deletion runs
     */
    private void cleanupDir(String str, long j) {
        LOG.info("Scheduling deletion of {} after {} seconds", str, Long.valueOf(j));
        final FileCleanerCallable deleter = new FileCleanerCallable(str);
        executorService.schedule((Callable) deleter, j, TimeUnit.SECONDS);
    }

    /**
     * Looks up a tracked query and checks the caller's permissions against the
     * token user and app id stored in its QueryInfo.
     *
     * @param queryIdentifier query to look up
     * @return the QueryInfo, or {@code null} when the query is not tracked (no check is made in that case)
     * @throws IOException propagated from the permission check
     */
    private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryIdentifier) throws IOException {
        final QueryInfo tracked = queryInfoMap.get(queryIdentifier);
        if (tracked != null) {
            LlapTokenChecker.checkPermissions(clusterId, tracked.getTokenUserName(), tracked.getTokenAppId(), tracked.getQueryIdentifier());
        }
        return tracked;
    }

    /**
     * Checks the caller's permissions for a query.
     *
     * @param queryIdentifier query to check
     * @return {@code true} when the query is tracked and the check did not throw;
     *         {@code false} when the query is not tracked
     * @throws IOException propagated from the permission check
     */
    public boolean checkPermissionsForQuery(QueryIdentifier queryIdentifier) throws IOException {
        final QueryInfo tracked = checkPermissionsAndGetQuery(queryIdentifier);
        return tracked != null;
    }

    /**
     * After a fragment of an external query completes, triggers query completion
     * when no fragments remain registered. Does nothing for non-external queries.
     *
     * <p>The check is opportunistic: if the query's write lock cannot be acquired
     * immediately the method returns without doing anything. An IOException from
     * query completion is logged and not propagated.
     *
     * @param queryInfo the query whose fragment just completed
     */
    private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
        if (!queryInfo.isExternalQuery()) {
            return;
        }
        ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
        if (dagLock == null) {
            // Defensive: stop here instead of dereferencing a null lock below.
            LOG.warn("Ignoring fragment completion for unknown query: {}", queryInfo.getQueryIdentifier());
            return;
        }
        if (!dagLock.writeLock().tryLock()) {
            // Another operation holds the lock; nothing is done for this completion.
            return;
        }
        try {
            if (queryInfo.getRegisteredFragments().size() == 0) {
                queryComplete(queryInfo.getQueryIdentifier(), -1L, true);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("Not invoking queryComplete on fragmentComplete for {}, since there are known fragments. count={}", queryInfo.getHiveQueryIdString(), Integer.valueOf(queryInfo.getRegisteredFragments().size()));
            }
        } catch (IOException e) {
            // Keep the cause in the log; SLF4J treats a trailing Throwable as the exception.
            LOG.error("Failed to process query complete for external submission: {}", queryInfo.getQueryIdentifier(), e);
        } finally {
            // Reached only after tryLock succeeded, so the lock is released exactly once.
            dagLock.writeLock().unlock();
        }
    }
}
