/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.hooks;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.json.JSONObject;
import org.spark_project.guava.util.concurrent.ThreadFactoryBuilder;

public class ATSHook
implements ExecuteWithHookContext {
    private static final Log LOG = LogFactory.getLog((String)ATSHook.class.getName());
    private static final Object LOCK = new Object();
    private static ExecutorService executor;
    private static TimelineClient timelineClient;
    private static final int WAIT_TIME = 3;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ATSHook() {
        Object object = LOCK;
        synchronized (object) {
            if (executor == null) {
                executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
                YarnConfiguration yarnConf = new YarnConfiguration();
                timelineClient = TimelineClient.createTimelineClient();
                timelineClient.init((Configuration)yarnConf);
                timelineClient.start();
                Runtime.getRuntime().addShutdownHook(new Thread(){

                    @Override
                    public void run() {
                        try {
                            executor.shutdown();
                            executor.awaitTermination(3L, TimeUnit.SECONDS);
                            executor = null;
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                        timelineClient.stop();
                    }
                });
            }
        }
        LOG.info((Object)"Created ATS Hook");
    }

    @Override
    public void run(final HookContext hookContext) throws Exception {
        final long currentTime = System.currentTimeMillis();
        final HiveConf conf = new HiveConf(hookContext.getConf());
        executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    int numTezJobs;
                    QueryPlan plan = hookContext.getQueryPlan();
                    if (plan == null) {
                        return;
                    }
                    String queryId = plan.getQueryId();
                    long queryStartTime = plan.getQueryStartTime();
                    String user = hookContext.getUgi().getUserName();
                    int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
                    if (numMrJobs + (numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size()) <= 0) {
                        return;
                    }
                    switch (hookContext.getHookType()) {
                        case PRE_EXEC_HOOK: {
                            ExplainTask explain = new ExplainTask();
                            explain.initialize(conf, plan, null);
                            String query = plan.getQueryStr();
                            ArrayList<Task<?>> rootTasks = plan.getRootTasks();
                            JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks, plan.getFetchTask(), true, false, false);
                            ATSHook.this.fireAndForget(conf, ATSHook.this.createPreHookEvent(queryId, query, explainPlan, queryStartTime, user, numMrJobs, numTezJobs));
                            break;
                        }
                        case POST_EXEC_HOOK: {
                            ATSHook.this.fireAndForget(conf, ATSHook.this.createPostHookEvent(queryId, currentTime, user, true));
                            break;
                        }
                        case ON_FAILURE_HOOK: {
                            ATSHook.this.fireAndForget(conf, ATSHook.this.createPostHookEvent(queryId, currentTime, user, false));
                            break;
                        }
                    }
                }
                catch (Exception e) {
                    LOG.info((Object)("Failed to submit plan to ATS: " + StringUtils.stringifyException((Throwable)e)));
                }
            }
        });
    }

    TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan, long startTime, String user, int numMrJobs, int numTezJobs) throws Exception {
        JSONObject queryObj = new JSONObject();
        queryObj.put("queryText", (Object)query);
        queryObj.put("queryPlan", (Object)explainPlan);
        LOG.info((Object)("Received pre-hook notification for :" + queryId));
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Otherinfo: " + queryObj.toString()));
        }
        TimelineEntity atsEntity = new TimelineEntity();
        atsEntity.setEntityId(queryId);
        atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), (Object)user);
        TimelineEvent startEvt = new TimelineEvent();
        startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
        startEvt.setTimestamp(startTime);
        atsEntity.addEvent(startEvt);
        atsEntity.addOtherInfo(OtherInfoTypes.QUERY.name(), (Object)queryObj.toString());
        atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), (Object)(numTezJobs > 0 ? 1 : 0));
        atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), (Object)(numMrJobs > 0 ? 1 : 0));
        return atsEntity;
    }

    TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, boolean success) {
        LOG.info((Object)("Received post-hook notification for :" + queryId));
        TimelineEntity atsEntity = new TimelineEntity();
        atsEntity.setEntityId(queryId);
        atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), (Object)user);
        TimelineEvent stopEvt = new TimelineEvent();
        stopEvt.setEventType(EventTypes.QUERY_COMPLETED.name());
        stopEvt.setTimestamp(stopTime);
        atsEntity.addEvent(stopEvt);
        atsEntity.addOtherInfo(OtherInfoTypes.STATUS.name(), (Object)success);
        return atsEntity;
    }

    synchronized void fireAndForget(Configuration conf, TimelineEntity entity) throws Exception {
        timelineClient.putEntities(new TimelineEntity[]{entity});
    }

    private static enum PrimaryFilterTypes {
        user;

    }

    private static enum OtherInfoTypes {
        QUERY,
        STATUS,
        TEZ,
        MAPRED;

    }

    private static enum EventTypes {
        QUERY_SUBMITTED,
        QUERY_COMPLETED;

    }

    private static enum EntityTypes {
        HIVE_QUERY_ID;

    }
}

