/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.spark.GroupByShuffler;
import org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction;
import org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunction;
import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
import org.apache.hadoop.hive.ql.exec.spark.MapInput;
import org.apache.hadoop.hive.ql.exec.spark.MapTran;
import org.apache.hadoop.hive.ql.exec.spark.ReduceTran;
import org.apache.hadoop.hive.ql.exec.spark.ShuffleKryoSerializer;
import org.apache.hadoop.hive.ql.exec.spark.ShuffleTran;
import org.apache.hadoop.hive.ql.exec.spark.SortByShuffler;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlan;
import org.apache.hadoop.hive.ql.exec.spark.SparkReporter;
import org.apache.hadoop.hive.ql.exec.spark.SparkShuffler;
import org.apache.hadoop.hive.ql.exec.spark.SparkTran;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkPlanGenerator {
    private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);
    private final PerfLogger perfLogger = SessionState.getPerfLogger();
    private final JavaSparkContext sc;
    private final JobConf jobConf;
    private final Context context;
    private final Path scratchDir;
    private final SparkReporter sparkReporter;
    private Map<BaseWork, BaseWork> cloneToWork;
    private final Map<BaseWork, SparkTran> workToTranMap;
    private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
    private final Map<BaseWork, JobConf> workToJobConf;
    private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;

    public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir, SparkReporter sparkReporter) {
        this.sc = sc;
        this.context = context;
        this.jobConf = jobConf;
        this.scratchDir = scratchDir;
        this.workToTranMap = new HashMap<BaseWork, SparkTran>();
        this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
        this.sparkReporter = sparkReporter;
        this.workToJobConf = new HashMap<BaseWork, JobConf>();
        this.shuffleSerializer = HiveConf.getBoolVar((Configuration)jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE) ? ShuffleKryoSerializer.getInstance(sc, (Configuration)jobConf) : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SparkPlan generate(SparkWork sparkWork) throws Exception {
        this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkBuildPlan");
        SparkPlan sparkPlan = new SparkPlan(this.sc.sc());
        this.cloneToWork = sparkWork.getCloneToWork();
        this.workToTranMap.clear();
        this.workToParentWorkTranMap.clear();
        try {
            for (BaseWork work : sparkWork.getAllWork()) {
                this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkCreateTran." + work.getName());
                SparkTran tran = this.generate(work, sparkWork);
                SparkTran parentTran = this.generateParentTran(sparkPlan, sparkWork, work);
                sparkPlan.addTran(tran);
                sparkPlan.connect(parentTran, tran);
                this.workToTranMap.put(work, tran);
                this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkCreateTran." + work.getName());
            }
        }
        finally {
            Utilities.clearWorkMap((Configuration)this.jobConf);
        }
        this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkBuildPlan");
        return sparkPlan;
    }

    private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
        SparkTran<WritableComparable, Writable, WritableComparable, Writable> result;
        BaseWork originalWork;
        if (this.cloneToWork.containsKey(work) && this.workToParentWorkTranMap.containsKey(originalWork = this.cloneToWork.get(work))) {
            return this.workToParentWorkTranMap.get(originalWork);
        }
        if (work instanceof MapWork) {
            result = this.generateMapInput(sparkPlan, (MapWork)work);
            sparkPlan.addTran(result);
        } else if (work instanceof ReduceWork) {
            boolean toCache = this.cloneToWork.containsKey(work);
            List<BaseWork> parentWorks = sparkWork.getParents(work);
            SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work);
            result = this.generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
            sparkPlan.addTran(result);
            for (BaseWork parentWork : parentWorks) {
                sparkPlan.connect(this.workToTranMap.get(parentWork), result);
            }
        } else {
            throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, but found " + work.getClass().getName());
        }
        if (this.cloneToWork.containsKey(work)) {
            this.workToParentWorkTranMap.put(this.cloneToWork.get(work), result);
        }
        return result;
    }

    private Class<?> getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
        Class inputFormatClass;
        if (mWork.getInputformat() != null) {
            HiveConf.setVar((Configuration)jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
        }
        String inpFormat = HiveConf.getVar((Configuration)jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT);
        if (mWork.isUseBucketizedHiveInputFormat()) {
            inpFormat = BucketizedHiveInputFormat.class.getName();
        }
        try {
            inputFormatClass = JavaUtils.loadClass(inpFormat);
        }
        catch (ClassNotFoundException e) {
            String message = "Failed to load specified input format class:" + inpFormat;
            LOG.error(message, (Throwable)e);
            throw new HiveException(message, e);
        }
        return inputFormatClass;
    }

    private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) throws Exception {
        JavaPairRDD hadoopRDD;
        JobConf jobConf = this.cloneJobConf(mapWork);
        Class<?> ifClass = this.getInputFormat(jobConf, mapWork);
        this.sc.sc().setCallSite(CallSite.apply((String)mapWork.getName(), (String)""));
        if (mapWork.getNumMapTasks() != null) {
            jobConf.setNumMapTasks(mapWork.getNumMapTasks().intValue());
            hadoopRDD = this.sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class, mapWork.getNumMapTasks().intValue());
        } else {
            hadoopRDD = this.sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
        }
        boolean toCache = false;
        String tables = mapWork.getAllRootOperators().stream().filter(op -> op instanceof TableScanOperator).map(ts -> ((TableScanDesc)ts.getConf()).getAlias()).collect(Collectors.joining(", "));
        String rddName = mapWork.getName() + " (" + tables + ", " + hadoopRDD.getNumPartitions() + (toCache ? ", cached)" : ")");
        MapInput result = new MapInput(sparkPlan, (JavaPairRDD<WritableComparable, Writable>)hadoopRDD, toCache, rddName);
        return result;
    }

    private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, String name) {
        Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
        SparkShuffler<Iterable<BytesWritable>> shuffler = edge.isMRShuffle() ? new SortByShuffler(false, sparkPlan, this.shuffleSerializer) : (edge.isShuffleSort() ? new SortByShuffler(true, sparkPlan, this.shuffleSerializer) : new GroupByShuffler());
        return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge);
    }

    private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
        this.initStatsPublisher(work);
        JobConf newJobConf = this.cloneJobConf(work);
        this.checkSpecs(work, newJobConf);
        byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
        boolean caching = this.isCachingWork(work, sparkWork);
        if (work instanceof MapWork) {
            if (work instanceof MergeFileWork) {
                Path outputPath = ((MergeFileWork)work).getOutputDir();
                Path tempOutPath = Utilities.toTempPath(outputPath);
                FileSystem fs = outputPath.getFileSystem((Configuration)this.jobConf);
                try {
                    if (!fs.exists(tempOutPath)) {
                        fs.mkdirs(tempOutPath);
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException("Can't make path " + outputPath + " : " + e.getMessage());
                }
            }
            MapTran mapTran = new MapTran(caching, work.getName());
            HiveMapFunction mapFunc = new HiveMapFunction(confBytes, this.sparkReporter);
            mapTran.setMapFunction(mapFunc);
            return mapTran;
        }
        if (work instanceof ReduceWork) {
            ReduceTran reduceTran = new ReduceTran(caching, work.getName());
            HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, this.sparkReporter);
            reduceTran.setReduceFunction(reduceFunc);
            return reduceTran;
        }
        throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, but found " + work.getClass().getName());
    }

    private boolean isCachingWork(BaseWork work, SparkWork sparkWork) {
        boolean caching = true;
        List<BaseWork> children = sparkWork.getChildren(work);
        if (children.size() < 2) {
            caching = false;
        } else {
            for (BaseWork child : children) {
                if (!this.cloneToWork.containsKey(child)) continue;
                caching = false;
            }
        }
        return caching;
    }

    private void checkSpecs(BaseWork work, JobConf jc) throws Exception {
        Set<Operator<?>> opList = work.getAllOperators();
        for (Operator<?> op : opList) {
            if (!(op instanceof FileSinkOperator)) continue;
            ((FileSinkOperator)op).checkOutputSpecs(null, jc);
        }
    }

    private JobConf cloneJobConf(BaseWork work) throws Exception {
        if (this.workToJobConf.containsKey(work)) {
            return this.workToJobConf.get(work);
        }
        JobConf cloned = new JobConf((Configuration)this.jobConf);
        HiveConf.setVar((Configuration)cloned, HiveConf.ConfVars.PLAN, "");
        try {
            cloned.setPartitionerClass(JavaUtils.loadClass(HiveConf.getVar((Configuration)cloned, HiveConf.ConfVars.HIVEPARTITIONER)));
        }
        catch (ClassNotFoundException e) {
            String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname;
            throw new IllegalArgumentException(msg, e);
        }
        if (work instanceof MapWork) {
            MapWork mapWork = (MapWork)work;
            cloned.setBoolean("mapred.task.is.map", true);
            List<Path> inputPaths = Utilities.getInputPaths(cloned, mapWork, this.scratchDir, this.context, false);
            Utilities.setInputPaths(cloned, inputPaths);
            Utilities.setMapWork((Configuration)cloned, mapWork, this.scratchDir, false);
            Utilities.createTmpDirs((Configuration)cloned, mapWork);
            if (work instanceof MergeFileWork) {
                MergeFileWork mergeFileWork = (MergeFileWork)work;
                cloned.set("mapred.mapper.class", MergeFileMapper.class.getName());
                cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
                cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
            } else {
                cloned.set("mapred.mapper.class", ExecMapper.class.getName());
            }
            if (mapWork.getMaxSplitSize() != null) {
                HiveConf.setLongVar((Configuration)cloned, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mapWork.getMaxSplitSize());
            }
            if (mapWork.getMinSplitSize() != null) {
                HiveConf.setLongVar((Configuration)cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mapWork.getMinSplitSize());
            }
            if (mapWork.getMinSplitSizePerNode() != null) {
                HiveConf.setLongVar((Configuration)cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mapWork.getMinSplitSizePerNode());
            }
            if (mapWork.getMinSplitSizePerRack() != null) {
                HiveConf.setLongVar((Configuration)cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mapWork.getMinSplitSizePerRack());
            }
            this.workToJobConf.put(work, cloned);
        } else if (work instanceof ReduceWork) {
            cloned.setBoolean("mapred.task.is.map", false);
            Utilities.setReduceWork((Configuration)cloned, (ReduceWork)work, this.scratchDir, false);
            Utilities.createTmpDirs((Configuration)cloned, (ReduceWork)work);
            cloned.set("mapred.reducer.class", ExecReducer.class.getName());
        }
        return cloned;
    }

    private void initStatsPublisher(BaseWork work) throws HiveException {
        StatsFactory factory;
        if (work.isGatheringStats() && (factory = StatsFactory.newFactory((Configuration)this.jobConf)) != null) {
            StatsPublisher statsPublisher = factory.getStatsPublisher();
            StatsCollectionContext sc = new StatsCollectionContext((Configuration)this.jobConf);
            sc.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, (Configuration)this.jobConf));
            if (!statsPublisher.init(sc) && HiveConf.getBoolVar((Configuration)this.jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
                throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
            }
        }
    }
}

