/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.launcher.databricks;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.log4j.Logger;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.talend.bigdata.launcher.Job;
import org.talend.bigdata.launcher.databricks.DatabricksCluster;
import org.talend.bigdata.launcher.databricks.Utils;
import org.talend.bigdata.launcher.databricks.api.jobs.Endpoints;
import org.talend.bigdata.launcher.fs.DatabricksFileSystem;
import org.talend.bigdata.launcher.utils.BigDataLauncherException;

/**
 * Base class for jobs submitted to a Databricks workspace through its REST API.
 *
 * <p>{@link #executeJob()} uploads the configured jars to DBFS, optionally restarts the
 * target cluster, creates a job, triggers a run and polls the run until it reaches a
 * final life-cycle state. {@link #cancelJob()} cancels a run that is pending or running.
 *
 * <p>The fields are expected to be populated by subclasses before {@link #executeJob()}
 * is called; this class never assigns most of them itself.
 */
public abstract class DatabricksJob
extends Job {
    /** Not read in this class; presumably used by subclasses. */
    protected String mJarToExecute;
    /** Sent as {@code main_class_name} of the {@code spark_jar_task}. */
    protected String mClassToExecute;
    /** Sent as both {@code run_name} and {@code name} of the created job. */
    protected String mAppName;
    /** Base address used to create the REST client and the DBFS file system. */
    protected String mEndpoint;
    /** Id of the run started by {@link #executeJob()}, as returned by the run-now call. */
    protected String mRunId;
    /** Id of the job created by {@link #executeJob()}, as returned by the create call. */
    protected String mJobId;
    /** Not read in this class; presumably used by subclasses. */
    protected Map<String, String> mConf;
    /** Token handed to {@code Utils.resetClientForEndpoint} and to the DBFS file system. */
    protected String mToken;
    /** Sent as {@code existing_cluster_id} when no transient cluster is configured. */
    protected String mClusterId;
    /** Not read in this class; presumably used by subclasses. */
    protected Map<String, String> mTuningConf;
    /** Not read in this class; presumably used by subclasses. */
    protected Long mExitCode;
    /** Not read in this class; presumably used by subclasses. */
    protected Exception mException;
    /** File system used to upload jars; created at the start of {@link #executeJob()}. */
    protected DatabricksFileSystem mFileSystem;
    /** Destination prefix on DBFS; each jar's file name is appended directly to it. */
    protected String mFilePath;
    /** Comma-separated list of local jar paths to upload. */
    protected String mLibJars;
    /** {@code dbfs:} locations of the uploaded jars, sent as the job's {@code libraries}. */
    protected List<String> mLibraries;
    /** REST client, lazily created and reused across calls. */
    protected WebClient mClient;
    /** Job arguments, sent as the {@code parameters} of the {@code spark_jar_task}. */
    protected List<String> mArgs;
    /** User agent handed to {@code Utils.resetClientForEndpoint} and to the DBFS file system. */
    protected String mUserAgent;
    /** When non-null, sent as {@code new_cluster} instead of using {@link #mClusterId}. */
    protected DatabricksCluster transientCluster;
    /** Last life-cycle state observed while polling the run. */
    protected RunLifeCycleState lifeCycleState;
    /** When {@code true}, the existing cluster is not restarted before the run. */
    protected boolean production;
    /** Delay, in milliseconds, between two polls of the run state. */
    protected long msBeforeRequest;
    private static final Logger LOG = Logger.getLogger(DatabricksJob.class);

    /**
     * Points the shared REST client at the given API endpoint, creating the client on
     * first use and resetting it otherwise.
     *
     * @param apiEndpoint endpoint whose API path the client must target
     */
    protected void resetClientForEndpoint(Endpoints apiEndpoint) {
        if (this.mClient == null) {
            this.mClient = Utils.createClient(this.mEndpoint);
        } else {
            this.mClient.reset();
        }
        Utils.resetClientForEndpoint(this.mClient, this.mToken, apiEndpoint.getAPIPath(), this.mUserAgent);
    }

    /**
     * Uploads every jar listed in {@link #mLibJars} to DBFS (skipping those the file
     * system reports as unchanged) and records their {@code dbfs:} locations in
     * {@link #mLibraries}.
     */
    private void uploadJars() {
        this.mFileSystem.updateDatabricksJarList(this.mFilePath);
        this.mLibraries = new ArrayList<String>();
        if (this.mLibJars == null) {
            // Nothing to upload; the job is then created without a "libraries" entry.
            return;
        }
        for (String jar : this.mLibJars.split(",")) {
            if (jar.isEmpty()) {
                // An empty list or a doubled comma must not be treated as a jar named "".
                continue;
            }
            String remotePath = this.mFilePath + new File(jar).getName();
            if (this.mFileSystem.notExistsOrIsDifferent(jar, remotePath)) {
                this.mFileSystem.copyFromLocal(jar, remotePath);
                LOG.debug("Upload of " + jar + " to dbfs:" + remotePath);
            } else {
                LOG.debug("Skip upload of " + jar + " because it is already present and of the same size on DBFS");
            }
            this.mLibraries.add("dbfs:" + remotePath);
        }
    }

    /**
     * Builds the JSON body of the job-creation request from the configured fields.
     *
     * @return the request body describing the cluster, the spark jar task and the libraries
     */
    @SuppressWarnings("unchecked") // json-simple's JSONObject/JSONArray are raw collections
    private JSONObject createJobObject() {
        JSONObject body = new JSONObject();
        body.put("run_name", this.mAppName);
        body.put("name", this.mAppName);
        if (this.useTransientCluster()) {
            body.put("new_cluster", this.transientCluster);
        } else {
            body.put("existing_cluster_id", this.mClusterId);
        }

        JSONObject sparkJarTask = new JSONObject();
        sparkJarTask.put("main_class_name", this.mClassToExecute);
        if (this.mArgs != null && !this.mArgs.isEmpty()) {
            JSONArray parameters = new JSONArray();
            parameters.addAll(this.mArgs);
            parameters.add("-calledByDatabricks");
            sparkJarTask.put("parameters", parameters);
        }
        body.put("spark_jar_task", sparkJarTask);

        if (this.mLibraries != null && !this.mLibraries.isEmpty()) {
            JSONArray libraries = new JSONArray();
            for (String lib : this.mLibraries) {
                JSONObject library = new JSONObject();
                library.put("jar", lib);
                libraries.add(library);
            }
            body.put("libraries", libraries);
        }
        return body;
    }

    /**
     * Targets the given endpoint and POSTs the JSON body to it, logging the request at
     * debug level.
     *
     * @param endpoint endpoint to call
     * @param body     JSON payload
     * @return the raw HTTP response
     */
    private Response postJson(Endpoints endpoint, JSONObject body) {
        this.resetClientForEndpoint(endpoint);
        this.mClient.type("application/json");
        String payload = body.toJSONString();
        LOG.debug("Content sent to " + endpoint.getAPIPath() + " : " + payload);
        // NOTE(review): the headers presumably include the token passed to
        // Utils.resetClientForEndpoint - confirm debug logs are not exposed.
        LOG.debug("With headers : " + this.mClient.getHeaders().toString());
        return this.mClient.post(payload);
    }

    /** Formats the common prefix of the HTTP failure messages. */
    private static String statusMessage(int status, Endpoints endpoint) {
        return "Status " + status + " for " + endpoint.getAPIPath();
    }

    /**
     * Creates the job, then triggers a run of it, storing the returned ids in
     * {@link #mJobId} and {@link #mRunId}.
     *
     * @throws ParseException           if a response body is not valid JSON
     * @throws BigDataLauncherException if either call does not answer with status 200
     */
    @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw map
    private void createJobAndRun() throws ParseException {
        Response response = this.postJson(Endpoints.CREATE, this.createJobObject());
        int status = response.getStatus();
        String entity = response.readEntity(String.class);
        if (status != 200) {
            throw new BigDataLauncherException(statusMessage(status, Endpoints.CREATE) + " : " + entity);
        }
        JSONObject result = (JSONObject) new JSONParser().parse(entity);
        this.mJobId = String.valueOf(result.get("job_id"));
        LOG.info("Job created with id : " + this.mJobId);

        JSONObject runNowBody = new JSONObject();
        runNowBody.put("job_id", this.mJobId);
        response = this.postJson(Endpoints.RUN_NOW, runNowBody);
        status = response.getStatus();
        entity = response.readEntity(String.class);
        if (status != 200) {
            // Without this check an error body would be parsed and the run id would
            // silently become the string "null".
            throw new BigDataLauncherException(statusMessage(status, Endpoints.RUN_NOW) + " : " + entity);
        }
        result = (JSONObject) new JSONParser().parse(entity);
        this.mRunId = String.valueOf(result.get("run_id"));
        LOG.info("Run started with id : " + this.mRunId);
    }

    /**
     * Asks the API to restart the cluster identified by {@link #mClusterId}.
     *
     * @return the HTTP status of the call (always 200 when this method returns)
     * @throws BigDataLauncherException if the call does not answer with status 200
     */
    @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw map
    private int restartCluster() {
        JSONObject body = new JSONObject();
        body.put("cluster_id", this.mClusterId);
        Response response = this.postJson(Endpoints.RESTART_CLUSTER, body);
        int status = response.getStatus();
        if (status != 200) {
            String message = statusMessage(status, Endpoints.RESTART_CLUSTER);
            LOG.error(message);
            throw new BigDataLauncherException(message + " : " + response.readEntity(String.class));
        }
        LOG.info("Cluster restarted with id : " + this.mClusterId);
        return status;
    }

    /**
     * Maps the final run state to a process-style return code.
     *
     * @param lifeCycleState final life-cycle state of the run
     * @param resultState    result state of the run, or {@code null} when the API
     *                       response did not contain one
     * @return 0 when the run terminated successfully, 1 otherwise
     */
    private int getReturnCodeFromState(RunLifeCycleState lifeCycleState, RunResultState resultState) {
        if (lifeCycleState == RunLifeCycleState.TERMINATED && resultState == RunResultState.SUCCESS) {
            return 0;
        }
        String resultName = resultState == null ? "<none>" : resultState.name();
        LOG.error("Run terminated with status " + lifeCycleState.name() + " and result " + resultName + "\nCheck logs on Spark UI for more information");
        return 1;
    }

    /** Tells whether the given life-cycle state is one this class stops polling on. */
    private boolean isJobDone(RunLifeCycleState state) {
        return state == RunLifeCycleState.TERMINATED
                || state == RunLifeCycleState.SKIPPED
                || state == RunLifeCycleState.INTERNAL_ERROR;
    }

    /**
     * Fetches the run with the already-configured client and returns its {@code state}
     * object.
     *
     * @throws ParseException           if the response body is not valid JSON
     * @throws BigDataLauncherException if the response carries no {@code state} object
     */
    private JSONObject fetchRunState() throws ParseException {
        JSONObject run = (JSONObject) new JSONParser().parse(this.mClient.get(String.class));
        Object state = run.get("state");
        if (!(state instanceof JSONObject)) {
            throw new BigDataLauncherException("No run state returned by " + Endpoints.RUNS_GET.getAPIPath() + " for run " + this.mRunId);
        }
        return (JSONObject) state;
    }

    /**
     * Reads {@code life_cycle_state} from a run state object.
     *
     * @throws BigDataLauncherException if the field is absent
     */
    private RunLifeCycleState readLifeCycleState(JSONObject state) {
        String raw = (String) state.get("life_cycle_state");
        if (raw == null) {
            throw new BigDataLauncherException("No life_cycle_state returned by " + Endpoints.RUNS_GET.getAPIPath() + " for run " + this.mRunId);
        }
        return RunLifeCycleState.valueOf(raw);
    }

    /**
     * Polls the run identified by {@link #mRunId} every {@link #msBeforeRequest}
     * milliseconds until {@link #isJobDone} accepts its life-cycle state.
     *
     * @return 0 when the run terminated successfully, 1 otherwise
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws ParseException       if a response body is not valid JSON
     */
    private int awaitEnd() throws InterruptedException, ParseException {
        this.resetClientForEndpoint(Endpoints.RUNS_GET);
        this.mClient.type("application/json");
        this.mClient.query("run_id", this.mRunId);

        JSONObject state = this.fetchRunState();
        this.lifeCycleState = this.readLifeCycleState(state);
        while (!this.isJobDone(this.lifeCycleState)) {
            Thread.sleep(this.msBeforeRequest);
            state = this.fetchRunState();
            this.lifeCycleState = this.readLifeCycleState(state);
            LOG.info("Run status : " + this.lifeCycleState.name() + ", " + state.get("state_message"));
        }

        // result_state may be missing from the response (e.g. for a run that did not
        // execute); treat that as a failure instead of crashing in valueOf(null).
        String rawResult = (String) state.get("result_state");
        RunResultState resultState = rawResult == null ? null : RunResultState.valueOf(rawResult);
        if (resultState != null) {
            LOG.info("Run result : " + resultState.name());
        }
        return this.getReturnCodeFromState(this.lifeCycleState, resultState);
    }

    /**
     * Uploads the jars, restarts the existing cluster unless a transient cluster is used
     * or {@link #production} is set, then creates the job, runs it and waits for its end.
     *
     * @return 0 when the run terminated successfully, 1 otherwise
     * @throws BigDataLauncherException if an API call answers with an unexpected status
     * @throws Exception                for any other failure (parsing, interruption, ...)
     */
    public int executeJob() throws Exception, BigDataLauncherException {
        this.mFileSystem = new DatabricksFileSystem(this.mEndpoint, this.mToken, this.mUserAgent);
        this.uploadJars();
        if (!this.useTransientCluster() && !this.production) {
            this.restartCluster();
        }
        this.createJobAndRun();
        return this.awaitEnd();
    }

    /** Tells whether the job must run on a new cluster described by {@link #transientCluster}. */
    private boolean useTransientCluster() {
        return this.transientCluster != null;
    }

    /**
     * Cancels the current run; does nothing unless the last observed life-cycle state
     * is PENDING or RUNNING.
     *
     * @throws BigDataLauncherException if the cancel call does not answer with status 200
     */
    @Override
    @SuppressWarnings("unchecked") // json-simple's JSONObject is a raw map
    public void cancelJob() {
        if (this.lifeCycleState != RunLifeCycleState.PENDING && this.lifeCycleState != RunLifeCycleState.RUNNING) {
            return;
        }
        JSONObject body = new JSONObject();
        body.put("run_id", this.mRunId);
        Response response = this.postJson(Endpoints.RUNS_CANCEL, body);
        int status = response.getStatus();
        if (status != 200) {
            // Report the endpoint that was actually called (was RESTART_CLUSTER).
            String message = statusMessage(status, Endpoints.RUNS_CANCEL);
            LOG.error(message);
            throw new BigDataLauncherException(message + " : " + response.readEntity(String.class));
        }
        LOG.info("Run " + this.mRunId + " canceled");
    }

    /** Life-cycle states of a run that this class can parse from the API response. */
    protected static enum RunLifeCycleState {
        PENDING,
        RUNNING,
        TERMINATING,
        TERMINATED,
        SKIPPED,
        INTERNAL_ERROR;

    }

    /** Result states of a run that this class can parse from the API response. */
    protected static enum RunResultState {
        SUCCESS,
        FAILED,
        TIMEOUT,
        CANCELED;

    }
}

