/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.spark.GroupByShuffler;
import org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction;
import org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunction;
import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
import org.apache.hadoop.hive.ql.exec.spark.MapInput;
import org.apache.hadoop.hive.ql.exec.spark.MapTran;
import org.apache.hadoop.hive.ql.exec.spark.ReduceTran;
import org.apache.hadoop.hive.ql.exec.spark.ShuffleTran;
import org.apache.hadoop.hive.ql.exec.spark.SortByShuffler;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlan;
import org.apache.hadoop.hive.ql.exec.spark.SparkReporter;
import org.apache.hadoop.hive.ql.exec.spark.SparkShuffler;
import org.apache.hadoop.hive.ql.exec.spark.SparkTran;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkPlanGenerator {
    private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
    private final PerfLogger perfLogger = SessionState.getPerfLogger();
    private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
    private JavaSparkContext sc;
    private final JobConf jobConf;
    private Context context;
    private Path scratchDir;
    private SparkReporter sparkReporter;
    private Map<BaseWork, BaseWork> cloneToWork;
    private final Map<BaseWork, SparkTran> workToTranMap;
    private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
    private final Map<BaseWork, JobConf> workToJobConf;

    public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir, SparkReporter sparkReporter) {
        this.sc = sc;
        this.context = context;
        this.jobConf = jobConf;
        this.scratchDir = scratchDir;
        this.workToTranMap = new HashMap<BaseWork, SparkTran>();
        this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
        this.sparkReporter = sparkReporter;
        this.workToJobConf = new HashMap<BaseWork, JobConf>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SparkPlan generate(SparkWork sparkWork) throws Exception {
        this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkBuildPlan");
        SparkPlan sparkPlan = new SparkPlan();
        this.cloneToWork = sparkWork.getCloneToWork();
        this.workToTranMap.clear();
        this.workToParentWorkTranMap.clear();
        try {
            for (BaseWork work : sparkWork.getAllWork()) {
                this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkCreateTran." + work.getName());
                SparkTran tran = this.generate(work);
                SparkTran parentTran = this.generateParentTran(sparkPlan, sparkWork, work);
                sparkPlan.addTran(tran);
                sparkPlan.connect(parentTran, tran);
                this.workToTranMap.put(work, tran);
                this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkCreateTran." + work.getName());
            }
        }
        finally {
            Utilities.clearWorkMap();
        }
        this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkBuildPlan");
        return sparkPlan;
    }

    private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
        SparkTran<WritableComparable, Writable, WritableComparable, Writable> result;
        BaseWork originalWork;
        if (this.cloneToWork.containsKey(work) && this.workToParentWorkTranMap.containsKey(originalWork = this.cloneToWork.get(work))) {
            return this.workToParentWorkTranMap.get(originalWork);
        }
        if (work instanceof MapWork) {
            result = this.generateMapInput(sparkPlan, (MapWork)work);
            sparkPlan.addTran(result);
        } else if (work instanceof ReduceWork) {
            List<BaseWork> parentWorks = sparkWork.getParents(work);
            result = this.generate(sparkPlan, sparkWork.getEdgeProperty(parentWorks.get(0), work), this.cloneToWork.containsKey(work));
            sparkPlan.addTran(result);
            for (BaseWork parentWork : parentWorks) {
                sparkPlan.connect(this.workToTranMap.get(parentWork), result);
            }
        } else {
            throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, but found " + work.getClass().getName());
        }
        if (this.cloneToWork.containsKey(work)) {
            this.workToParentWorkTranMap.put(this.cloneToWork.get(work), result);
        }
        return result;
    }

    private Class<?> getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
        Class<?> inputFormatClass;
        if (mWork.getInputformat() != null) {
            HiveConf.setVar((Configuration)jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
        }
        String inpFormat = HiveConf.getVar((Configuration)jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT);
        if (mWork.isUseBucketizedHiveInputFormat()) {
            inpFormat = BucketizedHiveInputFormat.class.getName();
        }
        try {
            inputFormatClass = Class.forName(inpFormat);
        }
        catch (ClassNotFoundException e) {
            String message = "Failed to load specified input format class:" + inpFormat;
            LOG.error((Object)message, (Throwable)e);
            throw new HiveException(message, e);
        }
        return inputFormatClass;
    }

    private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) throws Exception {
        JobConf jobConf = this.cloneJobConf(mapWork);
        Class<?> ifClass = this.getInputFormat(jobConf, mapWork);
        JavaPairRDD hadoopRDD = this.sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
        MapInput result = new MapInput(sparkPlan, (JavaPairRDD<WritableComparable, Writable>)hadoopRDD, false);
        return result;
    }

    private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) {
        Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
        SparkShuffler shuffler = edge.isMRShuffle() ? new SortByShuffler(false) : (edge.isShuffleSort() ? new SortByShuffler(true) : new GroupByShuffler());
        return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
    }

    private SparkTran generate(BaseWork work) throws Exception {
        this.initStatsPublisher(work);
        JobConf newJobConf = this.cloneJobConf(work);
        this.checkSpecs(work, newJobConf);
        byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
        if (work instanceof MapWork) {
            if (work instanceof MergeFileWork) {
                Path outputPath = ((MergeFileWork)work).getOutputDir();
                Path tempOutPath = Utilities.toTempPath(outputPath);
                FileSystem fs = outputPath.getFileSystem((Configuration)this.jobConf);
                try {
                    if (!fs.exists(tempOutPath)) {
                        fs.mkdirs(tempOutPath);
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException("Can't make path " + outputPath + " : " + e.getMessage());
                }
            }
            MapTran mapTran = new MapTran();
            HiveMapFunction mapFunc = new HiveMapFunction(confBytes, this.sparkReporter);
            mapTran.setMapFunction(mapFunc);
            return mapTran;
        }
        if (work instanceof ReduceWork) {
            ReduceTran reduceTran = new ReduceTran();
            HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, this.sparkReporter);
            reduceTran.setReduceFunction(reduceFunc);
            return reduceTran;
        }
        throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, but found " + work.getClass().getName());
    }

    private void checkSpecs(BaseWork work, JobConf jc) throws Exception {
        Set<Operator<?>> opList = work.getAllOperators();
        for (Operator<?> op : opList) {
            if (!(op instanceof FileSinkOperator)) continue;
            ((FileSinkOperator)op).checkOutputSpecs(null, jc);
        }
    }

    private JobConf cloneJobConf(BaseWork work) throws Exception {
        if (this.workToJobConf.containsKey(work)) {
            return this.workToJobConf.get(work);
        }
        JobConf cloned = new JobConf((Configuration)this.jobConf);
        HiveConf.setVar((Configuration)cloned, HiveConf.ConfVars.PLAN, "");
        try {
            cloned.setPartitionerClass(Class.forName(HiveConf.getVar((Configuration)cloned, HiveConf.ConfVars.HIVEPARTITIONER)));
        }
        catch (ClassNotFoundException e) {
            String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname;
            throw new IllegalArgumentException(msg, e);
        }
        if (work instanceof MapWork) {
            cloned.setBoolean("mapred.task.is.map", true);
            List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork)work, this.scratchDir, this.context, false);
            Utilities.setInputPaths(cloned, inputPaths);
            Utilities.setMapWork((Configuration)cloned, (MapWork)work, this.scratchDir, false);
            Utilities.createTmpDirs((Configuration)cloned, (MapWork)work);
            if (work instanceof MergeFileWork) {
                MergeFileWork mergeFileWork = (MergeFileWork)work;
                cloned.set("mapred.mapper.class", MergeFileMapper.class.getName());
                cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
                cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
            } else {
                cloned.set("mapred.mapper.class", ExecMapper.class.getName());
            }
            this.workToJobConf.put(work, cloned);
        } else if (work instanceof ReduceWork) {
            cloned.setBoolean("mapred.task.is.map", false);
            Utilities.setReduceWork((Configuration)cloned, (ReduceWork)work, this.scratchDir, false);
            Utilities.createTmpDirs((Configuration)cloned, (ReduceWork)work);
            cloned.set("mapred.reducer.class", ExecReducer.class.getName());
        }
        return cloned;
    }

    private void initStatsPublisher(BaseWork work) throws HiveException {
        StatsPublisher statsPublisher;
        StatsFactory factory;
        if (work.isGatheringStats() && (factory = StatsFactory.newFactory((Configuration)this.jobConf)) != null && !(statsPublisher = factory.getStatsPublisher()).init((Configuration)this.jobConf) && HiveConf.getBoolVar((Configuration)this.jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
            throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
        }
    }
}

