/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.KeyTypeDiscoveryVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopStoreRemover;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SampleOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRIntermediateDataVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.XMLMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;

public class MapReduceLauncher
extends Launcher {
    /** File name of the marker that {@code createSuccessFile} creates inside a store's output directory. */
    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
    /** Job configuration key read (as a boolean, default false) by {@code shouldMarkOutputDir}. */
    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs";
    private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
    /** Set by {@code JobControlThreadExceptionHandler} when the JobControl thread dies; inspected by {@code launchPig}. */
    private Exception jobControlException = null;
    /** Stack trace text of the throwable that killed the JobControl thread, as produced by {@code getStackStraceStr}. */
    private String jobControlExceptionStackTrace = null;
    /** True when the "aggregate.warning" property equals "true" (case-insensitive); assigned at the start of {@code launchPig}. */
    private boolean aggregateWarning = false;
    /** Maps the output file spec of each store of a failed job to the exception recorded for it; recreated by {@code reset()}. */
    private Map<FileSpec, Exception> failureMap;
    /** JobControl for the batch currently being run by {@code launchPig}; also read by the shutdown hook. May be null. */
    private JobControl jc = null;

    public MapReduceLauncher() {
        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
    }

    /**
     * Looks up the exception recorded for a store location.
     *
     * @param spec the output file spec of a store
     * @return the exception stored in the failure map for {@code spec}, or {@code null} if the map has no entry
     */
    public Exception getError(FileSpec spec) {
        final Exception recorded = this.failureMap.get(spec);
        return recorded;
    }

    /**
     * Replaces the failure map with a fresh, empty one and then delegates to the superclass reset.
     */
    @Override
    public void reset() {
        final Map<FileSpec, Exception> emptyFailures = new HashMap<FileSpec, Exception>();
        this.failureMap = emptyFailures;
        super.reset();
    }

    /**
     * Compiles the physical plan into a map-reduce plan and runs it, batch by batch, until the
     * plan is empty.
     *
     * <p>Each pass of the outer loop asks the {@code JobControlCompiler} for the next
     * {@code JobControl}. When none is returned, the native map-reduce operators among the plan
     * roots are run directly. Otherwise the JobControl is run on its own thread while this thread
     * polls it, reporting job ids, progress and statistics. After all batches, intermediate files
     * are deleted, failures are recorded in the failure map, success marker files are created for
     * non-temporary stores, and aggregated warnings are logged when enabled.
     *
     * @param php     the physical plan to execute
     * @param grpName name passed to the job control compiler for each batch
     * @param pc      the Pig context supplying properties and the execution engine
     * @return the statistics object obtained from {@code PigStatsUtil.getPigStats} with return code
     *         0 when nothing failed, 3 when something failed but at least one job succeeded, and 2
     *         otherwise
     * @throws ExecException if "stop.on.failure" is "true" and a job fails, or if the JobControl
     *                       thread died with an exception that is not a {@code PigException}
     * @throws Exception     the exception recovered from the JobControl thread when it is a
     *                       {@code PigException}, plus anything thrown by the helpers called here
     */
    @Override
    public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws PlanException, VisitorException, IOException, ExecException, JobCreationException, Exception {
        final long sleepTime = 500L;
        this.aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
        MROperPlan mrp = this.compile(php, pc);
        ConfigurationValidator.validatePigProperties(pc.getProperties());
        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
        HExecutionEngine exe = pc.getExecutionEngine();
        JobClient jobClient = new JobClient(exe.getJobConf());
        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
        ScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
        PigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
        MRIntermediateDataVisitor intermediateVisitor = new MRIntermediateDataVisitor(mrp);
        intermediateVisitor.visit();

        // Typed collections: the enhanced-for loops below iterate these as Job / MapReduceOper.
        List<Job> failedJobs = new LinkedList<Job>();
        List<NativeMapReduceOper> failedNativeMR = new LinkedList<NativeMapReduceOper>();
        List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
        List<Job> succJobs = new LinkedList<Job>();
        int totalMRJobs = mrp.size();
        int numMRJobsCompl = 0;
        double lastProg = -1.0;
        long scriptSubmittedTimestamp = System.currentTimeMillis();
        JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
        boolean stop_on_failure = pc.getProperties().getProperty("stop.on.failure", "false").equals("true");

        while (mrp.size() != 0) {
            this.jc = jcc.compile(mrp, grpName);
            if (this.jc == null) {
                // No JobControl for this pass: run the native map-reduce operators found among
                // the roots. Iterate over a copy because the plan is modified inside the loop.
                List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
                roots.addAll(mrp.getRoots());
                for (MapReduceOper mro : roots) {
                    if (!(mro instanceof NativeMapReduceOper)) {
                        continue;
                    }
                    NativeMapReduceOper natOp = (NativeMapReduceOper) mro;
                    try {
                        ScriptState.get().emitJobsSubmittedNotification(1);
                        natOp.runJob();
                        ++numMRJobsCompl;
                    }
                    catch (IOException e) {
                        mrp.trimBelow(natOp);
                        failedNativeMR.add(natOp);
                        String msg = "Error running native mapreduce operator job :" + natOp.getJobId() + e.getMessage();
                        String stackTrace = this.getStackStraceStr(e);
                        LogUtils.writeLog(msg, stackTrace, pc.getProperties().getProperty("pig.logfile"), log);
                        log.info(msg);
                        if (stop_on_failure) {
                            int errCode = 6017;
                            // Keep the original IOException as the cause so it is not lost.
                            throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
                        }
                    }
                    double prog = (double) numMRJobsCompl / (double) totalMRJobs;
                    this.notifyProgress(prog, lastProg);
                    lastProg = prog;
                    mrp.remove(natOp);
                }
                continue;
            }

            List<Job> jobsWithoutIds = this.jc.getWaitingJobs();
            log.info(jobsWithoutIds.size() + " map-reduce job(s) waiting for submission.");
            ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
            PigStatsUtil.updateJobMroMap(jcc.getJobMroMap());

            // Build "<job tracker host>:<http port>" for the job-details URL. Any failure
            // (missing property, no ':' in a value) just disables the URL log line.
            JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
            String jobTrackerLoc;
            try {
                String port = jobConf.get("mapred.job.tracker.http.address");
                String jobTrackerAdd = jobConf.get("mapred.job.tracker");
                jobTrackerLoc = jobTrackerAdd.substring(0, jobTrackerAdd.indexOf(":")) + port.substring(port.indexOf(":"));
            }
            catch (Exception e) {
                jobTrackerLoc = null;
                log.debug("Failed to get job tracker location.");
            }

            completeFailedJobsInThisRun.clear();

            // The JobControl thread installs a clone of this thread's UDFContext before running.
            final UDFContext udfContext = UDFContext.getUDFContext();
            Thread jcThread = new Thread(this.jc, "JobControl") {

                @Override
                public void run() {
                    UDFContext.setUdfContext(udfContext.clone());
                    super.run();
                }
            };
            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
            jcThread.setContextClassLoader(PigContext.getClassLoader());

            // Stamp every waiting job with the script and job submission times.
            for (Job job : this.jc.getWaitingJobs()) {
                JobConf jobConfCopy = job.getJobConf();
                jobConfCopy.set("pig.script.submitted.timestamp", Long.toString(scriptSubmittedTimestamp));
                jobConfCopy.set("pig.job.submitted.timestamp", Long.toString(System.currentTimeMillis()));
                job.setJobConf(jobConfCopy);
            }

            jcThread.start();
            try {
                boolean warn_failure = true;
                while (!this.jc.allFinished()) {
                    try {
                        jcThread.join(sleepTime);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored, as before: polling simply resumes.
                        // NOTE(review): the interrupt status is not restored; restoring it here
                        // would make every later join() return immediately.
                    }

                    // Report the jobs that received a Hadoop job id since the last poll.
                    List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
                    for (Job job : jobsWithoutIds) {
                        if (job.getAssignedJobID() == null) {
                            continue;
                        }
                        jobsAssignedIdInThisRun.add(job);
                        log.info("HadoopJobId: " + job.getAssignedJobID());
                        MapReduceOper mro = jcc.getJobMroMap().get(job);
                        if (mro != null) {
                            String alias = ScriptState.get().getAlias(mro);
                            log.info("Processing aliases " + alias);
                            String aliasLocation = ScriptState.get().getAliasLocation(mro);
                            log.info("detailed locations: " + aliasLocation);
                        }
                        if (jobTrackerLoc != null) {
                            log.info("More information at: http://" + jobTrackerLoc + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
                        }
                        PigStatsUtil.addJobStats(job);
                        ScriptState.get().emitJobStartedNotification(job.getAssignedJobID().toString());
                    }
                    jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);

                    double prog = ((double) numMRJobsCompl + this.calculateProgress(this.jc, jobClient)) / (double) totalMRJobs;
                    if (this.notifyProgress(prog, lastProg)) {
                        lastProg = prog;
                    }
                    PigStatsUtil.accumulateStats(this.jc);
                    this.checkStopOnFailure(stop_on_failure);

                    // Warn once per batch when a job has failed.
                    if (warn_failure && !this.jc.getFailedJobs().isEmpty()) {
                        warn_failure = false;
                        log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you want Pig to stop immediately on failure.");
                    }
                }

                // An exception recorded by the uncaught-exception handler ends the launch.
                if (this.jobControlException != null) {
                    if (this.jobControlException instanceof PigException) {
                        if (this.jobControlExceptionStackTrace != null) {
                            LogUtils.writeLog("Error message from job controller", this.jobControlExceptionStackTrace, pc.getProperties().getProperty("pig.logfile"), log);
                        }
                        throw this.jobControlException;
                    }
                    int errCode = 2117;
                    String msg = "Unexpected error when launching map reduce job.";
                    throw new ExecException(msg, errCode, PigException.BUG, this.jobControlException);
                }

                if (!this.jc.getFailedJobs().isEmpty()) {
                    this.checkStopOnFailure(stop_on_failure);
                    for (Job job : this.jc.getFailedJobs()) {
                        completeFailedJobsInThisRun.add(job);
                        log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
                    }
                    failedJobs.addAll(this.jc.getFailedJobs());
                }

                int removedMROp = jcc.updateMROpPlan(completeFailedJobsInThisRun);
                numMRJobsCompl += removedMROp;
                List<Job> jobs = this.jc.getSuccessfulJobs();
                jcc.moveResults(jobs);
                succJobs.addAll(jobs);
                PigStatsUtil.accumulateStats(this.jc);
            }
            finally {
                this.jc.stop();
            }
        }

        ScriptState.get().emitProgressUpdatedNotification(100);
        log.info("100% complete");

        boolean failed = !failedNativeMR.isEmpty();

        for (String path : intermediateVisitor.getIntermediate()) {
            FileLocalizer.delete(path, pc);
        }

        if (!failedJobs.isEmpty()) {
            // NOTE(review): backendException is intentionally not reset per job (unchanged
            // behavior), so a job whose getStats call does not throw is recorded with the
            // exception of an earlier failed job. Confirm whether that is intended.
            Exception backendException = null;
            for (Job fj : failedJobs) {
                try {
                    this.getStats(fj, jobClient, true, pc);
                }
                catch (Exception e) {
                    backendException = e;
                }
                List<POStore> sts = jcc.getStores(fj);
                for (POStore st : sts) {
                    this.failureMap.put(st.getSFile(), backendException);
                }
                PigStatsUtil.setBackendException(fj, backendException);
            }
            failed = true;
        }

        PigStatsUtil.stopCollection(true);
        failed = failed || !PigStats.get().isSuccessful();

        HashMap<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
        for (Job job : succJobs) {
            List<POStore> sts = jcc.getStores(job);
            for (POStore st : sts) {
                if (pc.getExecType() == ExecType.LOCAL) {
                    HadoopShims.storeSchemaForLocal(job, st);
                }
                if (!st.isTmpStore()) {
                    this.createSuccessFile(job, st);
                } else {
                    log.debug("Successfully stored result in: \"" + st.getSFile().getFileName() + "\"");
                }
            }
            this.getStats(job, jobClient, false, pc);
            if (this.aggregateWarning) {
                this.computeWarningAggregate(job, jobClient, warningAggMap);
            }
        }
        if (this.aggregateWarning) {
            CompilationMessageCollector.logAggregate(warningAggMap, CompilationMessageCollector.MessageType.Warning, log);
        }

        boolean someSucceeded = !succJobs.isEmpty();
        if (!failed) {
            log.info("Success!");
        } else if (someSucceeded) {
            log.info("Some jobs have failed! Stop running all dependent jobs");
        } else {
            log.info("Failed!");
        }
        jcc.reset();

        // 0 = nothing failed, 3 = failed but some jobs succeeded, 2 = failed with no successes.
        int ret = failed ? (someSucceeded ? 3 : 2) : 0;
        return PigStatsUtil.getPigStats(ret);
    }

    /**
     * Aborts the launch when stop-on-failure is requested and the current JobControl reports
     * failed jobs.
     *
     * @param stop_on_failure whether a failed job should abort the launch
     * @throws ExecException with error code 6017 and the failed jobs' messages joined by
     *                       newlines, when {@code stop_on_failure} is true and at least one job
     *                       has failed
     */
    private void checkStopOnFailure(boolean stop_on_failure) throws ExecException {
        if (!stop_on_failure) {
            return;
        }
        // Query the failed jobs once so the emptiness check and the message are built from
        // the same list.
        List<Job> failed = this.jc.getFailedJobs();
        if (failed.isEmpty()) {
            return;
        }
        int errCode = 6017;
        StringBuilder msg = new StringBuilder();
        String separator = "";
        for (Job j : failed) {
            msg.append(separator).append(j.getMessage());
            separator = "\n";
        }
        throw new ExecException(msg.toString(), errCode, PigException.REMOTE_ENVIRONMENT);
    }

    /**
     * Renders a throwable's stack trace as text.
     *
     * @param e the throwable to print
     * @return the output of {@code e.printStackTrace} captured in an in-memory buffer
     */
    private String getStackStraceStr(Throwable e) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        e.printStackTrace(new PrintStream(buffer));
        return buffer.toString();
    }

    /**
     * Reports progress when it has advanced by at least 0.04 since the last reported value.
     *
     * @param prog     current progress as a fraction
     * @param lastProg the progress value that was last reported
     * @return {@code true} when the advance threshold was reached (the caller then updates its
     *         last-reported value), {@code false} otherwise
     */
    private boolean notifyProgress(double prog, double lastProg) {
        final double threshold = lastProg + 0.04;
        // Written as a negated >= so that NaN inputs are rejected.
        if (!(prog >= threshold)) {
            return false;
        }
        final int percent = (int)(prog * 100.0);
        // 100% is announced separately by the caller once everything is done.
        if (percent != 100) {
            log.info(percent + "% complete");
            ScriptState.get().emitProgressUpdatedNotification(percent);
        }
        return true;
    }

    /**
     * Compiles the physical plan and prints the resulting map-reduce plan.
     *
     * @param php     the physical plan to compile
     * @param pc      the Pig context used for compilation
     * @param ps      the stream the plan is printed to
     * @param format  "text" or "xml"; any other value, including {@code null}, produces the
     *                dot printer output preceded by a banner
     * @param verbose passed to the text and dot printers
     */
    @Override
    public void explain(PhysicalPlan php, PigContext pc, PrintStream ps, String format, boolean verbose) throws PlanException, VisitorException, IOException {
        log.trace("Entering MapReduceLauncher.explain");
        MROperPlan mrp = this.compile(php, pc);
        // Constant-first comparisons so a null format falls through to the default branch
        // instead of throwing a NullPointerException.
        if ("text".equals(format)) {
            MRPrinter printer = new MRPrinter(ps, mrp);
            printer.setVerbose(verbose);
            printer.visit();
        } else if ("xml".equals(format)) {
            try {
                XMLMRPrinter printer = new XMLMRPrinter(ps, mrp);
                printer.visit();
                printer.closePlan();
            }
            catch (ParserConfigurationException e) {
                // Report through the class logger rather than printing to stderr.
                log.error("Unable to print the map-reduce plan as XML.", e);
            }
            catch (TransformerException e) {
                log.error("Unable to print the map-reduce plan as XML.", e);
            }
        } else {
            ps.println("#--------------------------------------------------");
            ps.println("# Map Reduce Plan                                  ");
            ps.println("#--------------------------------------------------");
            DotMRPrinter printer = new DotMRPrinter(mrp, ps);
            printer.setVerbose(verbose);
            printer.dump();
            ps.println("");
        }
    }

    /**
     * Compiles a physical plan into a map-reduce plan and runs the optimizer and annotation
     * passes over it in a fixed order.
     *
     * <p>The combiner, limit-adjuster and secondary-key passes are skipped while
     * {@code pc.inIllustrator} is set; the combiner and secondary-key passes are also skipped
     * when "pig.exec.nocombiner" / "pig.exec.nosecondarykey" equal "true". The multi-query and
     * accumulator passes run unless "opt.multiquery" / "opt.accumulator" are set to something
     * other than "true" (case-insensitive).
     *
     * @param php the physical plan to compile
     * @param pc  the Pig context supplying properties
     * @return the compiled and optimized map-reduce plan
     */
    public MROperPlan compile(PhysicalPlan php, PigContext pc) throws PlanException, IOException, VisitorException {
        MRCompiler mrCompiler = new MRCompiler(php, pc);
        mrCompiler.randomizeFileLocalizer();
        mrCompiler.compile();
        mrCompiler.aggregateScalarsFiles();
        MROperPlan mrPlan = mrCompiler.getMRPlan();
        mrCompiler.getMessageCollector().logMessages(CompilationMessageCollector.MessageType.Warning, this.aggregateWarning, log);

        String chunkSize = pc.getProperties().getProperty("last.input.chunksize", "1000");

        // Combiner pass.
        String switchValue = pc.getProperties().getProperty("pig.exec.nocombiner");
        if (!(pc.inIllustrator || "true".equals(switchValue))) {
            String mapPartAgg = pc.getProperties().getProperty("pig.exec.mapPartAgg", "false");
            CombinerOptimizer combiner = new CombinerOptimizer(mrPlan, Boolean.parseBoolean(mapPartAgg));
            combiner.visit();
            combiner.getMessageCollector().logMessages(CompilationMessageCollector.MessageType.Warning, this.aggregateWarning, log);
        }

        // Sampling pass.
        new SampleOptimizer(mrPlan, pc).visit();

        // Limit adjustment.
        if (!pc.inIllustrator) {
            LimitAdjuster limitAdjuster = new LimitAdjuster(mrPlan, pc);
            limitAdjuster.visit();
            limitAdjuster.adjust();
        }

        // Secondary-key pass.
        switchValue = pc.getProperties().getProperty("pig.exec.nosecondarykey");
        if (!(pc.inIllustrator || "true".equals(switchValue))) {
            new SecondaryKeyOptimizer(mrPlan).visit();
        }

        new POPackageAnnotator(mrPlan).visit();
        new MRCompiler.LastInputStreamingOptimizer(mrPlan, chunkSize).visit();
        new KeyTypeDiscoveryVisitor(mrPlan).visit();
        new NoopFilterRemover(mrPlan).visit();

        // Multi-query pass.
        String multiQuery = pc.getProperties().getProperty("opt.multiquery", "true");
        if ("true".equalsIgnoreCase(multiQuery)) {
            new MultiQueryOptimizer(mrPlan, pc.inIllustrator).visit();
        }

        new NoopStoreRemover(mrPlan).visit();
        new EndOfAllInputSetter(mrPlan).visit();

        // Accumulator pass.
        String accumulator = pc.getProperties().getProperty("opt.accumulator", "true");
        if ("true".equalsIgnoreCase(accumulator)) {
            new AccumulatorOptimizer(mrPlan).visit();
        }
        return mrPlan;
    }

    /**
     * Tells whether the job's configuration asks for a success marker file.
     *
     * @param job the job whose configuration is consulted
     * @return the boolean value of {@code SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}, {@code false} when unset
     */
    private boolean shouldMarkOutputDir(Job job) {
        final JobConf jobConf = job.getJobConf();
        return jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false);
    }

    /**
     * Creates an empty {@code SUCCEEDED_FILE_NAME} file in the store's output directory.
     *
     * <p>Nothing is done unless the job is configured to mark its output directory, the output
     * path exists, and the marker file does not exist yet.
     *
     * @param job   the job that wrote the store; supplies the configuration for the file system
     * @param store the store whose output location is marked
     * @throws IOException if a file system operation fails
     */
    private void createSuccessFile(Job job, POStore store) throws IOException {
        if (!this.shouldMarkOutputDir(job)) {
            return;
        }
        Path outputDir = new Path(store.getSFile().getFileName());
        FileSystem fileSystem = outputDir.getFileSystem(job.getJobConf());
        if (!fileSystem.exists(outputDir)) {
            return;
        }
        Path marker = new Path(outputDir, SUCCEEDED_FILE_NAME);
        if (fileSystem.exists(marker)) {
            return;
        }
        fileSystem.create(marker).close();
    }

    /**
     * Adds a job's warning counters to the running per-warning totals.
     *
     * <p>When the job cannot be looked up (lookup fails or returns {@code null}) the map is left
     * untouched. When the job has no counters, {@code PigWarning.NULL_COUNTER_COUNT} is
     * incremented and every other warning keeps its current total (initialised to 0 if absent).
     *
     * @param job       the job whose counters are read
     * @param jobClient client used to look up the running job by its assigned id
     * @param aggMap    totals per warning; updated in place
     */
    void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
        JobID mapRedJobID = job.getAssignedJobID();
        Counters counters;
        try {
            RunningJob runningJob = jobClient.getJob(mapRedJobID);
            if (runningJob == null) {
                return;
            }
            counters = runningJob.getCounters();
        }
        catch (IOException ioe) {
            // Include the exception so the reason for the failed lookup is not lost.
            log.warn("Unable to retrieve job to compute warning aggregation.", ioe);
            return;
        }

        if (counters == null) {
            Long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT);
            aggMap.put(PigWarning.NULL_COUNTER_COUNT, nullCounterCount == null ? 1L : nullCounterCount + 1L);
        }

        try {
            for (PigWarning warning : PigWarning.values()) {
                if (warning == PigWarning.NULL_COUNTER_COUNT) {
                    continue;
                }
                Long previous = aggMap.get(warning);
                long total = previous == null ? 0L : previous.longValue();
                if (counters != null) {
                    total += counters.getCounter(warning);
                }
                aggMap.put(warning, total);
            }
        }
        catch (Exception e) {
            log.warn("Exception getting counters.", e);
        }
    }

    /**
     * Uncaught-exception handler installed on the JobControl thread. It stores the throwable's
     * stack trace text and an exception derived from it in the enclosing launcher's fields.
     */
    class JobControlThreadExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        /**
         * Records the failure of the JobControl thread on the enclosing launcher.
         *
         * @param thread    the thread that terminated (unused)
         * @param throwable the throwable that terminated it
         */
        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            final MapReduceLauncher launcher = MapReduceLauncher.this;
            final String trace = launcher.getStackStraceStr(throwable);
            launcher.jobControlExceptionStackTrace = trace;

            Exception resolved;
            try {
                resolved = launcher.getExceptionFromString(trace);
            }
            catch (Exception e) {
                // The trace could not be turned back into an exception: wrap the throwable.
                resolved = new RuntimeException("Could not resolve error that occured when launching map reduce job: " + trace, throwable);
            }
            launcher.jobControlException = resolved;
        }
    }

    /**
     * Shutdown hook that tries to kill every job still reported as running by the enclosing
     * launcher's current JobControl. Failures are logged and never propagated.
     */
    private class HangingJobKiller
    extends Thread {
        /**
         * Kills the running jobs one by one; a failure on one job does not prevent the attempt
         * on the remaining jobs.
         */
        @Override
        public void run() {
            try {
                log.debug("Receive kill signal");
                // Read the field once: launchPig reassigns it (possibly to null) while running.
                JobControl jobControl = MapReduceLauncher.this.jc;
                if (jobControl == null) {
                    return;
                }
                for (Job job : jobControl.getRunningJobs()) {
                    try {
                        RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
                        if (runningJob != null) {
                            runningJob.killJob();
                            // Only claim the job was killed when a kill was actually issued.
                            log.info("Job " + job.getJobID() + " killed");
                        }
                    }
                    catch (Exception e) {
                        log.warn("Encounter exception on cleanup:" + e);
                    }
                }
            }
            catch (Exception e) {
                log.warn("Encounter exception on cleanup:" + e);
            }
        }
    }
}

