package org.apache.hive.spark.client;

import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Resources;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import jodd.util.StringPool;
import jodd.util.SystemUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hive.spark.client.BaseProtocol;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl.class */
public class SparkClientImpl implements SparkClient {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
    private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000;
    private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
    private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
    private static final String SPARK_HOME_ENV = "SPARK_HOME";
    private static final String SPARK_HOME_KEY = "spark.home";
    private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
    private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
    private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
    private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
    private final Map<String, String> conf;
    private final HiveConf hiveConf;
    private final Thread driverThread;
    private final Map<String, JobHandleImpl<?>> jobs = Maps.newConcurrentMap();
    private final Rpc driverRpc;
    private final ClientProtocol protocol;
    private volatile boolean isAlive;

    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$AddFileJob.class */
    private static class AddFileJob implements Job<Serializable> {
        private static final long serialVersionUID = 1;
        private final String path;

        AddFileJob() {
            this(null);
        }

        AddFileJob(String str) {
            this.path = str;
        }

        @Override // org.apache.hive.spark.client.Job
        public Serializable call(JobContext jobContext) throws Exception {
            jobContext.sc().addFile(this.path);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$AddJarJob.class */
    private static class AddJarJob implements Job<Serializable> {
        private static final long serialVersionUID = 1;
        private final String path;

        AddJarJob() {
            this(null);
        }

        AddJarJob(String str) {
            this.path = str;
        }

        @Override // org.apache.hive.spark.client.Job
        public Serializable call(JobContext jobContext) throws Exception {
            jobContext.sc().addJar(this.path);
            jobContext.getAddedJars().put(this.path, Long.valueOf(System.currentTimeMillis()));
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$ClientProtocol.class */
    public class ClientProtocol extends BaseProtocol {
        private ClientProtocol() {
        }

        <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> list) {
            final String uuid = UUID.randomUUID().toString();
            final Promise createPromise = SparkClientImpl.this.driverRpc.createPromise();
            final JobHandleImpl<T> jobHandleImpl = new JobHandleImpl<>(SparkClientImpl.this, createPromise, uuid, list);
            SparkClientImpl.this.jobs.put(uuid, jobHandleImpl);
            final Future<Void> call = SparkClientImpl.this.driverRpc.call(new BaseProtocol.JobRequest(uuid, job));
            SparkClientImpl.LOG.debug("Send JobRequest[{}].", uuid);
            call.addListener(new GenericFutureListener<Future<Void>>() { // from class: org.apache.hive.spark.client.SparkClientImpl.ClientProtocol.1
                public void operationComplete(Future<Void> future) {
                    if (future.isSuccess()) {
                        jobHandleImpl.changeState(JobHandle.State.QUEUED);
                    } else {
                        if (createPromise.isDone()) {
                            return;
                        }
                        createPromise.setFailure(future.cause());
                    }
                }
            });
            createPromise.addListener(new GenericFutureListener<Promise<T>>() { // from class: org.apache.hive.spark.client.SparkClientImpl.ClientProtocol.2
                public void operationComplete(Promise<T> promise) {
                    if (uuid != null) {
                        SparkClientImpl.this.jobs.remove(uuid);
                    }
                    if (!promise.isCancelled() || call.isDone()) {
                        return;
                    }
                    call.cancel(true);
                }
            });
            return jobHandleImpl;
        }

        <T extends Serializable> java.util.concurrent.Future<T> run(Job<T> job) {
            return SparkClientImpl.this.driverRpc.call(new BaseProtocol.SyncJobRequest(job), Serializable.class);
        }

        void cancel(String str) {
            SparkClientImpl.this.driverRpc.call(new BaseProtocol.CancelJob(str));
        }

        java.util.concurrent.Future<?> endSession() {
            return SparkClientImpl.this.driverRpc.call(new BaseProtocol.EndSession());
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.Error error) {
            SparkClientImpl.LOG.warn("Error reported from remote driver.", error.cause);
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobMetrics jobMetrics) {
            JobHandleImpl jobHandleImpl = (JobHandleImpl) SparkClientImpl.this.jobs.get(jobMetrics.jobId);
            if (jobHandleImpl != null) {
                jobHandleImpl.getMetrics().addMetrics(jobMetrics.sparkJobId, jobMetrics.stageId, jobMetrics.taskId, jobMetrics.metrics);
            } else {
                SparkClientImpl.LOG.warn("Received metrics for unknown job {}", jobMetrics.jobId);
            }
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobResult jobResult) {
            JobHandleImpl jobHandleImpl = (JobHandleImpl) SparkClientImpl.this.jobs.remove(jobResult.id);
            if (jobHandleImpl == null) {
                SparkClientImpl.LOG.warn("Received result for unknown job {}", jobResult.id);
                return;
            }
            SparkClientImpl.LOG.info("Received result for {}", jobResult.id);
            jobHandleImpl.setSparkCounters(jobResult.sparkCounters);
            SparkException sparkException = jobResult.error != null ? new SparkException(jobResult.error) : null;
            if (sparkException == null) {
                jobHandleImpl.setSuccess(jobResult.result);
            } else {
                jobHandleImpl.setFailure(sparkException);
            }
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobStarted jobStarted) {
            JobHandleImpl jobHandleImpl = (JobHandleImpl) SparkClientImpl.this.jobs.get(jobStarted.id);
            if (jobHandleImpl != null) {
                jobHandleImpl.changeState(JobHandle.State.STARTED);
            } else {
                SparkClientImpl.LOG.warn("Received event for unknown job {}", jobStarted.id);
            }
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobSubmitted jobSubmitted) {
            JobHandleImpl jobHandleImpl = (JobHandleImpl) SparkClientImpl.this.jobs.get(jobSubmitted.clientJobId);
            if (jobHandleImpl == null) {
                SparkClientImpl.LOG.warn("Received spark job ID: {} for unknown job {}", Integer.valueOf(jobSubmitted.sparkJobId), jobSubmitted.clientJobId);
            } else {
                SparkClientImpl.LOG.info("Received spark job ID: {} for {}", Integer.valueOf(jobSubmitted.sparkJobId), jobSubmitted.clientJobId);
                jobHandleImpl.addSparkJobId(jobSubmitted.sparkJobId);
            }
        }
    }

    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$GetDefaultParallelismJob.class */
    private static class GetDefaultParallelismJob implements Job<Integer> {
        private static final long serialVersionUID = 1;

        private GetDefaultParallelismJob() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hive.spark.client.Job
        public Integer call(JobContext jobContext) throws Exception {
            return Integer.valueOf(jobContext.sc().sc().defaultParallelism());
        }
    }

    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$GetExecutorCountJob.class */
    private static class GetExecutorCountJob implements Job<Integer> {
        private static final long serialVersionUID = 1;

        private GetExecutorCountJob() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hive.spark.client.Job
        public Integer call(JobContext jobContext) throws Exception {
            return Integer.valueOf(jobContext.sc().sc().getExecutorMemoryStatus().size() - 1);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/spark/client/SparkClientImpl$Redirector.class */
    public class Redirector implements Runnable {
        private final BufferedReader in;
        private List<String> errLogs;
        private int numErrLogLines = 0;

        Redirector(InputStream inputStream) {
            this.in = new BufferedReader(new InputStreamReader(inputStream));
        }

        Redirector(InputStream inputStream, List<String> list) {
            this.in = new BufferedReader(new InputStreamReader(inputStream));
            this.errLogs = list;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    String readLine = this.in.readLine();
                    if (readLine == null) {
                        return;
                    }
                    SparkClientImpl.LOG.info(readLine);
                    if (this.errLogs != null) {
                        int i = this.numErrLogLines;
                        this.numErrLogLines = i + 1;
                        if (i < 1000) {
                            this.errLogs.add(readLine);
                        }
                    }
                } catch (Exception e) {
                    SparkClientImpl.LOG.warn("Error in redirector thread.", e);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkClientImpl(RpcServer rpcServer, Map<String, String> map, HiveConf hiveConf) throws IOException, SparkException {
        this.conf = map;
        this.hiveConf = hiveConf;
        String uuid = UUID.randomUUID().toString();
        String createSecret = rpcServer.createSecret();
        this.driverThread = startDriver(rpcServer, uuid, createSecret);
        this.protocol = new ClientProtocol();
        try {
            this.driverRpc = (Rpc) rpcServer.registerClient(uuid, createSecret, this.protocol).get();
            this.driverRpc.addListener(new Rpc.Listener() { // from class: org.apache.hive.spark.client.SparkClientImpl.1
                @Override // org.apache.hive.spark.client.rpc.Rpc.Listener
                public void rpcClosed(Rpc rpc) {
                    if (SparkClientImpl.this.isAlive) {
                        SparkClientImpl.LOG.warn("Client RPC channel closed unexpectedly.");
                        SparkClientImpl.this.isAlive = false;
                    }
                }
            });
            this.isAlive = true;
        } catch (Throwable th) {
            if (th.getCause() instanceof TimeoutException) {
                LOG.error("Timed out waiting for client to connect.\nPossible reasons include network issues, errors in remote driver or the cluster has no available resources, etc.\nPlease check YARN or Spark driver's logs for further information.", th);
            } else {
                LOG.error("Error while waiting for client to connect.", th);
            }
            this.driverThread.interrupt();
            try {
                this.driverThread.join();
            } catch (InterruptedException e) {
                LOG.debug("Interrupted before driver thread was finished.");
            }
            throw Throwables.propagate(th);
        }
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
        return this.protocol.submit(job, Collections.emptyList());
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> list) {
        return this.protocol.submit(job, list);
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public <T extends Serializable> java.util.concurrent.Future<T> run(Job<T> job) {
        return this.protocol.run(job);
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public void stop() {
        if (this.isAlive) {
            this.isAlive = false;
            try {
                try {
                    this.protocol.endSession();
                    this.driverRpc.close();
                } catch (Exception e) {
                    LOG.warn("Exception while waiting for end session reply.", e);
                    this.driverRpc.close();
                }
            } catch (Throwable th) {
                this.driverRpc.close();
                throw th;
            }
        }
        long currentTimeMillis = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT;
        try {
            this.driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT);
        } catch (InterruptedException e2) {
            LOG.debug("Interrupted before driver thread was finished.");
        }
        if (currentTimeMillis - System.currentTimeMillis() <= 0) {
            LOG.warn("Timed out shutting down remote driver, interrupting...");
            this.driverThread.interrupt();
        }
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public java.util.concurrent.Future<?> addJar(URI uri) {
        return run(new AddJarJob(uri.toString()));
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public java.util.concurrent.Future<?> addFile(URI uri) {
        return run(new AddFileJob(uri.toString()));
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public java.util.concurrent.Future<Integer> getExecutorCount() {
        return run(new GetExecutorCountJob());
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public java.util.concurrent.Future<Integer> getDefaultParallelism() {
        return run(new GetDefaultParallelismJob());
    }

    @Override // org.apache.hive.spark.client.SparkClient
    public boolean isActive() {
        return this.isAlive && this.driverRpc.isActive();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancel(String str) {
        this.protocol.cancel(str);
    }

    private Thread startDriver(final RpcServer rpcServer, final String str, final String str2) throws IOException {
        Runnable runnable;
        final String address = rpcServer.getAddress();
        final String valueOf = String.valueOf(rpcServer.getPort());
        if (this.conf.containsKey("spark.client.do_not_use.run_driver_in_process")) {
            LOG.warn("!!!! Running remote driver in-process. !!!!");
            runnable = new Runnable() { // from class: org.apache.hive.spark.client.SparkClientImpl.2
                @Override // java.lang.Runnable
                public void run() {
                    ArrayList newArrayList = Lists.newArrayList();
                    newArrayList.add("--remote-host");
                    newArrayList.add(address);
                    newArrayList.add("--remote-port");
                    newArrayList.add(valueOf);
                    newArrayList.add("--client-id");
                    newArrayList.add(str);
                    newArrayList.add("--secret");
                    newArrayList.add(str2);
                    for (Map.Entry entry : SparkClientImpl.this.conf.entrySet()) {
                        newArrayList.add("--conf");
                        newArrayList.add(String.format("%s=%s", entry.getKey(), SparkClientImpl.this.conf.get(entry.getKey())));
                    }
                    try {
                        RemoteDriver.main((String[]) newArrayList.toArray(new String[newArrayList.size()]));
                    } catch (Exception e) {
                        SparkClientImpl.LOG.error("Error running driver.", e);
                    }
                }
            };
        } else {
            String str3 = this.conf.get(SPARK_HOME_KEY);
            if (str3 == null) {
                str3 = System.getenv(SPARK_HOME_ENV);
            }
            if (str3 == null) {
                str3 = System.getProperty(SPARK_HOME_KEY);
            }
            String str4 = this.conf.get("hive.spark.log.dir");
            if (str4 == null) {
                str4 = str3 == null ? "./target/" : str3 + "/logs/";
            }
            String nullToEmpty = Strings.nullToEmpty(System.getProperty(SystemUtil.OS_NAME)).toLowerCase().contains("mac") ? Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)) : "";
            String join = Joiner.on(" ").skipNulls().join("-Dhive.spark.log.dir=" + str4, nullToEmpty, this.conf.get(DRIVER_OPTS_KEY));
            String join2 = Joiner.on(" ").skipNulls().join("-Dhive.spark.log.dir=" + str4, nullToEmpty, this.conf.get(EXECUTOR_OPTS_KEY));
            File createTempFile = File.createTempFile("spark-submit.", ".properties");
            if (!createTempFile.setReadable(false) || !createTempFile.setReadable(true, true)) {
                throw new IOException("Cannot change permissions of job properties file.");
            }
            createTempFile.deleteOnExit();
            Properties properties = new Properties();
            try {
                URL resource = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
                if (resource != null) {
                    LOG.info("Loading spark defaults: " + resource);
                    properties.load(new ByteArrayInputStream(Resources.toByteArray(resource)));
                }
                for (Map.Entry<String, String> entry : this.conf.entrySet()) {
                    properties.put(entry.getKey(), this.conf.get(entry.getKey()));
                }
                properties.put("spark.client.authentication.client_id", str);
                properties.put("spark.client.authentication.secret", str2);
                properties.put(DRIVER_OPTS_KEY, join);
                properties.put(EXECUTOR_OPTS_KEY, join2);
                String str5 = this.conf.get("spark.testing");
                if (str5 != null && str5.equalsIgnoreCase("true")) {
                    String nullToEmpty2 = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
                    if (!nullToEmpty2.isEmpty()) {
                        String nullToEmpty3 = Strings.nullToEmpty((String) properties.get(DRIVER_EXTRA_CLASSPATH));
                        if (nullToEmpty3.isEmpty()) {
                            properties.put(DRIVER_EXTRA_CLASSPATH, nullToEmpty2);
                        } else {
                            properties.put(DRIVER_EXTRA_CLASSPATH, (nullToEmpty3.endsWith(File.pathSeparator) ? nullToEmpty3 : nullToEmpty3 + File.pathSeparator) + nullToEmpty2);
                        }
                        String nullToEmpty4 = Strings.nullToEmpty((String) properties.get(EXECUTOR_EXTRA_CLASSPATH));
                        if (nullToEmpty4.isEmpty()) {
                            properties.put(EXECUTOR_EXTRA_CLASSPATH, nullToEmpty2);
                        } else {
                            properties.put(EXECUTOR_EXTRA_CLASSPATH, (nullToEmpty4.endsWith(File.pathSeparator) ? nullToEmpty4 : nullToEmpty4 + File.pathSeparator) + nullToEmpty2);
                        }
                    }
                }
                OutputStreamWriter outputStreamWriter = new OutputStreamWriter(new FileOutputStream(createTempFile), Charsets.UTF_8);
                try {
                    properties.store(outputStreamWriter, "Spark Context configuration");
                    outputStreamWriter.close();
                    String str6 = this.conf.get("spark.master");
                    Preconditions.checkArgument(str6 != null, "spark.master is not defined.");
                    LinkedList newLinkedList = Lists.newLinkedList();
                    if (str3 != null) {
                        newLinkedList.add(new File(str3, "bin/spark-submit").getAbsolutePath());
                    } else {
                        LOG.info("No spark.home provided, calling SparkSubmit directly.");
                        newLinkedList.add(new File(System.getProperty(SystemUtil.JAVA_HOME), "bin/java").getAbsolutePath());
                        if (str6.startsWith("local") || str6.startsWith("mesos") || str6.endsWith("-client") || str6.startsWith("spark")) {
                            String str7 = this.conf.get("spark.driver.memory");
                            if (str7 != null) {
                                newLinkedList.add("-Xms" + str7);
                                newLinkedList.add("-Xmx" + str7);
                            }
                            String str8 = this.conf.get(DRIVER_EXTRA_CLASSPATH);
                            if (str8 != null) {
                                newLinkedList.add("-classpath");
                                newLinkedList.add(str8);
                            }
                            String str9 = this.conf.get("spark.driver.extraLibPath");
                            if (str9 != null) {
                                newLinkedList.add("-Djava.library.path=" + str9);
                            }
                            String str10 = this.conf.get(DRIVER_OPTS_KEY);
                            if (str10 != null) {
                                for (String str11 : str10.split("[ ]")) {
                                    if (!str11.trim().isEmpty()) {
                                        newLinkedList.add(str11.trim());
                                    }
                                }
                            }
                        }
                        newLinkedList.add("org.apache.spark.deploy.SparkSubmit");
                    }
                    if (str6.equals("yarn-cluster")) {
                        String str12 = this.conf.get("spark.executor.cores");
                        if (str12 != null) {
                            newLinkedList.add("--executor-cores");
                            newLinkedList.add(str12);
                        }
                        String str13 = this.conf.get("spark.executor.memory");
                        if (str13 != null) {
                            newLinkedList.add("--executor-memory");
                            newLinkedList.add(str13);
                        }
                        String str14 = this.conf.get("spark.executor.instances");
                        if (str14 != null) {
                            newLinkedList.add("--num-executors");
                            newLinkedList.add(str14);
                        }
                    }
                    if ("kerberos".equals(this.hiveConf.get("hadoop.security.authentication"))) {
                        String serverPrincipal = SecurityUtil.getServerPrincipal(this.hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), "0.0.0.0");
                        String var = this.hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
                        if (StringUtils.isNotBlank(serverPrincipal) && StringUtils.isNotBlank(var)) {
                            if (this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
                                LinkedList newLinkedList2 = Lists.newLinkedList();
                                newLinkedList2.add("kinit");
                                newLinkedList2.add(serverPrincipal);
                                newLinkedList2.add("-k");
                                newLinkedList2.add("-t");
                                newLinkedList2.add(var + StringPool.SEMICOLON);
                                newLinkedList2.addAll(newLinkedList);
                                newLinkedList = newLinkedList2;
                            } else {
                                newLinkedList.add("--principal");
                                newLinkedList.add(serverPrincipal);
                                newLinkedList.add("--keytab");
                                newLinkedList.add(var);
                            }
                        }
                    }
                    if (this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
                        try {
                            String shortUserName = Utils.getUGI().getShortUserName();
                            if (!shortUserName.equals(System.getProperty(SystemUtil.USER_NAME))) {
                                LOG.info("Attempting impersonation of " + shortUserName);
                                newLinkedList.add("--proxy-user");
                                newLinkedList.add(shortUserName);
                            }
                        } catch (Exception e) {
                            throw new IllegalStateException("Cannot obtain username: " + e, e);
                        }
                    }
                    newLinkedList.add("--properties-file");
                    newLinkedList.add(createTempFile.getAbsolutePath());
                    newLinkedList.add("--class");
                    newLinkedList.add(RemoteDriver.class.getName());
                    newLinkedList.add(SparkContext.jarOfClass(getClass()).isDefined() ? (String) SparkContext.jarOfClass(getClass()).get() : "spark-internal");
                    newLinkedList.add("--remote-host");
                    newLinkedList.add(address);
                    newLinkedList.add("--remote-port");
                    newLinkedList.add(valueOf);
                    Iterator it = RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.iterator();
                    while (it.hasNext()) {
                        String str15 = (String) it.next();
                        String value = RpcConfiguration.getValue(this.hiveConf, str15);
                        newLinkedList.add("--conf");
                        newLinkedList.add(String.format("%s=%s", str15, value));
                    }
                    String join3 = Joiner.on(" ").join((Iterable<?>) newLinkedList);
                    LOG.info("Running client driver with argv: {}", join3);
                    ProcessBuilder processBuilder = new ProcessBuilder("sh", "-c", join3);
                    processBuilder.environment().remove("HIVE_HOME");
                    processBuilder.environment().remove("HIVE_CONF_DIR");
                    String sparkJobCredentialProviderPassword = getSparkJobCredentialProviderPassword();
                    if (sparkJobCredentialProviderPassword != null) {
                        processBuilder.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, sparkJobCredentialProviderPassword);
                    }
                    if (str5 != null) {
                        processBuilder.environment().put("SPARK_TESTING", str5);
                    }
                    final Process start = processBuilder.start();
                    String name = Thread.currentThread().getName();
                    final List synchronizedList = Collections.synchronizedList(new ArrayList());
                    redirect("RemoteDriver-stdout-redir-" + name, new Redirector(start.getInputStream()));
                    redirect("RemoteDriver-stderr-redir-" + name, new Redirector(start.getErrorStream(), synchronizedList));
                    runnable = new Runnable() { // from class: org.apache.hive.spark.client.SparkClientImpl.3
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                int waitFor = start.waitFor();
                                if (waitFor != 0) {
                                    StringBuilder sb = new StringBuilder();
                                    synchronized (synchronizedList) {
                                        Iterator it2 = synchronizedList.iterator();
                                        while (it2.hasNext()) {
                                            sb.append(it2.next());
                                            sb.append('\n');
                                        }
                                    }
                                    SparkClientImpl.LOG.warn("Child process exited with code {}", Integer.valueOf(waitFor));
                                    rpcServer.cancelClient(str, "Child process (spark-submit) exited before connecting back with error log " + sb.toString());
                                }
                            } catch (InterruptedException e2) {
                                SparkClientImpl.LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
                                rpcServer.cancelClient(str, "Thread waiting on the child porcess (spark-submit) is interrupted");
                                Thread.interrupted();
                                start.destroy();
                            } catch (Exception e3) {
                                SparkClientImpl.LOG.warn("Exception while waiting for child process (spark-submit)", e3);
                                rpcServer.cancelClient(str, "Exception while waiting for child process (spark-submit)");
                            }
                        }
                    };
                } catch (Throwable th) {
                    outputStreamWriter.close();
                    throw th;
                }
            } catch (Exception e2) {
                throw new IOException("Exception trying to load spark-defaults.conf: " + e2, e2);
            }
        }
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        thread.setName("Driver");
        thread.start();
        return thread;
    }

    private String getSparkJobCredentialProviderPassword() {
        if (this.conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
            return this.conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
        }
        if (this.conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
            return this.conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
        }
        return null;
    }

    private void redirect(String str, Redirector redirector) {
        Thread thread = new Thread(redirector);
        thread.setName(str);
        thread.setDaemon(true);
        thread.start();
    }
}
