/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.impl.streaming;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.streaming.HandlerFactory;
import org.apache.pig.impl.streaming.InputHandler;
import org.apache.pig.impl.streaming.OutputHandler;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.UDFContext;

public class ExecutableManager {
    private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
    private static final int SUCCESS = 0;
    private static final String PATH = "PATH";
    private static final String BASH = "bash";
    private static final Result EOS_RESULT = new Result(4, null);
    protected StreamingCommand command;
    String argvAsString;
    Process process;
    protected int exitCode = -127;
    protected DataOutputStream stdin;
    ProcessInputThread stdinThread;
    ProcessOutputThread stdoutThread;
    InputStream stdout;
    ProcessErrorThread stderrThread;
    InputStream stderr;
    InputHandler inputHandler;
    OutputHandler outputHandler;
    protected long inputRecords = 0L;
    protected long inputBytes = 0L;
    protected long outputRecords = 0L;
    protected long outputBytes = 0L;
    protected volatile Throwable outerrThreadsError;
    private POStream poStream;
    private ProcessInputThread fileInputThread;

    /**
     * Binds this manager to the given streaming operator.
     *
     * <p>Remembers the operator, takes its {@link StreamingCommand}, flattens the
     * command arguments into {@code argvAsString} (each argument followed by a single
     * space, so the string ends with a trailing space) and asks {@link HandlerFactory}
     * for the input and output handlers of that command.
     *
     * @param stream the streaming operator whose command this manager will run
     * @throws IOException declared in the signature; propagated from the calls made here
     * @throws ExecException declared in the signature; propagated from the calls made here
     */
    public void configure(POStream stream) throws IOException, ExecException {
        this.poStream = stream;
        this.command = stream.getCommand();

        StringBuilder commandLine = new StringBuilder();
        for (String argument : this.command.getCommandArgs()) {
            commandLine.append(argument).append(" ");
        }
        this.argvAsString = commandLine.toString();

        this.inputHandler = HandlerFactory.createInputHandler(this.command);
        this.outputHandler = HandlerFactory.createOutputHandler(this.command);
    }

    /**
     * Closes the input side of the streaming process and waits for the process and
     * its stdout/stderr reader threads to finish.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>close the input handler;</li>
     *   <li>if the input handler is ASYNCHRONOUS, start the process now via
     *       {@link #exec()};</li>
     *   <li>wait for the process and record its exit code;</li>
     *   <li>join the stdout and stderr threads;</li>
     *   <li>if the output handler is ASYNCHRONOUS, bind it and start a new output
     *       thread;</li>
     *   <li>log any failure recorded by the worker threads.</li>
     * </ol>
     *
     * <p>If the calling thread is interrupted while waiting, the process is killed,
     * the remaining steps still run, and the thread's interrupt status is restored
     * just before this method returns so that callers can observe the interruption.
     *
     * @throws IOException if closing a handler or starting the process fails
     */
    public void close() throws IOException {
        this.inputHandler.close(this.process);
        if (this.inputHandler.getInputType() == InputHandler.InputType.ASYNCHRONOUS) {
            this.exec();
        }
        // Remember an interruption but re-assert it only at the very end, so the
        // remaining waits below behave exactly as they would without it.
        boolean interrupted = false;
        try {
            try {
                this.exitCode = this.process.waitFor();
            } catch (InterruptedException ie) {
                interrupted = true;
                LOG.error("Unexpected exception while waiting for streaming binary to complete", ie);
                this.killProcess(this.process);
            }
            try {
                if (this.stdoutThread != null) {
                    this.stdoutThread.join(0L);
                }
                this.stdoutThread = null;
            } catch (InterruptedException ie) {
                interrupted = true;
                LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie);
                this.killProcess(this.process);
            }
            try {
                if (this.stderrThread != null) {
                    this.stderrThread.join(0L);
                }
                this.stderrThread = null;
            } catch (InterruptedException ie) {
                interrupted = true;
                // This wait is on the stderr reader, not the input thread.
                LOG.error("Unexpected exception while waiting for error thread for streaming binary to complete", ie);
                this.killProcess(this.process);
            }
            LOG.debug("Process exited with: " + this.exitCode);
            if (this.exitCode != SUCCESS) {
                LOG.error(this.command + " failed with exit status: " + this.exitCode);
            }
            if (this.outputHandler.getOutputType() == OutputHandler.OutputType.ASYNCHRONOUS) {
                // Asynchronous output is only consumed once the process has exited.
                this.outputHandler.bindTo("", null, 0L, -1L);
                this.stdoutThread = new ProcessOutputThread(this.outputHandler, this.poStream);
                this.stdoutThread.start();
            }
            if (this.outerrThreadsError != null) {
                LOG.error("Output/Error thread failed with: " + this.outerrThreadsError);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Forcibly tears down the given process together with both stream handlers.
     *
     * <p>The process is destroyed even when closing a handler throws, so a failing
     * handler cannot leave the child process running. A {@code null} process is a
     * no-op.
     *
     * @param process the process to kill; may be {@code null}
     * @throws IOException if closing the input or output handler fails (the process
     *         has still been destroyed when this is thrown)
     */
    private void killProcess(Process process) throws IOException {
        if (process == null) {
            return;
        }
        try {
            try {
                this.inputHandler.close(process);
            } finally {
                this.outputHandler.close();
            }
        } finally {
            process.destroy();
        }
    }

    /**
     * Prepares the environment of the process about to be launched.
     *
     * <p>Copies the job-configuration properties selected by
     * {@link #addJobConfToEnvironment} into the environment, then appends the
     * working directory to {@code PATH}. The working directory is the builder's
     * directory, or the {@code user.dir} system property when the builder has none.
     * The platform's path separator is used so the resulting {@code PATH} is valid
     * on Windows as well as on Unix-like systems.
     *
     * @param pb the builder whose environment is modified in place
     */
    protected void setupEnvironment(ProcessBuilder pb) {
        Configuration conf = UDFContext.getUDFContext().getJobConf();
        Map<String, String> env = pb.environment();
        this.addJobConfToEnvironment(conf, env);

        File dir = pb.directory();
        String cwd = dir != null ? dir.getAbsolutePath() : System.getProperty("user.dir");

        String envPath = env.get(PATH);
        // File.pathSeparator is ":" on Unix-like systems and ";" on Windows.
        envPath = envPath == null ? cwd : envPath + File.pathSeparator + cwd;
        env.put(PATH, envPath);
    }

    /**
     * Copies selected job-configuration properties into a process environment.
     *
     * <p>The property {@code pig.streaming.environment} is read as a comma-separated
     * list of property names. Each name that has a value in {@code conf} is added to
     * {@code env} through {@link #envPut}; names without a value are reported with a
     * warning and skipped. Nothing is copied when the list property is absent.
     *
     * @param conf configuration to read the list and the property values from
     * @param env  environment map that receives the name/value pairs
     */
    void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
        String propsToSend = conf.get("pig.streaming.environment");
        LOG.debug("Properties to ship to streaming environment set in pig.streaming.environment: " + propsToSend);
        if (propsToSend != null) {
            String[] names = propsToSend.split(",");
            for (int i = 0; i < names.length; ++i) {
                String name = names[i];
                String value = conf.get(name);
                if (value != null) {
                    LOG.debug("Setting property in streaming environment: " + name);
                    this.envPut(env, name, value);
                } else {
                    LOG.warn("Property set in pig.streaming.environment not found in Configuration: " + name);
                }
            }
        }
    }

    /**
     * Stores one entry in the environment map, tracing it at debug level first.
     *
     * @param env   environment map to update
     * @param name  variable name
     * @param value variable value
     */
    void envPut(Map<String, String> env, String name, String value) {
        // Only build the trace message when it will actually be emitted.
        boolean traceEnabled = LOG.isDebugEnabled();
        if (traceEnabled) {
            String entry = "Add  env entry:" + name + "=" + value;
            LOG.debug(entry);
        }
        env.put(name, value);
    }

    /**
     * Launches the streaming command and starts the threads that consume its output.
     *
     * <p>On Windows the command string is run through {@code cmd /c}; elsewhere it is
     * run through {@code bash -c "exec <command>"}. After the process is started, a
     * {@link ProcessErrorThread} is started on its stderr. When the output handler is
     * SYNCHRONOUS, stdout is bound to the handler and a {@link ProcessOutputThread}
     * is started as well.
     *
     * @throws IOException if the process cannot be started or the output handler
     *         cannot be bound
     */
    protected void exec() throws IOException {
        // Case-insensitive prefix test that does not depend on the default locale.
        // String.toUpperCase() would turn "Windows" into "WİNDOWS" under a Turkish
        // locale and make the check fail.
        String windowsPrefix = "WINDOWS";
        boolean onWindows = System.getProperty("os.name")
                .regionMatches(true, 0, windowsPrefix, 0, windowsPrefix.length());

        String[] cmdArgs;
        if (onWindows) {
            cmdArgs = new String[] {"cmd", "/c", this.argvAsString};
        } else {
            // "exec" makes bash replace itself with the command instead of forking.
            cmdArgs = new String[] {BASH, "-c", "exec " + this.argvAsString};
        }

        ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs);
        this.setupEnvironment(processBuilder);
        this.process = processBuilder.start();
        LOG.debug("Started the process for command: " + this.command);

        this.stderr = new DataInputStream(new BufferedInputStream(this.process.getErrorStream()));
        this.stderrThread = new ProcessErrorThread();
        this.stderrThread.start();

        if (this.outputHandler.getOutputType() == OutputHandler.OutputType.SYNCHRONOUS) {
            this.stdout = new DataInputStream(new BufferedInputStream(this.process.getInputStream()));
            this.outputHandler.bindTo("", new BufferedPositionedInputStream(this.stdout), 0L, Long.MAX_VALUE);
            this.stdoutThread = new ProcessOutputThread(this.outputHandler, this.poStream);
            this.stdoutThread.start();
        }
    }

    /**
     * Starts feeding input to the streaming command.
     *
     * <p>With a non-ASYNCHRONOUS input handler the process is launched immediately,
     * its stdin is bound to the input handler and an input thread is started. With
     * an ASYNCHRONOUS input handler only an input thread is started (carrying the
     * current {@link UDFContext}); the process itself is not launched here.
     *
     * @throws IOException if launching the process or binding its stdin fails
     */
    public void run() throws IOException {
        if (this.inputHandler.getInputType() != InputHandler.InputType.ASYNCHRONOUS) {
            this.exec();
            BufferedOutputStream bufferedStdin = new BufferedOutputStream(this.process.getOutputStream());
            this.stdin = new DataOutputStream(bufferedStdin);
            this.inputHandler.bindTo(this.stdin);
            this.stdinThread = new ProcessInputThread(this.inputHandler, this.poStream, null);
            this.stdinThread.start();
        } else {
            UDFContext callerContext = UDFContext.getUDFContext();
            this.fileInputThread = new ProcessInputThread(this.inputHandler, this.poStream, callerContext);
            this.fileInputThread.start();
        }
    }

    /**
     * Hands a result to the operator's output queue and wakes any thread waiting on
     * the operator's monitor.
     *
     * <p>An interruption while enqueueing is logged and otherwise ignored; the
     * waiters are notified regardless.
     *
     * @param binaryOutputQueue queue the result is put on
     * @param res               result to deliver
     */
    private void sendOutput(BlockingQueue<Result> binaryOutputQueue, Result res) {
        try {
            binaryOutputQueue.put(res);
        } catch (InterruptedException interruption) {
            LOG.error("Error while sending binary output to POStream", interruption);
        }
        synchronized (this.poStream) {
            if (res != null) {
                this.poStream.notifyAll();
            }
        }
    }

    /**
     * Handles a piece of text read from the process's stderr by writing it,
     * unchanged, to this JVM's {@code System.err}.
     *
     * @param error the text to emit; no line terminator is added here
     */
    protected void processError(String error) {
        final String text = error;
        System.err.print(text);
    }

    class ProcessErrorThread
    extends Thread {
        public ProcessErrorThread() {
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                String error;
                BufferedReader reader = new BufferedReader(new InputStreamReader(ExecutableManager.this.stderr));
                while ((error = reader.readLine()) != null) {
                    ExecutableManager.this.processError(error + "\n");
                }
                if (ExecutableManager.this.stderr != null) {
                    ExecutableManager.this.stderr.close();
                    LOG.debug((Object)"ProcessErrorThread done");
                }
            }
            catch (Throwable t) {
                ExecutableManager.this.outerrThreadsError = t;
                LOG.error((Object)t);
                try {
                    if (ExecutableManager.this.stderr != null) {
                        ExecutableManager.this.stderr.close();
                    }
                }
                catch (IOException ioe) {
                    LOG.warn((Object)ioe);
                }
                throw new RuntimeException(t);
            }
        }
    }

    class ProcessOutputThread
    extends Thread {
        OutputHandler outputHandler;
        private BlockingQueue<Result> binaryOutputQueue;

        ProcessOutputThread(OutputHandler outputHandler, POStream poStream) {
            this.setDaemon(true);
            this.outputHandler = outputHandler;
            this.binaryOutputQueue = poStream.getBinaryOutputQueue();
        }

        @Override
        public void run() {
            try {
                Tuple tuple = null;
                while ((tuple = this.outputHandler.getNext()) != null) {
                    this.processOutput(tuple);
                    ExecutableManager.this.outputBytes += tuple.getMemorySize();
                }
                this.processOutput(null);
                this.outputHandler.close();
            }
            catch (Throwable t) {
                ExecutableManager.this.outerrThreadsError = t;
                LOG.error((Object)"Caught Exception in OutputHandler of Streaming binary, sending error signal to pipeline", t);
                try {
                    Result res = new Result();
                    res.result = "Error reading output from Streaming binary:'" + ExecutableManager.this.argvAsString + "':" + t.getMessage();
                    res.returnStatus = (byte)2;
                    ExecutableManager.this.sendOutput(this.binaryOutputQueue, res);
                    ExecutableManager.this.killProcess(ExecutableManager.this.process);
                }
                catch (Exception e) {
                    LOG.error((Object)"Error while trying to signal Error status to pipeline", (Throwable)e);
                }
            }
        }

        void processOutput(Tuple t) {
            Result res = new Result();
            if (t != null) {
                res.result = t;
                res.returnStatus = 0;
                ++ExecutableManager.this.outputRecords;
            } else {
                try {
                    ExecutableManager.this.exitCode = ExecutableManager.this.process.waitFor();
                }
                catch (InterruptedException ie) {
                    try {
                        ExecutableManager.this.killProcess(ExecutableManager.this.process);
                    }
                    catch (IOException e) {
                        LOG.warn((Object)"Exception trying to kill process while processing null output from binary", (Throwable)e);
                    }
                    String errMsg = "Failure while waiting for process (" + ExecutableManager.this.argvAsString + ")" + ie.getMessage();
                    LOG.error((Object)errMsg, (Throwable)ie);
                    res.result = errMsg;
                    res.returnStatus = (byte)2;
                    ExecutableManager.this.sendOutput(this.binaryOutputQueue, res);
                    return;
                }
                if (ExecutableManager.this.exitCode == 0) {
                    res = EOS_RESULT;
                } else {
                    String errMsg = "'" + ExecutableManager.this.argvAsString + "'" + " failed with exit status: " + ExecutableManager.this.exitCode;
                    LOG.error((Object)errMsg);
                    res.result = errMsg;
                    res.returnStatus = (byte)2;
                }
            }
            ExecutableManager.this.sendOutput(this.binaryOutputQueue, res);
        }
    }

    /**
     * Daemon thread that takes results from the operator's binary input queue and
     * writes the tuples they carry to the streaming process through an
     * {@link InputHandler}.
     */
    class ProcessInputThread
    extends Thread {
        // Handler the tuples are written to.
        InputHandler inputHandler;
        // Operator whose monitor is notified after each item is taken from the queue.
        private POStream poStream;
        // Context to install on this thread for asynchronous input; may be null.
        private UDFContext udfContext;
        // Queue obtained from poStream.getBinaryInputQueue() at construction time.
        private BlockingQueue<Result> binaryInputQueue;

        /**
         * Creates the thread as a daemon and captures the operator's binary input queue.
         *
         * @param inputHandler handler used to write tuples
         * @param poStream     operator supplying the input queue
         * @param udfContext   context to install in {@link #run()} for asynchronous
         *                     input, or {@code null}
         */
        ProcessInputThread(InputHandler inputHandler, POStream poStream, UDFContext udfContext) {
            this.setDaemon(true);
            this.inputHandler = inputHandler;
            this.poStream = poStream;
            this.udfContext = udfContext;
            this.binaryInputQueue = poStream.getBinaryInputQueue();
        }

        /**
         * Consumes the input queue until a result with return status 3 arrives or a
         * failure occurs.
         *
         * <p>Results with status 0 are cast to {@link Tuple} and written to the input
         * handler; results with any other status (except 3) are skipped. Any throwable
         * escaping the loop is stored in {@code outerrThreadsError}, logged, and the
         * process is killed.
         */
        @Override
        public void run() {
            // For asynchronous input, install the context supplied at construction time.
            if (this.inputHandler.getInputType() == InputHandler.InputType.ASYNCHRONOUS && this.udfContext != null) {
                UDFContext.setUdfContext(this.udfContext);
            }
            try {
                while (true) {
                    Result inp = null;
                    // Blocks until the operator supplies the next item.
                    inp = this.binaryInputQueue.take();
                    POStream pOStream = this.poStream;
                    synchronized (pOStream) {
                        // Wake threads waiting on the operator's monitor.
                        if (inp != null) {
                            this.poStream.notifyAll();
                        }
                    }
                    // Status 3 ends the input: close the manager and stop this thread.
                    // NOTE(review): presumably the end-of-input marker; confirm against
                    // the project's status constants.
                    if (inp != null && inp.returnStatus == 3) {
                        ExecutableManager.this.close();
                        return;
                    }
                    // Only status-0 results carry a tuple to forward.
                    if (inp == null || inp.returnStatus != 0) continue;
                    // Stop feeding input once an output/error thread has failed.
                    if (ExecutableManager.this.outerrThreadsError != null) {
                        throw new IOException("Output/Error thread failed with: " + ExecutableManager.this.outerrThreadsError);
                    }
                    Tuple t = null;
                    try {
                        t = (Tuple)inp.result;
                        this.inputHandler.putNext(t);
                    }
                    catch (IOException e) {
                        // Synchronous input: log a warning, close the manager and stop.
                        if (this.inputHandler.getInputType() == InputHandler.InputType.SYNCHRONOUS) {
                            LOG.warn((Object)"Exception while trying to write to stream binary's input", (Throwable)e);
                            ExecutableManager.this.close();
                            return;
                        }
                        // Otherwise queue a status-2 result on the output queue and
                        // rethrow so the outer handler kills the process.
                        LOG.error((Object)"Exception while trying to write to stream binary's input", (Throwable)e);
                        Result res = new Result(2, "Exception while trying to write to stream binary's input" + e.getMessage());
                        ExecutableManager.this.sendOutput(this.poStream.getBinaryOutputQueue(), res);
                        throw e;
                    }
                    // Update the manager's input counters.
                    ExecutableManager.this.inputBytes += t.getMemorySize();
                    ++ExecutableManager.this.inputRecords;
                }
            }
            catch (Throwable t) {
                // Record the failure where other threads can see it, then kill the process.
                ExecutableManager.this.outerrThreadsError = t;
                LOG.error((Object)"Error while reading from POStream and passing it to the streaming process", t);
                try {
                    ExecutableManager.this.killProcess(ExecutableManager.this.process);
                }
                catch (IOException ioe) {
                    LOG.warn((Object)ioe);
                }
                return;
            }
        }
    }
}

