/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark.status.impl;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.MetricsCollection;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;

/**
 * {@link SparkJobStatus} implementation backed by a remote Spark context.
 *
 * <p>Job and stage information is obtained by submitting small {@link Job}s through the
 * {@link SparkClient} and waiting (up to a configured timeout) for their results; counters,
 * metrics and the remote job state are read from the {@link JobHandle}.
 */
public class RemoteSparkJobStatus
implements SparkJobStatus {
    private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
    private final SparkClient sparkClient;
    private final JobHandle<Serializable> jobHandle;
    /** Maximum time, in seconds, to wait for a status query submitted through the client. */
    private final transient long sparkClientTimeoutInSeconds;

    /**
     * @param sparkClient client used to run status queries
     * @param jobHandle handle of the job whose status is reported
     * @param timeoutInSeconds maximum time to wait for each status query, in seconds
     */
    public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
        this.sparkClient = sparkClient;
        this.jobHandle = jobHandle;
        this.sparkClientTimeoutInSeconds = timeoutInSeconds;
    }

    /**
     * @return the Spark job id when the handle reports exactly one, otherwise {@code -1}
     */
    @Override
    public int getJobId() {
        Integer sparkJobId = this.getSingleSparkJobId();
        return sparkJobId != null ? sparkJobId : -1;
    }

    /**
     * @return the job's execution status, or {@code null} when no job info is available
     * @throws HiveException if the job info query fails or times out
     */
    @Override
    public JobExecutionStatus getState() throws HiveException {
        SparkJobInfo sparkJobInfo = this.getSparkJobInfo();
        return sparkJobInfo != null ? sparkJobInfo.status() : null;
    }

    /**
     * @return the stage ids of the job, or an empty array when no job info is available
     * @throws HiveException if the job info query fails or times out
     */
    @Override
    public int[] getStageIds() throws HiveException {
        SparkJobInfo sparkJobInfo = this.getSparkJobInfo();
        return sparkJobInfo != null ? sparkJobInfo.stageIds() : new int[]{};
    }

    /**
     * Collects task counts for every stage of the job.
     *
     * @return progress per stage, keyed by {@code "<stageId>_<currentAttemptId>"}; stages whose
     *         info could not be fetched, or that have no name, are omitted
     * @throws HiveException if the job info query fails or times out
     */
    @Override
    public Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException {
        Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
        for (int stageId : this.getStageIds()) {
            SparkStageInfo sparkStageInfo = this.getSparkStageInfo(stageId);
            if (sparkStageInfo == null || sparkStageInfo.name() == null) {
                continue;
            }
            int runningTaskCount = sparkStageInfo.numActiveTasks();
            int completedTaskCount = sparkStageInfo.numCompletedTasks();
            int failedTaskCount = sparkStageInfo.numFailedTasks();
            int totalTaskCount = sparkStageInfo.numTasks();
            SparkStageProgress sparkStageProgress = new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
            stageProgresses.put(sparkStageInfo.stageId() + "_" + sparkStageInfo.currentAttemptId(), sparkStageProgress);
        }
        return stageProgresses;
    }

    /**
     * @return the counters reported by the job handle
     */
    @Override
    public SparkCounters getCounter() {
        return this.jobHandle.getSparkCounters();
    }

    /**
     * Builds statistics from the job's counters and its aggregated metrics.
     *
     * @return the statistics, or {@code null} when either the metrics or the counters are
     *         not available
     */
    @Override
    public SparkStatistics getSparkStatistics() {
        MetricsCollection metricsCollection = this.jobHandle.getMetrics();
        SparkCounters counters = this.getCounter();
        if (metricsCollection == null || counters == null) {
            return null;
        }
        SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
        sparkStatisticsBuilder.add(counters);
        String jobIdentifier = "Spark Job[" + this.jobHandle.getClientJobId() + "] Metrics";
        Map<String, Long> flatJobMetric = this.extractMetrics(metricsCollection);
        for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
            sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
        }
        return sparkStatisticsBuilder.build();
    }

    /** No resources are held by this status object, so there is nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Reads the handle's Spark job ids once and returns the id only when there is exactly one.
     *
     * @return the single Spark job id, or {@code null} when the handle reports zero or several
     */
    private Integer getSingleSparkJobId() {
        List<Integer> sparkJobIds = this.jobHandle.getSparkJobIds();
        return sparkJobIds.size() == 1 ? sparkJobIds.get(0) : null;
    }

    /**
     * Queries the remote context for the job's info.
     *
     * @return the job info, or {@code null} when the handle does not report exactly one
     *         Spark job id
     * @throws HiveException if the query fails, times out, or the wait is interrupted
     */
    private SparkJobInfo getSparkJobInfo() throws HiveException {
        Integer sparkJobId = this.getSingleSparkJobId();
        if (sparkJobId == null) {
            return null;
        }
        Future<SparkJobInfo> getJobInfo = this.sparkClient.run(new GetJobInfoJob(this.jobHandle.getClientJobId(), sparkJobId));
        try {
            return getJobInfo.get(this.sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Future.get cleared the interrupt flag; restore it so callers can still observe it.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Failed to get job info.", e);
            throw new HiveException(e);
        }
    }

    /**
     * Queries the remote context for a stage's info on a best-effort basis.
     *
     * @param stageId id of the stage to look up
     * @return the stage info, or {@code null} when the query fails, times out, or is interrupted
     */
    private SparkStageInfo getSparkStageInfo(int stageId) {
        Future<SparkStageInfo> getStageInfo = this.sparkClient.run(new GetStageInfoJob(stageId));
        try {
            return getStageInfo.get(this.sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
        }
        catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Failure is reported as null here, so the interrupt flag is the only way left
                // for callers to learn that the thread was interrupted.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Error getting stage info", t);
            return null;
        }
    }

    /**
     * @return the state reported by the job handle
     */
    public JobHandle.State getRemoteJobState() {
        return this.jobHandle.getState();
    }

    /**
     * Flattens the aggregated metrics into name/value pairs, in a fixed insertion order.
     * Input, shuffle-read and shuffle-write entries are added only when the corresponding
     * metrics group is present.
     */
    private Map<String, Long> extractMetrics(MetricsCollection metricsCollection) {
        Map<String, Long> results = new LinkedHashMap<String, Long>();
        Metrics allMetrics = metricsCollection.getAllMetrics();
        results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime);
        results.put("ExecutorRunTime", allMetrics.executorRunTime);
        results.put("ResultSize", allMetrics.resultSize);
        results.put("JvmGCTime", allMetrics.jvmGCTime);
        results.put("ResultSerializationTime", allMetrics.resultSerializationTime);
        results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled);
        results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled);
        if (allMetrics.inputMetrics != null) {
            results.put("BytesRead", allMetrics.inputMetrics.bytesRead);
        }
        if (allMetrics.shuffleReadMetrics != null) {
            ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
            long rbf = shuffleReadMetrics.remoteBlocksFetched;
            long lbf = shuffleReadMetrics.localBlocksFetched;
            results.put("RemoteBlocksFetched", rbf);
            results.put("LocalBlocksFetched", lbf);
            results.put("TotalBlocksFetched", lbf + rbf);
            results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime);
            results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead);
        }
        if (allMetrics.shuffleWriteMetrics != null) {
            results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
            results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime);
        }
        return results;
    }

    /**
     * Creates a placeholder job info with the given status and no stages.
     *
     * @param jobId job id to report; {@code null} is reported as {@code -1}
     * @param status status to report
     */
    private static SparkJobInfo getDefaultJobInfo(final Integer jobId, final JobExecutionStatus status) {
        return new SparkJobInfo(){

            @Override
            public int jobId() {
                return jobId == null ? -1 : jobId;
            }

            @Override
            public int[] stageIds() {
                return new int[0];
            }

            @Override
            public JobExecutionStatus status() {
                return status;
            }
        };
    }

    /** Job that looks up a stage's info through the context's status tracker. */
    private static class GetStageInfoJob
    implements Job<SparkStageInfo> {
        private final int stageId;

        // NOTE(review): the no-arg constructor is never called in this file; presumably it
        // exists for the serialization framework used to ship jobs - confirm before removing.
        private GetStageInfoJob() {
            this(-1);
        }

        GetStageInfoJob(int stageId) {
            this.stageId = stageId;
        }

        @Override
        public SparkStageInfo call(JobContext jc) throws Exception {
            return jc.sc().statusTracker().getStageInfo(this.stageId);
        }
    }

    /**
     * Job that looks up a Spark job's info through the context's status tracker, falling back
     * to a placeholder when the tracker has no record of the job.
     */
    private static class GetJobInfoJob
    implements Job<SparkJobInfo> {
        private final String clientJobId;
        private final int sparkJobId;

        // NOTE(review): the no-arg constructor is never called in this file; presumably it
        // exists for the serialization framework used to ship jobs - confirm before removing.
        private GetJobInfoJob() {
            this(null, -1);
        }

        GetJobInfoJob(String clientJobId, int sparkJobId) {
            this.clientJobId = clientJobId;
            this.sparkJobId = sparkJobId;
        }

        /**
         * Returns the tracker's job info when present. Otherwise, if exactly one monitored
         * action is registered for the client job and it is done, reports SUCCEEDED or FAILED
         * depending on whether the action completed without an exception. In all remaining
         * cases reports UNKNOWN.
         */
        @Override
        public SparkJobInfo call(JobContext jc) throws Exception {
            SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(this.sparkJobId);
            if (jobInfo != null) {
                return jobInfo;
            }
            List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(this.clientJobId);
            if (list != null && list.size() == 1) {
                JavaFutureAction<?> futureAction = list.get(0);
                if (futureAction.isDone()) {
                    boolean futureSucceed = true;
                    try {
                        // The action is already done, so this only surfaces its outcome.
                        futureAction.get();
                    }
                    catch (Exception e) {
                        LOG.error("Failed to run job " + this.sparkJobId, e);
                        futureSucceed = false;
                    }
                    return RemoteSparkJobStatus.getDefaultJobInfo(this.sparkJobId, futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
                }
            }
            return RemoteSparkJobStatus.getDefaultJobInfo(this.sparkJobId, JobExecutionStatus.UNKNOWN);
        }
    }
}

