/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.scheduled;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.scheduled.MetastoreBasedScheduledQueryService;
import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionContext;
import org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScheduledQueryExecutionService
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledQueryExecutionService.class);
    private static ScheduledQueryExecutionService INSTANCE = null;
    private ScheduledQueryExecutionContext context;
    private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
    private AtomicInteger usedExecutors = new AtomicInteger(0);
    private Queue<ScheduledQueryExecutor> runningExecutors = new ConcurrentLinkedQueue<ScheduledQueryExecutor>();

    /**
     * Starts the singleton service from a Hive configuration.
     *
     * <p>The configuration is copied first; the copy is used to create the metastore-backed
     * scheduler service, the thread pool and the execution context.
     *
     * @param inputConf configuration to copy; the argument itself is not retained
     * @return the newly started service instance
     * @throws IllegalStateException if a service instance is already registered
     */
    public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) {
        final HiveConf privateConf = new HiveConf(inputConf);
        final MetastoreBasedScheduledQueryService schedulerService =
            new MetastoreBasedScheduledQueryService(privateConf);
        final ExecutorService pool = buildExecutor(privateConf);
        final ScheduledQueryExecutionContext executionContext =
            new ScheduledQueryExecutionContext(pool, privateConf, schedulerService);
        return startScheduledQueryExecutorService(executionContext);
    }

    /**
     * Creates the thread pool used by the service.
     *
     * <p>The pool uses daemon threads named {@code "Scheduled Query Thread %d"}, a
     * {@link SynchronousQueue} as work queue and a 60 second keep-alive time.
     *
     * @param conf configuration supplying {@code HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS}
     * @return the configured executor
     */
    private static ExecutorService buildExecutor(HiveConf conf) {
        // Threads reserved on top of the query executors; presumably these are for the poller
        // and the progress reporter, which the constructor of this class submits.
        final int reservedThreads = 2;
        final int corePoolSize = reservedThreads + 1;
        final int maximumPoolSize =
            reservedThreads + conf.getIntVar(HiveConf.ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS);
        final ThreadFactory daemonFactory = new ThreadFactoryBuilder()
            .setNameFormat("Scheduled Query Thread %d")
            .setDaemon(true)
            .build();
        return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            daemonFactory);
    }

    /**
     * Starts the singleton service on top of an already prepared execution context.
     *
     * <p>Registration happens while holding the class monitor, so at most one instance can be
     * registered at a time.
     *
     * @param ctx execution context (executor, configuration and scheduler service) to run with
     * @return the newly created and registered instance
     * @throws IllegalStateException if an instance is already registered
     */
    public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) {
        synchronized (ScheduledQueryExecutionService.class) {
            if (INSTANCE == null) {
                INSTANCE = new ScheduledQueryExecutionService(ctx);
                return INSTANCE;
            }
            throw new IllegalStateException("There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if neccessary");
        }
    }

    private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
        this.context = ctx;
        ctx.executor.submit(new ScheduledQueryPoller());
        ctx.executor.submit(new ProgressReporter());
    }

    /**
     * Tells whether a query state is terminal.
     *
     * @param state metastore query state to examine (may be {@code null})
     * @return {@code true} only for {@code FINISHED} and {@code FAILED}
     */
    static boolean isTerminalState(org.apache.hadoop.hive.metastore.api.QueryState state) {
        if (state == org.apache.hadoop.hive.metastore.api.QueryState.FINISHED) {
            return true;
        }
        return state == org.apache.hadoop.hive.metastore.api.QueryState.FAILED;
    }

    /**
     * Registers a query executor: adds it to the collection walked by the progress reporter
     * and counts it against the number of used executor slots.
     *
     * @param executor the executor being registered
     */
    private void executorStarted(ScheduledQueryExecutor executor) {
        runningExecutors.add(executor);
        usedExecutors.incrementAndGet();
    }

    /**
     * Unregisters a query executor and asks the poller of this instance to re-check the
     * schedule, since an executor slot has just become free.
     *
     * @param executor the executor that has finished
     */
    private void executorStopped(ScheduledQueryExecutor executor) {
        usedExecutors.decrementAndGet();
        runningExecutors.remove(executor);
        // Bump this instance's own counter instead of going through the static INSTANCE:
        // close() resets INSTANCE to null while executors may still be unwinding, which would
        // otherwise raise a NullPointerException here. The poller of this instance reads this
        // very counter, so the wake-up semantics are unchanged.
        forcedScheduleCheckCounter.incrementAndGet();
    }

    /**
     * Stops the service and unregisters it as the singleton instance.
     *
     * <p>The executor is shut down, a schedule check is forced so a sleeping poller notices
     * the change, and the call waits up to one second for the tasks to finish. Whatever is
     * still running afterwards is cancelled with {@code shutdownNow()}.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored; the forced
     * shutdown and the unregistration are still performed so that the already shut-down
     * service does not stay registered and block a later start.
     *
     * @throws IllegalStateException if this object is not the registered singleton instance
     * @throws IOException never thrown by this implementation; declared by {@link Closeable}
     */
    @Override
    public void close() throws IOException {
        synchronized (ScheduledQueryExecutionService.class) {
            // "this" is never null, so this single comparison also covers INSTANCE == null
            if (INSTANCE != this) {
                throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid");
            }
            this.context.executor.shutdown();
            ScheduledQueryExecutionService.forceScheduleCheck();
            try {
                this.context.executor.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                // Must run even when interrupted: the executor has already been shut down,
                // so this instance can no longer serve as the singleton.
                this.context.executor.shutdownNow();
                INSTANCE = null;
            }
        }
    }

    /**
     * Asks the poller of the registered service to re-check the schedule without waiting for
     * its full idle sleep to elapse.
     *
     * <p>Does nothing when no service is currently registered (for example before start or
     * after {@link #close()}), instead of failing with a {@link NullPointerException}.
     */
    public static void forceScheduleCheck() {
        // Read the field once so the null check and the use see the same reference.
        final ScheduledQueryExecutionService service = INSTANCE;
        if (service != null) {
            service.forcedScheduleCheckCounter.incrementAndGet();
        }
    }

    /**
     * Returns how many times a schedule check has been forced on the registered instance.
     *
     * @return current value of the forced-schedule-check counter
     * @throws NullPointerException if no service instance is registered
     */
    @VisibleForTesting
    public static int getForcedScheduleCheckCount() {
        final AtomicInteger counter = INSTANCE.forcedScheduleCheckCounter;
        return counter.get();
    }

    class ProgressReporter
    implements Runnable {
        ProgressReporter() {
        }

        @Override
        public void run() {
            try (NamedThread namedThread = new NamedThread("Scheduled Query Progress Reporter");){
                while (!((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.executor.isShutdown()) {
                    try {
                        Thread.sleep(ScheduledQueryExecutionService.this.context.getProgressReporterSleepTime());
                    }
                    catch (InterruptedException e) {
                        LOG.warn("interrupt discarded");
                    }
                    try {
                        for (ScheduledQueryExecutor worker : ScheduledQueryExecutionService.this.runningExecutors) {
                            worker.reportQueryProgress();
                        }
                    }
                    catch (Exception e) {
                        LOG.error("ProgressReporter encountered exception ", (Throwable)e);
                    }
                }
            }
        }
    }

    class ScheduledQueryExecutor
    implements Runnable {
        private ScheduledQueryProgressInfo info;
        private final ScheduledQueryPollResponse pollResponse;

        public ScheduledQueryExecutor(ScheduledQueryPollResponse pollResponse) {
            this.pollResponse = pollResponse;
            ScheduledQueryExecutionService.this.executorStarted(this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try (NamedThread namedThread = new NamedThread(this.getThreadName());){
                this.processQuery(this.pollResponse);
            }
            finally {
                ScheduledQueryExecutionService.this.executorStopped(this);
            }
        }

        private String getThreadName() {
            return String.format("Scheduled Query Executor(schedule:%s, execution_id:%d)", this.pollResponse.getScheduleKey().getScheduleName(), this.pollResponse.getExecutionId());
        }

        public synchronized void reportQueryProgress() {
            if (this.info != null) {
                LOG.info("Reporting query progress of {} as {} err:{}", new Object[]{this.info.getScheduledExecutionId(), this.info.getState(), this.info.getErrorMessage()});
                ((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.schedulerService.scheduledQueryProgress(this.info);
                if (ScheduledQueryExecutionService.isTerminalState(this.info.getState())) {
                    this.info = null;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processQuery(ScheduledQueryPollResponse q) {
            LOG.info("Executing schq:{}, executionId: {}", (Object)q.getScheduleKey().getScheduleName(), (Object)q.getExecutionId());
            this.info = new ScheduledQueryProgressInfo();
            this.info.setScheduledExecutionId(this.pollResponse.getExecutionId());
            this.info.setState(org.apache.hadoop.hive.metastore.api.QueryState.EXECUTING);
            this.info.setExecutorQueryId(this.buildExecutorQueryId(""));
            SessionState state = null;
            try {
                HiveConf conf = new HiveConf(((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.conf);
                conf.set("hive.query.exclusive.lock", this.lockNameFor(q.getScheduleKey()));
                conf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, SessionStateUserAuthenticator.class.getName());
                conf.set("scheduled.query.namespace", q.getScheduleKey().getClusterNamespace());
                conf.set("scheduled.query.schedulename", q.getScheduleKey().getScheduleName());
                conf.set("scheduled.query.user", q.getUser());
                conf.set("scheduled.query.executionid", Long.toString(q.getExecutionId()));
                conf.unset(HiveConf.ConfVars.HIVESESSIONID.varname);
                state = new SessionState(conf, q.getUser());
                state.setIsHiveServerQuery(true);
                SessionState.start(state);
                this.reportQueryProgress();
                QueryState queryState = DriverFactory.getNewQueryState(conf);
                try (IDriver driver = DriverFactory.newDriver(queryState, q.getUser(), null);){
                    this.info.setExecutorQueryId(this.buildExecutorQueryId(driver));
                    this.reportQueryProgress();
                    driver.run(q.getQuery());
                    this.info.setState(org.apache.hadoop.hive.metastore.api.QueryState.FINISHED);
                }
            }
            catch (Throwable t) {
                this.info.setErrorMessage(this.getErrorStringForException(t));
                this.info.setState(org.apache.hadoop.hive.metastore.api.QueryState.FAILED);
            }
            finally {
                if (state != null) {
                    try {
                        state.close();
                    }
                    catch (Throwable conf) {}
                }
                this.reportQueryProgress();
            }
        }

        private String buildExecutorQueryId(IDriver driver) {
            return this.buildExecutorQueryId(driver.getQueryState().getQueryId());
        }

        private String buildExecutorQueryId(String queryId) {
            return String.format("%s/%s", ((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.executorHostName, queryId);
        }

        private String lockNameFor(ScheduledQueryKey scheduleKey) {
            return String.format("scheduled_query_%s_%s", scheduleKey.getClusterNamespace(), scheduleKey.getScheduleName());
        }

        private String getErrorStringForException(Throwable t) {
            if (t instanceof CommandProcessorException) {
                CommandProcessorException cpr = (CommandProcessorException)t;
                return String.format("%s", cpr.getErrorMessage());
            }
            return String.format("%s: %s", t.getClass().getName(), t.getMessage());
        }
    }

    class ScheduledQueryPoller
    implements Runnable {
        ScheduledQueryPoller() {
        }

        @Override
        public void run() {
            block20: {
                NamedThread namedThread = new NamedThread("Scheduled Query Poller");
                Throwable throwable = null;
                block13: while (true) {
                    try {
                        while (!((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.executor.isShutdown()) {
                            int origResets = ScheduledQueryExecutionService.this.forcedScheduleCheckCounter.get();
                            if (ScheduledQueryExecutionService.this.usedExecutors.get() < ScheduledQueryExecutionService.this.context.getNumberOfExecutors()) {
                                try {
                                    ScheduledQueryPollResponse q = ((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.schedulerService.scheduledQueryPoll();
                                    if (q.isSetExecutionId()) {
                                        ((ScheduledQueryExecutionService)ScheduledQueryExecutionService.this).context.executor.submit(new ScheduledQueryExecutor(q));
                                        continue;
                                    }
                                }
                                catch (Throwable t) {
                                    LOG.error("Unexpected exception during scheduled query submission", t);
                                }
                            }
                            try {
                                this.sleep(ScheduledQueryExecutionService.this.context.getIdleSleepTime(), origResets);
                                continue block13;
                            }
                            catch (InterruptedException e) {
                                LOG.warn("interrupt discarded");
                            }
                        }
                        break block20;
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                }
                finally {
                    if (namedThread != null) {
                        if (throwable != null) {
                            try {
                                namedThread.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                        } else {
                            namedThread.close();
                        }
                    }
                }
            }
        }

        private void sleep(long idleSleepTime, int origResets) throws InterruptedException {
            long checkIntrvalMs = 1000L;
            for (long i = 0L; i < idleSleepTime; i += checkIntrvalMs) {
                Thread.sleep(checkIntrvalMs);
                if (ScheduledQueryExecutionService.this.forcedScheduleCheckCounter.get() == origResets) continue;
                return;
            }
        }
    }

    /**
     * Renames the current thread for the lifetime of this object; meant to be used in a
     * try-with-resources statement. {@link #close()} restores the name the thread had when
     * this object was created.
     */
    static class NamedThread
    implements Closeable {
        private final String oldName;

        /**
         * Remembers the current thread's name and replaces it.
         *
         * @param newName name to give to the current thread
         */
        public NamedThread(String newName) {
            LOG.info("Starting {} thread - renaming accordingly.", (Object)newName);
            final Thread current = Thread.currentThread();
            this.oldName = current.getName();
            current.setName(newName);
        }

        /** Gives the thread calling this method the remembered name back. */
        @Override
        public void close() {
            LOG.info("Thread finished; renaming back to: {}", (Object)this.oldName);
            final Thread current = Thread.currentThread();
            current.setName(this.oldName);
        }
    }
}

