package org.apache.spark.api.python;

import java.io.DataInputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import org.apache.spark.Logging;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Option$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;
import scala.sys.package$;

/* compiled from: PythonWorkerFactory.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005}a!B\u0001\u0003\u0001\u0019a!a\u0005)zi\"|gnV8sW\u0016\u0014h)Y2u_JL(BA\u0002\u0005\u0003\u0019\u0001\u0018\u0010\u001e5p]*\u0011QAB\u0001\u0004CBL'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001i1\u0003\u0005\u0002\u000f#5\tqBC\u0001\u0011\u0003\u0015\u00198-\u00197b\u0013\t\u0011rB\u0001\u0004B]f\u0014VM\u001a\t\u0003)Ui\u0011AB\u0005\u0003-\u0019\u0011q\u0001T8hO&tw\r\u0003\u0005\u0019\u0001\t\u0005\t\u0015!\u0003\u001b\u0003)\u0001\u0018\u0010\u001e5p]\u0016CXmY\u0002\u0001!\tYbD\u0004\u0002\u000f9%\u0011QdD\u0001\u0007!J,G-\u001a4\n\u0005}\u0001#AB*ue&twM\u0003\u0002\u001e\u001f!A!\u0005\u0001B\u0001B\u0003%1%A\u0004f]Z4\u0016M]:\u0011\tm!#DG\u0005\u0003K\u0001\u00121!T1q\u0011\u00159\u0003\u0001\"\u0001)\u0003\u0019a\u0014N\\5u}Q\u0019\u0011f\u000b\u0017\u0011\u0005)\u0002Q\"\u0001\u0002\t\u000ba1\u0003\u0019\u0001\u000e\t\u000b\t2\u0003\u0019A\u0012\t\u000f9\u0002!\u0019!C\u0001_\u0005IQo]3EC\u0016lwN\\\u000b\u0002aA\u0011a\"M\u0005\u0003e=\u0011qAQ8pY\u0016\fg\u000e\u0003\u00045\u0001\u0001\u0006I\u0001M\u0001\u000bkN,G)Y3n_:\u0004\u0003b\u0002\u001c\u0001\u0001\u0004%\taN\u0001\u0007I\u0006,Wn\u001c8\u0016\u0003a\u0002\"!\u000f \u000e\u0003iR!a\u000f\u001f\u0002\t1\fgn\u001a\u0006\u0002{\u0005!!.\u0019<b\u0013\ty$HA\u0004Qe>\u001cWm]:\t\u000f\u0005\u0003\u0001\u0019!C\u0001\u0005\u0006QA-Y3n_:|F%Z9\u0015\u0005\r3\u0005C\u0001\bE\u0013\t)uB\u0001\u0003V]&$\bbB$A\u0003\u0003\u0005\r\u0001O\u0001\u0004q\u0012\n\u0004BB%\u0001A\u0003&\u0001(A\u0004eC\u0016lwN\u001c\u0011\t\u000f-\u0003!\u0019!C\u0001\u0019\u0006QA-Y3n_:Dun\u001d;\u0016\u00035\u0003\"AT)\u000e\u0003=S!\u0001\u0015\u001f\u0002\u00079,G/\u0003\u0002S\u001f\nY\u0011J\\3u\u0003\u0012$'/Z:t\u0011\u0019!\u0006\u0001)A\u0005\u001b\u0006YA-Y3n_:Dun\u001d;!\u0011\u001d1\u0006\u00011A\u0005\u0002]\u000b!\u0002Z1f[>t\u0007k\u001c:u+\u0005A\u0006C\u0001\bZ\u0013\tQvBA\u0002J]RDq\u0001\u0018\u0001A\u0002\u0013\u0005Q,\u0001\beC\u0016lwN\u001c)peR|F%Z9\u0015\u0005\rs\u0006bB$\\\u0003\u0003\u0005\r\u0001\u0017\u0005\u0007A\u0002\u0001\u000b\u0015\u0002-\u0002\u0017\u0011\fW-\\8o!>\u0014H\u000f\t\u0005\bE\u0002\u0011\r\u0011\"\u0001d\u0003)\u0001\u0018\u0010\u001e5p]B\u000bG\u000f[\u000b\u00025!1Q\r\u0001Q\u0001\ni\t1\u0002]=uQ>t\u0007+\u0019;iA!)q\r\u0001C\u0001Q\u000611M]3bi\u0016$\u0012!\u001b\t\u0003\u001d*L!a[(\u0003\rM{7m[3u\u0011\u0015i\u0007\u0001\"\u0003i\u0003M\u0019'/Z1uKRC'o\\;hQ\u0012\u000bW-\\8o\u0011\u0015y\u0007\u0001\"\u0003i\u0003I\u0019'/Z1uKNKW\u000e\u001d7f/>\u00148.\u001a:\t\u000bE\u0004A\u0011\u0002:\u0002\u0017M$\u0018M\u001d;EC\u0016lwN\u001c\u000b\u0002\u0007\")A\u000f\u0001C\u0005k\u00069\"/\u001a3je\u0016\u001cGo\u0015;sK\u0006l7\u000fV8Ti\u0012,'O\u001d\u000b\u0004\u0007Zt\b\"B<t\u0001\u0004A\u0018AB:uI>,H\u000f\u0005\u0002zy6\t!P\u0003\u0002|y\u0005\u0011\u0011n\\\u0005\u0003{j\u00141\"\u00138qkR\u001cFO]3b[\")qp\u001da\u0001q\u000611\u000f\u001e3feJDa!a\u0001\u0001\t\u0013\u0011\u0018AC:u_B$\u0015-Z7p]\"1\u0011q\u0001\u0001\u0005\u0002I\fAa\u001d;pa\u001e9\u00111\u0002\u0002\t\n\u00055\u0011a\u0005)zi\"|gnV8sW\u0016\u0014h)Y2u_JL\bc\u0001\u0016\u0002\u0010\u00191\u0011A\u0001E\u0005\u0003#\u00192!a\u0004\u000e\u0011\u001d9\u0013q\u0002C\u0001\u0003+!\"!!\u0004\t\u0013\u0005e\u0011q\u0002b\u0001\n\u00039\u0016a\u0006)S\u001f\u000e+5kU0X\u0003&#v\fV%N\u000b>+FkX'T\u0011!\ti\"a\u0004!\u0002\u0013A\u0016\u0001\u0007)S\u001f\u000e+5kU0X\u0003&#v\fV%N\u000b>+FkX'TA\u0001")
/* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory.class */
public class PythonWorkerFactory implements Logging {
    private final String pythonExec;
    private final Map<String, String> envVars;
    private final boolean useDaemon;
    private Process daemon;
    private final InetAddress daemonHost;
    private int daemonPort;
    private final String pythonPath;
    private transient Logger org$apache$spark$Logging$$log_;

    public static int PROCESS_WAIT_TIMEOUT_MS() {
        return PythonWorkerFactory$.MODULE$.PROCESS_WAIT_TIMEOUT_MS();
    }

    @Override // org.apache.spark.Logging
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    @Override // org.apache.spark.Logging
    @TraitSetter
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    @Override // org.apache.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    public boolean useDaemon() {
        return this.useDaemon;
    }

    public Process daemon() {
        return this.daemon;
    }

    public void daemon_$eq(Process process) {
        this.daemon = process;
    }

    public InetAddress daemonHost() {
        return this.daemonHost;
    }

    public int daemonPort() {
        return this.daemonPort;
    }

    public void daemonPort_$eq(int i) {
        this.daemonPort = i;
    }

    public String pythonPath() {
        return this.pythonPath;
    }

    public Socket create() {
        return useDaemon() ? createThroughDaemon() : createSimpleWorker();
    }

    private synchronized Socket createThroughDaemon() {
        startDaemon();
        try {
            return new Socket(daemonHost(), daemonPort());
        } catch (SocketException e) {
            logWarning(new PythonWorkerFactory$$anonfun$createThroughDaemon$1(this));
            stopDaemon();
            startDaemon();
            return new Socket(daemonHost(), daemonPort());
        }
    }

    private Socket createSimpleWorker() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte())));
            ProcessBuilder processBuilder = new ProcessBuilder((List<String>) JavaConversions$.MODULE$.seqAsJavaList((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.pythonExec, "-u", "-m", "pyspark.worker"}))));
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll(JavaConversions$.MODULE$.mapAsJavaMap(this.envVars));
            environment.put("PYTHONPATH", pythonPath());
            Process start = processBuilder.start();
            redirectStreamsToStderr(start.getInputStream(), start.getErrorStream());
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(start.getOutputStream());
            outputStreamWriter.write(new StringBuilder().append(serverSocket.getLocalPort()).append((Object) "\n").toString());
            outputStreamWriter.flush();
            serverSocket.setSoTimeout(10000);
            try {
                Socket accept = serverSocket.accept();
                if (serverSocket != null) {
                    serverSocket.close();
                }
                return accept;
            } catch (Exception e) {
                throw new SparkException("Python worker did not connect back in time", e);
            }
        } catch (Throwable th) {
            if (serverSocket != null) {
                serverSocket.close();
            }
            throw th;
        }
    }

    private synchronized void startDaemon() {
        if (daemon() == null) {
            try {
                ProcessBuilder processBuilder = new ProcessBuilder((List<String>) JavaConversions$.MODULE$.seqAsJavaList((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.pythonExec, "-u", "-m", "pyspark.daemon"}))));
                java.util.Map<String, String> environment = processBuilder.environment();
                environment.putAll(JavaConversions$.MODULE$.mapAsJavaMap(this.envVars));
                environment.put("PYTHONPATH", pythonPath());
                daemon_$eq(processBuilder.start());
                DataInputStream dataInputStream = new DataInputStream(daemon().getInputStream());
                daemonPort_$eq(dataInputStream.readInt());
                redirectStreamsToStderr(dataInputStream, daemon().getErrorStream());
            } catch (Exception e) {
                String str = (String) Option$.MODULE$.apply(daemon()).flatMap(new PythonWorkerFactory$$anonfun$3(this)).getOrElse(new PythonWorkerFactory$$anonfun$4(this));
                stopDaemon();
                if (str != null ? str.equals("") : "" == 0) {
                    throw e;
                }
                SparkException sparkException = new SparkException(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n              |Error from python worker:\n              |  ", "\n              |PYTHONPATH was:\n              |  ", "\n              |", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str.replace("\n", "\n  "), pythonPath(), e})))).stripMargin());
                sparkException.setStackTrace(e.getStackTrace());
                throw sparkException;
            }
        }
    }

    private void redirectStreamsToStderr(InputStream inputStream, InputStream inputStream2) {
        try {
            new RedirectThread(inputStream, System.err, new StringBuilder().append((Object) "stdout reader for ").append((Object) this.pythonExec).toString()).start();
            new RedirectThread(inputStream2, System.err, new StringBuilder().append((Object) "stderr reader for ").append((Object) this.pythonExec).toString()).start();
        } catch (Exception e) {
            logError(new PythonWorkerFactory$$anonfun$redirectStreamsToStderr$1(this), e);
        }
    }

    private synchronized void stopDaemon() {
        if (daemon() != null) {
            daemon().destroy();
        }
        daemon_$eq(null);
        daemonPort_$eq(0);
    }

    public void stop() {
        stopDaemon();
    }

    public PythonWorkerFactory(String str, Map<String, String> map) {
        this.pythonExec = str;
        this.envVars = map;
        org$apache$spark$Logging$$log__$eq(null);
        this.useDaemon = !System.getProperty("os.name").startsWith("Windows");
        this.daemon = null;
        this.daemonHost = InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte()));
        this.daemonPort = 0;
        this.pythonPath = PythonUtils$.MODULE$.mergePythonPaths(Predef$.MODULE$.wrapRefArray(new String[]{PythonUtils$.MODULE$.sparkPythonPath(), (String) map.getOrElse("PYTHONPATH", new PythonWorkerFactory$$anonfun$1(this)), (String) package$.MODULE$.env().getOrElse("PYTHONPATH", new PythonWorkerFactory$$anonfun$2(this))}));
    }
}
