/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.launcher.livy;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.bigdata.http.HttpException;
import org.talend.bigdata.http.HttpRequestInterceptor;
import org.talend.bigdata.http.HttpResponse;
import org.talend.bigdata.http.client.config.RequestConfig;
import org.talend.bigdata.http.impl.client.CloseableHttpClient;
import org.talend.bigdata.http.impl.client.HttpClientBuilder;
import org.talend.bigdata.http.message.BasicHeader;
import org.talend.bigdata.jackson.core.JsonProcessingException;
import org.talend.bigdata.jackson.databind.JsonNode;
import org.talend.bigdata.jackson.databind.ObjectMapper;
import org.talend.bigdata.jackson.databind.node.ArrayNode;
import org.talend.bigdata.jackson.databind.node.ObjectNode;
import org.talend.bigdata.launcher.EndpointDescriptor;
import org.talend.bigdata.launcher.Job;
import org.talend.bigdata.launcher.fs.FileSystem;
import org.talend.bigdata.launcher.livy.SparkJobRequest;
import org.talend.bigdata.launcher.utils.Utils;

public class SparkJob
extends Job {
    protected static final String SUCCESS = "success";
    protected static final String ERROR = "error";
    protected static final String DEAD = "dead";
    protected static final String CANCELLED = "cancelled";
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkJob.class);
    protected FileSystem mFileSystem;
    protected String mToken;
    protected String mRemoteFolder;
    protected String mLivyEndpoint;
    protected String userAgent;
    protected CloseableHttpClient httpClient;
    protected String mLibjars;
    protected String mSparkPoolsName;
    protected Map<String, String> mConf;
    protected String mAppName;
    protected String mJarToExecute;
    protected String mClassToExecute;
    protected String mDriverMemory;
    protected String mExecutorMemory;
    protected Integer mDriverCore;
    protected Integer mExecutorCore;
    protected Integer mNumOfExecutors;
    protected List<String> mArgs;
    protected int returnCode;
    protected StringBuilder log;
    protected Integer pollingIntervalDuration;
    protected Integer maxMissingStatuses;
    protected Long jobId;

    private SparkJob(FileSystem fs, String token, String userAgent, String remoteFolder, String livyEndpoint, String libJars, String sparkPoolsName, String classToExecute, String jarToExecute, Map<String, String> conf, String appName, String driverMemory, String executorMemory, Integer driverCore, Integer executorCore, List<String> args, Integer pollingIntervalDuration, Integer maxMissingStatuses) {
        this(fs, token, userAgent, remoteFolder, livyEndpoint, libJars, sparkPoolsName, classToExecute, jarToExecute, conf, appName, driverMemory, executorMemory, driverCore, executorCore, args, pollingIntervalDuration, maxMissingStatuses, 15000, 15000);
    }

    private SparkJob(FileSystem fs, String token, String userAgent, String remoteFolder, String livyEndpoint, String libJars, String sparkPoolsName, String classToExecute, String jarToExecute, Map<String, String> conf, String appName, String driverMemory, String executorMemory, Integer driverCore, Integer executorCore, List<String> args, Integer pollingIntervalDuration, Integer maxMissingStatuses, int connectionTimeout, int socketTimeout) {
        this.mClassToExecute = classToExecute;
        this.userAgent = userAgent;
        this.mFileSystem = fs;
        this.mToken = token;
        this.mJarToExecute = jarToExecute;
        this.mLibjars = libJars;
        this.mLivyEndpoint = livyEndpoint;
        this.mRemoteFolder = remoteFolder;
        this.mSparkPoolsName = sparkPoolsName;
        this.mConf = conf;
        this.mAppName = appName;
        this.mDriverCore = driverCore;
        this.mDriverMemory = driverMemory;
        this.mExecutorCore = executorCore;
        this.mExecutorMemory = executorMemory;
        this.mArgs = args;
        this.pollingIntervalDuration = pollingIntervalDuration;
        this.maxMissingStatuses = maxMissingStatuses;
        this.returnCode = -1;
        this.httpClient = this.createHttpClient(connectionTimeout, socketTimeout);
    }

    private CloseableHttpClient createHttpClient(int connectionTimeout, int socketTimeout) {
        HttpRequestInterceptor logInterceptor = (request, context) -> {
            LOGGER.info("###");
            LOGGER.info(request.getRequestLine().toString());
            LOGGER.debug("");
            Arrays.stream(request.getAllHeaders()).map(Object::toString).filter(s -> !s.contains("Authorization")).forEach(arg_0 -> ((Logger)LOGGER).debug(arg_0));
            LOGGER.debug("");
            LOGGER.debug("###");
        };
        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectionTimeout).setSocketTimeout(socketTimeout).build();
        return HttpClientBuilder.create().addInterceptorLast(logInterceptor).disableContentCompression().disableAutomaticRetries().setUserAgent(this.userAgent).setDefaultHeaders(Arrays.asList(new BasicHeader("Accept", "*/*"), new BasicHeader("Cache-Control", "no-cache"), new BasicHeader("Pragma", "no-cache"), new BasicHeader("Content-Type", "application/json"))).setDefaultRequestConfig(requestConfig).build();
    }

    private int getReturnCode(String state) {
        if (SUCCESS.equals(state)) {
            return 0;
        }
        return 1;
    }

    private boolean isJobDone(String state) {
        return SUCCESS.equals(state) || CANCELLED.equals(state) || ERROR.equals(state) || DEAD.equals(state);
    }

    private void checkResponse(EndpointDescriptor request, HttpResponse response) throws HttpException {
        this.checkResponse(request, response, 200);
    }

    private void checkResponse(EndpointDescriptor request, HttpResponse response, int httpCode) throws HttpException {
        if (response.getStatusLine().getStatusCode() != httpCode) {
            throw new HttpException("Error when calling " + request.getFullEndpoint() + ": " + response.getStatusLine());
        }
    }

    private void checkResponse(EndpointDescriptor request, HttpResponse response, List<Integer> httpCodes) throws HttpException {
        if (httpCodes == null || !httpCodes.contains(response.getStatusLine().getStatusCode())) {
            throw new HttpException("Error when calling " + request.getFullEndpoint() + ": " + response.getStatusLine());
        }
    }

    protected String createExecuteBody(ObjectMapper mapper, String remoteLibJars) throws JsonProcessingException {
        ObjectNode jobj = mapper.createObjectNode();
        if (this.mFileSystem != null && this.mJarToExecute != null) {
            StringBuilder buffer = new StringBuilder();
            buffer.append(this.mFileSystem.getFileSystemPrefix());
            buffer.append("/");
            buffer.append(this.mRemoteFolder);
            buffer.append("/jar/");
            buffer.append(new File(this.mJarToExecute).getName());
            String path = buffer.toString();
            jobj.put("file", path);
        }
        jobj.put("className", this.mClassToExecute);
        ArrayNode args = mapper.createArrayNode();
        args.add("-calledByLivy");
        if (this.mArgs != null && this.mArgs.size() > 0) {
            for (String string : this.mArgs) {
                args.add(string);
            }
        }
        jobj.set("args", args);
        ObjectNode conf = mapper.createObjectNode();
        if (this.mConf != null) {
            for (Map.Entry<String, String> entry : this.mConf.entrySet()) {
                conf.put(entry.getKey(), entry.getValue());
            }
        }
        jobj.set("conf", conf);
        if (remoteLibJars != null) {
            ArrayNode arrayNode = mapper.createArrayNode();
            for (String jar : remoteLibJars.split(",")) {
                if (jar.contains("log4j-1.2-api")) continue;
                arrayNode.add(jar);
            }
            jobj.set("jars", arrayNode);
        }
        jobj.put("name", this.mAppName);
        if (this.mDriverCore != null) {
            jobj.put("driverCores", this.mDriverCore);
        }
        if (this.mExecutorCore != null) {
            jobj.put("executorCores", this.mExecutorCore);
        }
        if (this.mDriverMemory != null) {
            jobj.put("driverMemory", this.mDriverMemory);
        }
        if (this.mExecutorMemory != null) {
            jobj.put("executorMemory", this.mExecutorMemory);
        }
        jobj.put("NumExecutors", "3");
        return mapper.writeValueAsString(jobj);
    }

    private Long start(String remoteLibJars) throws HttpException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        String body = this.createExecuteBody(mapper, remoteLibJars);
        LOGGER.info("REQUEST BODY: " + body);
        SparkJobRequest request = SparkJobRequest.run(this.mLivyEndpoint, this.mSparkPoolsName, this.mToken, body).withHttpClient(this.httpClient);
        LOGGER.debug("Send request : " + request.toString());
        HttpResponse response = request.execute();
        this.checkResponse((EndpointDescriptor)request, response, 200);
        JsonNode result = mapper.readTree(response.getEntity().getContent());
        return result.get("id").asLong();
    }

    private void cancel(Long jobId) throws HttpException, IOException {
        HttpResponse response;
        ObjectMapper mapper = new ObjectMapper();
        SparkJobRequest request = SparkJobRequest.cancel(this.mLivyEndpoint, this.mSparkPoolsName, jobId, this.mToken).withHttpClient(this.httpClient);
        LOGGER.debug("Cancel request : " + request.toString());
        long authorizedMissingCalls = this.maxMissingStatuses.intValue();
        do {
            try {
                response = request.execute();
                this.checkResponse((EndpointDescriptor)request, response, Arrays.asList(200, 201, 202));
            }
            catch (IOException e) {
                --authorizedMissingCalls;
                throw new HttpException("Error when calling " + request.getFullEndpoint(), e);
            }
        } while (response == null && authorizedMissingCalls > 0L);
    }

    public String sendFiles() throws IOException, InvalidKeyException, URISyntaxException {
        return Utils.sendFiles(this.mRemoteFolder, null, this.mFileSystem, this.mJarToExecute, this.mLibjars);
    }

    private int wait(Long jobId) throws InterruptedException, HttpException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        SparkJobRequest request = SparkJobRequest.get(this.mLivyEndpoint, this.mSparkPoolsName, jobId, this.mToken).withHttpClient(this.httpClient);
        LOGGER.debug("Waiting for response from : " + request.toString());
        String state = ERROR;
        long authorizedMissingCalls = this.maxMissingStatuses.intValue();
        do {
            Thread.sleep(this.pollingIntervalDuration.intValue());
            try {
                HttpResponse response = request.execute();
                this.checkResponse(request, response);
                state = mapper.readTree(response.getEntity().getContent()).get("state").asText();
                authorizedMissingCalls = this.maxMissingStatuses.intValue();
            }
            catch (IOException e) {
                if (--authorizedMissingCalls == 0L) {
                    throw new IOException("Failed to get status of job. Error when calling " + request.getFullEndpoint(), e);
                }
                LOGGER.warn("Failed to get status of job. retrying.");
            }
        } while (!this.isJobDone(state) && authorizedMissingCalls > 0L);
        return this.getReturnCode(state);
    }

    private StringBuilder generateLog(Long jobId) throws HttpException {
        ObjectMapper mapper = new ObjectMapper();
        SparkJobRequest request = SparkJobRequest.get(this.mLivyEndpoint, this.mSparkPoolsName, jobId, this.mToken).withHttpClient(this.httpClient);
        try {
            HttpResponse response = request.execute();
            this.checkResponse(request, response);
            JsonNode json = mapper.readTree(response.getEntity().getContent());
            this.log = new StringBuilder(json.get("log").asText(""));
            return this.log;
        }
        catch (IOException e) {
            throw new HttpException("Error when calling " + request.getFullEndpoint(), e);
        }
    }

    @Override
    public int executeJob() throws Exception {
        String remoteLibJars = this.sendFiles();
        this.jobId = this.start(remoteLibJars);
        this.returnCode = this.wait(this.jobId);
        this.log = this.generateLog(this.jobId);
        return this.getReturnCode();
    }

    @Override
    public void cancelJob() throws Exception {
        for (long max = (long)this.maxMissingStatuses.intValue(); this.jobId == null && max > 0L; --max) {
            Thread.sleep(this.pollingIntervalDuration.intValue());
        }
        if (this.jobId != null) {
            this.cancel(this.jobId);
        } else {
            LOGGER.error("Can not cancel the job as it seems to not have started (no Job Id)");
        }
    }

    public int getReturnCode() {
        return this.returnCode;
    }

    public StringBuilder getJobLog() {
        return this.log;
    }

    public static class Builder {
        protected FileSystem fileSystem;
        protected String token;
        protected String remoteFolder;
        protected String livyEndpoint;
        protected String libJars;
        protected String sparkPoolsName;
        protected String jarToExecute;
        protected String classToExecute;
        protected String driverMemory;
        protected String executorMemory;
        protected Integer driverCore;
        protected Integer executorCore;
        protected List<String> arguments;
        protected Map<String, String> configuration;
        protected String applicationName;
        protected Integer intervalPollingDuration;
        protected Integer maxMissingStatuses;
        protected Integer socketTimeout = 15000;
        protected Integer connectionTimeout = 15000;
        protected String userAgent = "Talend Studio";

        public Builder withFileSystem(FileSystem fs) {
            this.fileSystem = fs;
            return this;
        }

        public Builder withAccessToken(String token) {
            this.token = token;
            return this;
        }

        public Builder withJarToExecute(String jarToExecute) {
            this.jarToExecute = jarToExecute;
            return this;
        }

        public Builder withClassToExecute(String classToExecute) {
            this.classToExecute = classToExecute;
            return this;
        }

        public Builder withLivyEndpoint(String endpoint) {
            this.livyEndpoint = endpoint;
            return this;
        }

        public Builder withIntervalPollingDuration(Integer intervalDuration) {
            this.intervalPollingDuration = intervalDuration;
            return this;
        }

        public Builder withIntervalPollingDuration(String intervalDuration) {
            this.intervalPollingDuration = Integer.parseInt(intervalDuration);
            return this;
        }

        public Builder withMaxMissingStatuses(Integer maxMissingStatuses) {
            this.maxMissingStatuses = maxMissingStatuses;
            return this;
        }

        public Builder withMaxMissingStatuses(String maxMissingStatuses) {
            this.maxMissingStatuses = Integer.parseInt(maxMissingStatuses);
            return this;
        }

        public Builder withConnectionTimeout(String connectionTimeout) {
            this.connectionTimeout = Integer.parseInt(connectionTimeout);
            return this;
        }

        public Builder withSocketTimeout(String socketTimeout) {
            this.socketTimeout = Integer.parseInt(socketTimeout);
            return this;
        }

        public Builder withUserAgent(String userAgent) {
            this.userAgent = userAgent;
            return this;
        }

        public Builder withRemoteFolder(String remoteFolder) {
            this.remoteFolder = remoteFolder;
            return this;
        }

        public Builder withSparkPools(String sparkPoolsName) {
            this.sparkPoolsName = sparkPoolsName;
            return this;
        }

        public Builder withLibJars(String libJars) {
            this.libJars = libJars;
            return this;
        }

        public Builder withAppName(String appName) {
            this.applicationName = appName;
            return this;
        }

        public Builder withDriverMemory(String memory) {
            if (memory != null) {
                this.driverMemory = memory;
            }
            return this;
        }

        public Builder withExecutorMemory(String memory) {
            if (memory != null) {
                this.executorMemory = memory;
            }
            return this;
        }

        public Builder withDriverCore(String core) {
            if (core != null) {
                this.driverCore = Integer.parseInt(core);
            }
            return this;
        }

        public Builder withExecutorCore(String core) {
            if (core != null) {
                this.executorCore = Integer.parseInt(core);
            }
            return this;
        }

        public Builder withConf(Map<String, String> conf) {
            this.configuration = conf;
            return this;
        }

        public Builder withArgs(List<String> args) {
            this.arguments = args;
            return this;
        }

        public SparkJob build() {
            SparkJob result = new SparkJob(this.fileSystem, this.token, this.userAgent, this.remoteFolder, this.livyEndpoint, this.libJars, this.sparkPoolsName, this.classToExecute, this.jarToExecute, this.configuration, this.applicationName, this.driverMemory, this.executorMemory, this.driverCore, this.executorCore, this.arguments, this.intervalPollingDuration, this.maxMissingStatuses, this.connectionTimeout, this.socketTimeout);
            return result;
        }
    }
}

