/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.launcher.cde;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.knox.gateway.shell.KnoxSession;
import org.apache.knox.gateway.shell.knox.token.Get;
import org.apache.knox.gateway.shell.knox.token.Token;
import org.apache.knox.gateway.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.bigdata.launcher.Job;
import org.talend.bigdata.launcher.cde.LogType;
import org.talend.cde.api.JobRunsApi;
import org.talend.cde.api.JobsApi;
import org.talend.cde.api.ResourcesApi;
import org.talend.cde.invoker.ApiClient;
import org.talend.cde.invoker.ApiException;
import org.talend.cde.model.CommonFileResource;
import org.talend.cde.model.CommonJobCreateRequest;
import org.talend.cde.model.CommonResource;
import org.talend.cde.model.CommonResourceMountSpec;
import org.talend.cde.model.CommonResourceRequest;
import org.talend.cde.model.CommonRunDescribeResponse;
import org.talend.cde.model.CommonRunJobResponse;
import org.talend.cde.model.CommonRunRequest;
import org.talend.cde.model.CommonSparkAppSpec;

public class SparkJob
extends Job {
    /** Logger shared by every instance; never reassigned, hence final. */
    private static final Logger LOG = LoggerFactory.getLogger(SparkJob.class);

    // The fields below are written exactly once by the constructor from the Builder values.
    private final String className;
    private final ApiClient client;
    private final Map<String, String> conf;
    private final Properties context;
    private final Integer driverCores;
    private final String driverMemory;
    private final Integer executorCores;
    private final String executorMemory;
    private final List<String> jars;
    private final String jobName;
    private final String jarToExecute;
    private final String logLevel;
    private final Integer numExecutors;
    private final boolean overrideDependencies;
    /** Pause, in milliseconds, passed to Thread.sleep between two API polls. */
    private Integer pollingInterval = 10000;
    private final JobRunsApi jobRunsApi;
    /** Id of the run started by executeJob; read by awaitEnd, fetchAndDisplayLogs and cancelJob. */
    private int runId;
    private final List<LogType> logsToFetch;
    private final List<String> arguments;

    /**
     * Creates a launcher from the values collected by the given builder.
     *
     * <p>When {@code builder.generateToken} is set, a token is requested from
     * {@code builder.tokenEndpoint} with the workload credentials and used as the API key in place
     * of {@code builder.apiKey}. The key is sent with the {@code Bearer} prefix.
     *
     * @param builder source of every setting of this job
     * @throws IOException propagated from the token request
     * @throws URISyntaxException propagated from the token request
     */
    public SparkJob(Builder builder) throws IOException, URISyntaxException {
        // Resolve the key into a local so the caller's builder is not modified as a side effect.
        String apiKey = builder.generateToken
                ? this.acquireKnoxToken(builder.tokenEndpoint, builder.workloadUser, builder.workloadPassword)
                : builder.apiKey;

        this.client = new ApiClient();
        this.client.setApiKeyPrefix("Bearer");
        this.client.setApiKey(apiKey);
        this.client.setDebugging(builder.debugging);
        this.client.setBasePath(builder.basePath);
        this.jobRunsApi = new JobRunsApi(this.client);

        this.arguments = builder.arguments;
        this.className = builder.className;
        this.conf = builder.conf;
        this.context = builder.context;
        this.driverCores = builder.driverCores;
        this.driverMemory = builder.driverMemory;
        this.executorCores = builder.executorCores;
        this.executorMemory = builder.executorMemory;
        this.jars = builder.jars;
        this.jarToExecute = builder.jarToExecute;
        this.jobName = builder.jobName;
        this.logLevel = builder.logLevel;
        this.logsToFetch = builder.logsToFetch;
        this.numExecutors = builder.numExecutors;
        this.overrideDependencies = builder.overrideDependencies;

        // The builder leaves pollingInterval null unless withPollingInterval was called; copying
        // that null would discard the field's default and make every Thread.sleep call fail with
        // a NullPointerException.
        if (builder.pollingInterval != null) {
            this.pollingInterval = builder.pollingInterval;
        }
    }

    /**
     * Deploys the job through the CDE API, starts a run and waits for it to end.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>look up the resource named after the job and create it when the API answers 404;</li>
     *   <li>upload the jars into that resource;</li>
     *   <li>delete any job definition with the same name (a 404 is ignored) and create a new one;</li>
     *   <li>start a run, store its id and block in {@code awaitEnd()}.</li>
     * </ol>
     *
     * @return the value returned by {@code awaitEnd()}, or 1 when the resource lookup is rejected
     *         with HTTP 401
     * @throws Exception on any other API failure, or when waiting for the run is interrupted
     */
    public int executeJob() throws Exception {
        ResourcesApi resourcesApi = new ResourcesApi(this.client);
        JobsApi jobsApi = new JobsApi(this.client);
        // A launcher built without jars deploys nothing instead of failing on a null list.
        List<String> jarPaths = this.jars != null ? this.jars : new ArrayList<String>();

        CommonResource resource = null;
        boolean resourceExists = true;
        try {
            resource = resourcesApi.getResource(this.jobName, Boolean.TRUE);
        }
        catch (ApiException ae) {
            if (401 == ae.getCode()) {
                LOG.error(ae.getMessage());
                return 1;
            }
            if (404 != ae.getCode()) {
                throw ae;
            }
            // 404 only means the resource has not been created yet; it must not be rethrown,
            // otherwise the creation below can never run.
            resourceExists = false;
        }
        if (!resourceExists) {
            CommonResourceRequest resourceRequest = new CommonResourceRequest();
            resourceRequest.setName(this.jobName);
            resourcesApi.createResource(resourceRequest);
        }

        List<String> deployedFiles = new ArrayList<String>();
        if (resourceExists && resource != null && resource.getFiles() != null) {
            deployedFiles = resource.getFiles().stream().map(CommonFileResource::getPath).collect(Collectors.toList());
        }
        this.uploadJars(resourcesApi, jarPaths, deployedFiles);

        try {
            jobsApi.deleteJob(this.jobName);
        }
        catch (ApiException ae) {
            // Nothing to delete when the job was never created.
            if (404 != ae.getCode()) {
                throw ae;
            }
        }

        CommonJobCreateRequest commonJobCreateRequest = new CommonJobCreateRequest()
                .name(this.jobName)
                .type("spark")
                .mounts(Arrays.asList(new CommonResourceMountSpec().resourceName(this.jobName)))
                .spark(this.buildSparkAppSpec(jarPaths));
        jobsApi.createJob(commonJobCreateRequest);

        CommonRunJobResponse runJobResponse = jobsApi.runJob(this.jobName, new CommonRunRequest());
        this.runId = runJobResponse.getId();
        LOG.info("Starting job (Run ID : {})", this.runId);
        return this.awaitEnd();
    }

    /**
     * Uploads jars into the resource named after the job. A jar is skipped only when dependency
     * overriding is off, it is not the jar to execute, and its file name is already deployed.
     */
    private void uploadJars(ResourcesApi resourcesApi, List<String> jarPaths, List<String> deployedFiles) throws ApiException {
        for (String jarPath : jarPaths) {
            String jarName = jarFileName(jarPath);
            boolean alreadyDeployed = !this.overrideDependencies
                    && !jarPath.equals(this.jarToExecute)
                    && deployedFiles.contains(jarName);
            if (alreadyDeployed) {
                continue;
            }
            LOG.info("Deploying {}", jarPath);
            resourcesApi.putResourceFile(this.jobName, jarName, new File(jarPath));
        }
    }

    /**
     * Builds the Spark application spec: "-calledByCDE" first, then one "--context_param"
     * argument per context property, then the user-supplied arguments.
     */
    private CommonSparkAppSpec buildSparkAppSpec(List<String> jarPaths) {
        List<String> jarNames = jarPaths.stream().map(SparkJob::jarFileName).collect(Collectors.toList());
        CommonSparkAppSpec sparkAppSpec = new CommonSparkAppSpec()
                .file(jarFileName(this.jarToExecute))
                .className(this.className)
                .addArgsItem("-calledByCDE")
                .jars(jarNames)
                .numExecutors(this.numExecutors)
                .conf(this.conf)
                .driverCores(this.driverCores)
                .driverMemory(this.driverMemory)
                .executorCores(this.executorCores)
                .executorMemory(this.executorMemory)
                .logLevel(this.logLevel);
        if (this.context != null) {
            for (String contextParam : this.context.stringPropertyNames()) {
                // Name and value travel together as a single argument.
                sparkAppSpec = sparkAppSpec.addArgsItem("--context_param " + contextParam + "=" + this.context.getProperty(contextParam));
            }
        }
        if (this.arguments != null) {
            for (String argsItem : this.arguments) {
                sparkAppSpec = sparkAppSpec.addArgsItem(argsItem);
            }
        }
        return sparkAppSpec;
    }

    /** Returns the last path element (the file name) of a local jar path. */
    private static String jarFileName(String jarPath) {
        return Paths.get(jarPath, new String[0]).getFileName().toString();
    }

    /**
     * Polls the run identified by {@code runId} until it reaches a terminal status, then prints
     * the requested logs.
     *
     * <p>The status is logged after every poll. The thread sleeps for {@code pollingInterval}
     * milliseconds between polls only; once a terminal status is seen it proceeds immediately.
     *
     * @return 0 when the final status is {@code succeeded}, 1 otherwise
     * @throws ApiException when a status or log request fails
     * @throws InterruptedException when the thread is interrupted while sleeping
     */
    public int awaitEnd() throws ApiException, InterruptedException {
        final String succeeded = JobRunStatus.succeeded.toString();
        final String failed = JobRunStatus.failed.toString();
        // NOTE(review): "killed" is assumed to be the status reported for a run stopped through
        // killJobRun (see cancelJob) - confirm against the CDE API. Without it a killed run is
        // polled forever because it never becomes succeeded or failed.
        final String killed = "killed";

        CommonRunDescribeResponse jobRunResponse;
        String currentStatus;
        while (true) {
            jobRunResponse = this.jobRunsApi.getJobRun(Integer.valueOf(this.runId));
            currentStatus = jobRunResponse.getStatus();
            LOG.info("Status : {}", currentStatus);
            if (succeeded.equals(currentStatus) || failed.equals(currentStatus) || killed.equals(currentStatus)) {
                break;
            }
            Thread.sleep(this.pollingInterval.intValue());
        }

        // Guard the lookup so a response without Spark details cannot hide the run's outcome
        // behind a NullPointerException.
        if (jobRunResponse.getSpark() != null) {
            LOG.info("Spark application url : {}", jobRunResponse.getSpark().getSparkAppURL());
        }

        boolean runSucceeded = succeeded.equals(currentStatus);
        // Every terminal status other than succeeded is handled like a failure for log fetching.
        this.fetchAndDisplayLogs(this.logsToFetch, runSucceeded ? JobRunStatus.succeeded : JobRunStatus.failed);
        return runSucceeded ? 0 : 1;
    }

    /**
     * Logs in to the given Knox token endpoint and returns the {@code access_token} entry of the
     * JSON token response.
     *
     * @param tokenEndpoint URL of the Knox token service
     * @param workloadUser user name used for the login
     * @param workloadPassword password used for the login
     * @return the value stored under {@code access_token}; {@code null} when the entry is absent
     * @throws IOException when the response cannot be read or parsed, or the session cannot be closed
     * @throws URISyntaxException propagated from the Knox login
     */
    private String acquireKnoxToken(String tokenEndpoint, String workloadUser, String workloadPassword) throws IOException, URISyntaxException {
        // The session is only needed for this one request; close it so its underlying
        // resources are released instead of leaking for the lifetime of the launcher.
        try (KnoxSession knoxSession = KnoxSession.login(tokenEndpoint, workloadUser, workloadPassword)) {
            Get.Response getTokenResponse = (Get.Response) Token.get(knoxSession).now();
            Map<?, ?> tokenAsMap = JsonUtils.getMapFromJsonString(getTokenResponse.getString());
            if (tokenAsMap == null) {
                // Fail with an explicit error rather than a bare NullPointerException below.
                throw new IOException("Unable to parse the Knox token response from " + tokenEndpoint);
            }
            return (String) tokenAsMap.get("access_token");
        }
    }

    /**
     * Prints the requested log streams of the current run on standard output.
     *
     * <p>For each log type the method logs the fetch, sleeps for {@code pollingInterval}
     * milliseconds, then asks the API for the logs, repeating until the API returns a non-null
     * collection. {@code DRIVER_STDOUT} is skipped when the run status is {@code failed}.
     *
     * @param logsToFetch log types to print; nothing happens when null or empty
     * @param jobRunStatus final status of the run
     * @throws ApiException when a log request fails
     * @throws InterruptedException when the thread is interrupted while sleeping
     */
    public void fetchAndDisplayLogs(List<LogType> logsToFetch, JobRunStatus jobRunStatus) throws ApiException, InterruptedException {
        boolean nothingRequested = logsToFetch == null || logsToFetch.isEmpty();
        if (nothingRequested) {
            return;
        }
        for (LogType requested : logsToFetch) {
            boolean skipped = jobRunStatus.equals(JobRunStatus.failed) && requested.equals(LogType.DRIVER_STDOUT);
            if (!skipped) {
                Collection<?> lines;
                // At least one attempt is always made, so a do/while expresses the retry directly.
                do {
                    LOG.info("Fetching logs " + requested.getValue());
                    Thread.sleep(this.pollingInterval.intValue());
                    lines = this.jobRunsApi.getJobRunLogs(Integer.valueOf(this.runId), requested.getValue(), null, null, null);
                } while (lines == null);
                for (Object line : lines) {
                    System.out.println(line);
                }
            }
        }
    }

    /**
     * Asks the API to kill the run whose id is currently stored in {@code runId}.
     *
     * @throws Exception when the kill request fails
     */
    public void cancelJob() throws Exception {
        LOG.info("Killing job");
        final Integer runToKill = Integer.valueOf(this.runId);
        this.jobRunsApi.killJobRun(runToKill);
    }

    /**
     * Fluent collector of the settings read by the {@link SparkJob} constructor. Every
     * {@code with...} method stores its argument and returns this builder so calls can be chained.
     */
    public static class Builder {
        protected String apiKey;
        protected List<String> arguments;
        protected String basePath;
        protected String className;
        protected Map<String, String> conf;
        protected Properties context;
        protected boolean debugging;
        protected Integer driverCores;
        protected String driverMemory;
        protected String executorMemory;
        protected Integer executorCores;
        protected boolean generateToken;
        protected String jarToExecute;
        protected List<String> jars;
        protected String jobName;
        protected String logLevel;
        protected List<LogType> logsToFetch;
        protected Integer numExecutors;
        protected boolean overrideDependencies;
        protected Integer pollingInterval;
        protected String tokenEndpoint;
        protected String workloadUser;
        protected String workloadPassword;

        /** Sets the API key handed to the API client. */
        public Builder withApiKey(String apiKey) {
            this.apiKey = apiKey;
            return this;
        }

        /** Appends one application argument; the backing list is created on first use. */
        public Builder withArgument(String argument) {
            if (this.arguments == null) {
                this.arguments = new ArrayList<>();
            }
            this.arguments.add(argument);
            return this;
        }

        /** Sets the base path of the API client. */
        public Builder withBasePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        /** Sets the class name placed in the Spark application spec. */
        public Builder withClassName(String className) {
            this.className = className;
            return this;
        }

        /** Sets the configuration map placed in the Spark application spec. */
        public Builder withConf(Map<String, String> conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the context properties, each later sent as a {@code --context_param} argument. */
        public Builder withContext(Properties context) {
            this.context = context;
            return this;
        }

        /** Sets the debugging flag of the API client. */
        public Builder withDebugging(boolean debugging) {
            this.debugging = debugging;
            return this;
        }

        /** Sets the driver core count placed in the Spark application spec. */
        public Builder withDriverCores(Integer driverCores) {
            this.driverCores = driverCores;
            return this;
        }

        /** Sets the driver memory placed in the Spark application spec. */
        public Builder withDriverMemory(String driverMemory) {
            this.driverMemory = driverMemory;
            return this;
        }

        /** Sets the executor core count placed in the Spark application spec. */
        public Builder withExecutorCores(Integer executorCores) {
            this.executorCores = executorCores;
            return this;
        }

        /** Sets the executor memory placed in the Spark application spec. */
        public Builder withExecutorMemory(String executorMemory) {
            this.executorMemory = executorMemory;
            return this;
        }

        /** Adds one log type to print once the run has ended; the list is created on first use. */
        public Builder withFetchLog(LogType logType) {
            if (this.logsToFetch == null) {
                this.logsToFetch = new ArrayList<>();
            }
            this.logsToFetch.add(logType);
            return this;
        }

        /**
         * When {@code true}, the {@link SparkJob} constructor requests a token from the token
         * endpoint with the workload credentials and uses it as the API key.
         */
        public Builder withGenerateToken(boolean generateToken) {
            this.generateToken = generateToken;
            return this;
        }

        /** Sets the local paths of the jars to upload; the list is stored as given, not copied. */
        public Builder withJars(List<String> jars) {
            this.jars = jars;
            return this;
        }

        /** Sets the local path of the jar whose file name becomes the Spark application file. */
        public Builder withJarToExecute(String jarToExecute) {
            this.jarToExecute = jarToExecute;
            return this;
        }

        /** Sets the name used for both the job and the resource holding its files. */
        public Builder withJobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        /** Sets the log level placed in the Spark application spec. */
        public Builder withLogLevel(String logLevel) {
            this.logLevel = logLevel;
            return this;
        }

        /** Sets the executor count placed in the Spark application spec. */
        public Builder withNumExecutors(Integer numExecutors) {
            this.numExecutors = numExecutors;
            return this;
        }

        /** When {@code true}, jars are uploaded even if a file of the same name is already deployed. */
        public Builder withOverrideDependencies(boolean overrideDependencies) {
            this.overrideDependencies = overrideDependencies;
            return this;
        }

        /** Sets the pause, in milliseconds, between two API polls. */
        public Builder withPollingInterval(Integer pollingInterval) {
            this.pollingInterval = pollingInterval;
            return this;
        }

        /** Sets the endpoint used to request a token when token generation is enabled. */
        public Builder withTokenEndpoint(String tokenEndpoint) {
            this.tokenEndpoint = tokenEndpoint;
            return this;
        }

        /** Sets the user name used for the token request. */
        public Builder withWorkloadUser(String workloadUser) {
            this.workloadUser = workloadUser;
            return this;
        }

        /** Sets the password used for the token request. */
        public Builder withWorkloadPassword(String workloadPassword) {
            this.workloadPassword = workloadPassword;
            return this;
        }

        /**
         * Creates the job from the values collected so far.
         *
         * @return a new {@link SparkJob}
         * @throws IOException propagated from the {@link SparkJob} constructor
         * @throws URISyntaxException propagated from the {@link SparkJob} constructor
         */
        public SparkJob build() throws IOException, URISyntaxException {
            return new SparkJob(this);
        }
    }

    /**
     * Run statuses this launcher distinguishes. The constant names are compared as-is with the
     * status string returned for a run, so their spelling and case must not be changed.
     */
    public enum JobRunStatus {
        starting,
        running,
        succeeded,
        failed
    }
}

