/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.launcher.databricks;

import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.core.DatabricksConfig;
import com.databricks.sdk.service.compute.ClusterDetails;
import com.databricks.sdk.service.compute.Library;
import com.databricks.sdk.service.compute.RestartCluster;
import com.databricks.sdk.service.compute.State;
import com.databricks.sdk.service.files.FileInfo;
import com.databricks.sdk.service.files.GetMetadataResponse;
import com.databricks.sdk.service.files.GetStatusRequest;
import com.databricks.sdk.service.files.UploadRequest;
import com.databricks.sdk.service.jobs.CreateJob;
import com.databricks.sdk.service.jobs.CreateResponse;
import com.databricks.sdk.service.jobs.GetRunRequest;
import com.databricks.sdk.service.jobs.JobAccessControlRequest;
import com.databricks.sdk.service.jobs.JobPermissionLevel;
import com.databricks.sdk.service.jobs.Run;
import com.databricks.sdk.service.jobs.RunLifeCycleState;
import com.databricks.sdk.service.jobs.RunNow;
import com.databricks.sdk.service.jobs.RunNowResponse;
import com.databricks.sdk.service.jobs.RunResultState;
import com.databricks.sdk.service.jobs.SparkJarTask;
import com.databricks.sdk.service.jobs.SubmitRun;
import com.databricks.sdk.service.jobs.SubmitRunResponse;
import com.databricks.sdk.service.jobs.SubmitTask;
import com.databricks.sdk.service.jobs.Task;
import com.databricks.sdk.service.workspace.Import;
import com.databricks.sdk.service.workspace.ImportFormat;
import com.databricks.sdk.service.workspace.ObjectInfo;
import com.databricks.sdk.support.Wait;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.impl.client.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.bigdata.launcher.databricks.DatabricksCluster;
import org.talend.bigdata.launcher.databricks.DatabricksLauncherException;
import org.talend.bigdata.launcher.databricks.filesystem.DatabricksFileSystemUtil;

public abstract class DatabricksJob {
    /** When true, {@code submitJobAndRun()} and {@code createJobAndRun()} return null without contacting Databricks. */
    protected boolean isCancelled;
    /** Selects the one-time run-submit path ({@code submitJobAndRun()}) instead of create-job + run-now in {@code executeJob()}. */
    protected boolean isRunSubmit;
    // Not referenced inside this class.
    protected String mJarToExecute;
    /** Main class name given to the Spark jar task. */
    protected String mClassToExecute;
    /** Used as the run name (submit path) or the job name (create path). */
    protected String mAppName;
    /** Prefix of the generated task key; a random UUID is appended to it. */
    protected String mjobJarName;
    /** Host value received by the constructor. */
    protected final String endpoint;
    /** Run id recorded by {@code submitJobAndRun()}. */
    protected Long mRunId;
    /** Job id recorded by {@code createJobAndRun()}. */
    protected Long mJobId;
    // Not referenced inside this class.
    protected Map<String, String> mConf;
    /** Id of the cluster the task runs on; overwritten with the transient cluster's id when one is created. */
    protected String mClusterId;
    // Not referenced inside this class.
    protected Map<String, String> mTuningConf;
    /** Remote base path under which jars are uploaded; a trailing "/" is appended when missing. */
    protected String mFilePath;
    /** Comma-separated list of local jar paths to upload. */
    protected String mLibJars;
    /** Remote library references collected by {@code uploadJars()} and attached to the task. */
    protected List<String> mLibraries;
    /** Parameters passed to the Spark jar task. */
    protected List<String> mArgs;
    // Not referenced inside this class.
    protected String userAgent;
    /** When non-null, a new cluster is created from it for the task instead of using {@link #mClusterId} as given. */
    protected DatabricksCluster transientCluster;
    /** Set to PENDING at the start of {@code executeJob()}; not otherwise updated inside this class. */
    protected RunLifeCycleState lifeCycleState;
    // Not assigned inside this class.
    protected RunResultState resultState;
    /** When false and no transient cluster is used, {@code executeJob()} restarts the cluster before running. */
    protected boolean production;
    /** Upload target selector: when true, jars go through the Files API ("volume" in the log messages). */
    protected boolean isUC;
    /** Upload target selector: when true (and {@link #isUC} is false), jars are imported into the Workspace. */
    protected boolean isWS;
    /** When true, {@code createJobControlRequest()} produces an access-control entry. */
    protected boolean isACLEnabled;
    /** User principal for the access-control entry; checked first. */
    protected String mUserName;
    /** Service-principal name for the access-control entry; checked last. */
    protected String mSPName;
    /** Name of a {@code JobPermissionLevel} constant applied to the access-control entry. */
    protected String mPermissionLevel;
    /** Group principal for the access-control entry; checked after the user name. */
    protected String mGroupName;
    // Not referenced inside this class.
    protected String mOwnerName;
    // Not referenced inside this class.
    protected long msBeforeRequest;
    // Not referenced inside this class.
    protected CloseableHttpClient httpClient;
    /** Databricks SDK client created by the constructor's factory. */
    WorkspaceClient workspace;
    private static final Logger LOG = LoggerFactory.getLogger(DatabricksJob.class);
    // Not referenced inside this class; presumably a delay in milliseconds between API retries -- TODO confirm against users.
    int apiRetrySleep = 5000;
    // Not referenced inside this class.
    protected static final int API_MAX_RETRY_COUNT = 10;
    // 0x100000 == 1,048,576; not referenced inside this class.
    protected static final int API_MAX_SIZE = 0x100000;

    /**
     * Builds a launcher whose workspace client is created by {@code factory} from a token.
     * The client id and secret handed to the factory are both null.
     *
     * @param host    Databricks host, also stored as {@link #endpoint}
     * @param token   token passed to the factory
     * @param factory factory producing the {@link WorkspaceClient}
     */
    protected DatabricksJob(String host, String token, WorkspaceClientFactory factory) {
        WorkspaceClient client = factory.createClient(host, token, null, null);
        this.endpoint = host;
        this.workspace = client;
    }

    /**
     * Builds a launcher whose workspace client is created by {@code factory} from a
     * client id / secret pair. The token handed to the factory is null.
     *
     * @param host     Databricks host, also stored as {@link #endpoint}
     * @param clientID client id passed to the factory
     * @param secretID client secret passed to the factory
     * @param factory  factory producing the {@link WorkspaceClient}
     */
    protected DatabricksJob(String host, String clientID, String secretID, WorkspaceClientFactory factory) {
        WorkspaceClient client = factory.createClient(host, null, clientID, secretID);
        this.endpoint = host;
        this.workspace = client;
    }

    /**
     * Token-based constructor using a {@link DefaultWorkspaceClientFactory}.
     *
     * @param host  Databricks host
     * @param token token used by the default factory
     */
    protected DatabricksJob(String host, String token) {
        this(host, token, new DefaultWorkspaceClientFactory());
    }

    /**
     * Client id / secret constructor using a {@link DefaultWorkspaceClientFactory}.
     *
     * @param host     Databricks host
     * @param clientID client id used by the default factory
     * @param secretID client secret used by the default factory
     */
    protected DatabricksJob(String host, String clientID, String secretID) {
        this(host, clientID, secretID, new DefaultWorkspaceClientFactory());
    }

    /**
     * Uploads every local jar listed in {@link #mLibJars} (comma separated) below
     * {@link #mFilePath} and records the resulting remote references in {@link #mLibraries}.
     *
     * <p>The upload target depends on {@link #isUC} / {@link #isWS}; when neither is set the
     * jar goes to DBFS. A jar that does not exist locally is skipped with a warning. A jar
     * whose upload fails is logged and skipped as well (best effort), so it is not added to
     * {@link #mLibraries}.
     *
     * @throws IOException if jars are configured but no remote base path is set
     */
    protected void uploadJars() throws IOException {
        if (this.mLibraries == null) {
            this.mLibraries = new ArrayList<String>();
        } else {
            this.mLibraries.clear();
        }
        // Nothing configured: leave the library list empty instead of failing with an NPE.
        if (this.mLibJars == null || this.mLibJars.trim().isEmpty()) {
            LOG.info("No library jars configured, nothing to upload");
            return;
        }
        if (this.mFilePath == null) {
            throw new IOException("Remote base path for library jars is not set");
        }
        String remotePathBase = this.mFilePath.endsWith("/") ? this.mFilePath : this.mFilePath + "/";
        LOG.info("Using {} as base path", remotePathBase);
        LOG.info("will upload libs {}", this.mLibJars);
        // The workspace target directory only needs to be created once, before the first import.
        boolean targetDirectoryCreated = false;
        for (String rawPath : this.mLibJars.split(",")) {
            // Trim once and use the trimmed path everywhere, so the existence check and the
            // actual upload always look at the same file.
            String jarPath = rawPath.trim();
            File uploadFile = new File(jarPath);
            String fullRemotePath = remotePathBase + uploadFile.getName();
            if (!uploadFile.exists()) {
                LOG.warn("Skipping jar: File not found at {}", jarPath);
                continue;
            }
            try {
                if (this.isUC) {
                    this.handleUCCatalogUpload(jarPath, uploadFile, fullRemotePath);
                } else if (this.isWS) {
                    if (!targetDirectoryCreated) {
                        this.createTargetDirectory(remotePathBase);
                        targetDirectoryCreated = true;
                    }
                    this.handleWSUpload(jarPath, uploadFile, fullRemotePath);
                } else {
                    this.handleDBFSUpload(jarPath, uploadFile, fullRemotePath);
                }
                this.addLibraryReference(fullRemotePath);
            }
            catch (Exception e) {
                // Best effort: one failing jar must not prevent the remaining uploads.
                LOG.error("Error uploading jar {} to {}", jarPath, fullRemotePath, e);
            }
        }
    }

    /**
     * Decides whether a local file has to be (re-)uploaded to DBFS by comparing its length
     * with the size reported for the remote path.
     *
     * @param uploadFile     local file
     * @param fullRemotePath DBFS path the file would be uploaded to
     * @return {@code false} only when the remote status could be read and reports the same
     *         size as the local file; {@code true} in every other case, including a failed
     *         status lookup
     */
    private boolean needUpload(File uploadFile, String fullRemotePath) {
        try {
            FileInfo remoteInfo = this.workspace.dbfs().getStatus(new GetStatusRequest().setPath(fullRemotePath));
            if (remoteInfo == null || remoteInfo.getFileSize() == null) {
                return true;
            }
            return remoteInfo.getFileSize().longValue() != uploadFile.length();
        }
        catch (Exception e) {
            // A failed lookup is treated as "upload needed"; keep a trace of the reason
            // instead of discarding it silently.
            LOG.debug("Could not read DBFS status of {}, assuming upload is needed", fullRemotePath, e);
            return true;
        }
    }

    /**
     * Uploads a jar through the Files API unless a remote file of the same size already exists.
     *
     * <p>A failing metadata lookup is treated as "no usable remote file" and leads to an
     * upload, in line with how the Workspace and DBFS upload paths of this class handle a
     * failed status lookup. The local file is only opened when an upload actually takes place.
     *
     * @param jarPath        local jar path, used for logging
     * @param uploadFile     local jar file; its content is what gets uploaded
     * @param fullRemotePath target path of the upload
     * @throws IOException if the local file cannot be opened or closed
     */
    private void handleUCCatalogUpload(String jarPath, File uploadFile, String fullRemotePath) throws IOException {
        GetMetadataResponse remoteFile = null;
        try {
            remoteFile = this.workspace.files().getMetadata(fullRemotePath);
        }
        catch (Exception e) {
            LOG.debug("No metadata available for {} on volume, it will be uploaded", fullRemotePath, e);
        }
        // A missing content length cannot prove the files are equal, so it also triggers an upload.
        Long remoteSize = remoteFile == null ? null : remoteFile.getContentLength();
        if (remoteSize != null && remoteSize.longValue() == uploadFile.length()) {
            LOG.info("Skip upload of {} because it is already present and of the same size on volume", uploadFile.getName());
            return;
        }
        LOG.info("Upload of {} to volume: {}", jarPath, fullRemotePath);
        try (InputStream uploadInputStream = Files.newInputStream(uploadFile.toPath())) {
            this.workspace.files().upload(new UploadRequest().setFilePath(fullRemotePath).setContents(uploadInputStream).setOverwrite(Boolean.TRUE));
        }
    }

    /**
     * Imports a jar into the Workspace as Base64 content unless a remote object of the same
     * size already exists.
     *
     * <p>The jar is read and encoded only when an import is required, so an up-to-date
     * remote copy no longer costs a full read of the local file.
     *
     * @param jarPath        local jar path, used for logging
     * @param uploadFile     local jar file
     * @param fullRemotePath workspace path to import to
     * @throws IOException if the local file cannot be read
     */
    private void handleWSUpload(String jarPath, File uploadFile, String fullRemotePath) throws IOException {
        ObjectInfo remoteFile = null;
        try {
            remoteFile = this.workspace.workspace().getStatus(fullRemotePath);
        }
        catch (Exception e) {
            // A failed lookup is treated as "not present"; keep a trace of the reason.
            LOG.debug("No status available for {} on Workspace, it will be uploaded", fullRemotePath, e);
        }
        Long remoteSize = remoteFile == null ? null : remoteFile.getSize();
        long localSize = uploadFile.length();
        if (remoteSize != null && remoteSize.longValue() == localSize) {
            LOG.info("Skip upload of {} because it is already present and of the same size on Workspace", uploadFile.getName());
            return;
        }
        if (remoteFile != null) {
            LOG.debug("remote size {} local size {}", remoteSize, localSize);
        }
        // Base64 output is pure ASCII; decode it explicitly rather than with the platform charset.
        String encodedJar = new String(Base64.encodeBase64(DatabricksJob.loadFile(uploadFile)), java.nio.charset.StandardCharsets.US_ASCII);
        this.workspace.workspace().importContent(new Import().setFormat(ImportFormat.AUTO).setPath(fullRemotePath).setContent(encodedJar).setOverwrite(Boolean.TRUE));
        LOG.info("Uploaded {} to Workspace: {}", jarPath, fullRemotePath);
    }

    /**
     * Copies a jar to DBFS, unless {@code needUpload} reports that the remote copy is current.
     *
     * @param jarPath        local jar path handed to the copy utility
     * @param uploadFile     local jar file, used for the size comparison and for logging
     * @param fullRemotePath DBFS destination path
     */
    private void handleDBFSUpload(String jarPath, File uploadFile, String fullRemotePath) {
        if (!this.needUpload(uploadFile, fullRemotePath)) {
            LOG.info("Skip upload of {} because it is already present and of the same size on DBFS", uploadFile.getName());
            return;
        }
        DatabricksFileSystemUtil.copyFromLocal(this.workspace, jarPath, fullRemotePath, true);
    }

    /**
     * Creates the given Workspace folder on a best-effort basis.
     *
     * <p>A failure does not abort the caller, but it is logged so that problems such as a
     * rejected path are visible instead of only surfacing later as a failed import.
     *
     * @param folder workspace folder to create
     */
    protected void createTargetDirectory(String folder) {
        try {
            this.workspace.workspace().mkdirs(folder);
        }
        catch (Exception e) {
            LOG.warn("Could not create Workspace directory {}", folder, e);
        }
    }

    /**
     * Reads the complete content of a file into memory.
     *
     * <p>Delegates to {@link Files#readAllBytes}, which closes the file on every path and
     * never hands back a partially filled buffer. Any I/O failure is rethrown with the
     * original exception attached as cause.
     *
     * @param file file to read
     * @return the file content; an empty array for an empty file
     * @throws IOException if the file does not exist or cannot be read completely
     */
    protected static byte[] loadFile(File file) throws IOException {
        try {
            return Files.readAllBytes(file.toPath());
        }
        catch (IOException e) {
            throw new IOException("Could not completely read file " + file.getName(), e);
        }
    }

    /**
     * Records an uploaded jar in {@link #mLibraries}. The path is stored as is when
     * {@link #isUC} or {@link #isWS} is set, and prefixed with {@code dbfs:} otherwise.
     *
     * @param fullRemotePath remote path of the uploaded jar
     */
    private void addLibraryReference(String fullRemotePath) {
        boolean storedAsPlainPath = this.isUC || this.isWS;
        String reference = storedAsPlainPath ? fullRemotePath : "dbfs:" + fullRemotePath;
        this.mLibraries.add(reference);
    }

    /**
     * Builds the {@link Task} used when a job is created and then run.
     *
     * <p>The Spark jar task runs {@link #mClassToExecute} with {@link #mArgs}; when the
     * argument list is non-empty, a copy with {@code -calledByDatabricks} appended is used
     * instead. With a transient cluster configured, the cluster is created first and its id
     * stored in {@link #mClusterId}; otherwise the task targets the existing cluster id.
     * Every entry of {@link #mLibraries} is attached as a jar library.
     *
     * @return the configured task, keyed by {@link #mjobJarName} plus a random UUID
     * @throws TimeoutException declared for the transient cluster creation call
     */
    protected Task createTask() throws TimeoutException {
        List<String> jarParameters = this.mArgs;
        if (jarParameters != null && !jarParameters.isEmpty()) {
            ArrayList<String> withMarker = new ArrayList<String>(jarParameters);
            withMarker.add("-calledByDatabricks");
            jarParameters = withMarker;
        }
        SparkJarTask jarTask = new SparkJarTask();
        jarTask.setMainClassName(this.mClassToExecute);
        jarTask.setParameters(jarParameters);

        Task jobTask = new Task().setSparkJarTask(jarTask);
        if (!this.useTransientCuster()) {
            jobTask.setExistingClusterId(this.mClusterId);
        } else {
            ClusterDetails created = this.transientCluster.createOnDatabricks(this.workspace);
            jobTask.setNewCluster(created.getSpec());
            this.mClusterId = created.getClusterId();
        }
        LOG.info("clusterId is {}", this.mClusterId);

        List<Library> jarLibraries = this.mLibraries.stream().map(path -> new Library().setJar(path)).toList();
        jobTask.setLibraries(jarLibraries);
        jobTask.setTaskKey(this.mjobJarName + UUID.randomUUID());
        return jobTask;
    }

    /**
     * Builds the {@link SubmitTask} used for a one-time run submission.
     *
     * <p>The Spark jar task runs {@link #mClassToExecute} with {@link #mArgs}; when the
     * argument list is non-empty, a copy with {@code -calledByDatabricks} appended is used
     * instead. With a transient cluster configured, the cluster is created first and its id
     * stored in {@link #mClusterId}; otherwise the task targets the existing cluster id.
     * Every entry of {@link #mLibraries} is attached as a jar library.
     *
     * @return the configured task, keyed by {@link #mjobJarName} plus a random UUID
     * @throws TimeoutException declared for the transient cluster creation call
     */
    protected SubmitTask createSubmitTask() throws TimeoutException {
        List<String> jarParameters = this.mArgs;
        if (jarParameters != null && !jarParameters.isEmpty()) {
            ArrayList<String> withMarker = new ArrayList<String>(jarParameters);
            withMarker.add("-calledByDatabricks");
            jarParameters = withMarker;
        }
        SparkJarTask jarTask = new SparkJarTask();
        jarTask.setMainClassName(this.mClassToExecute);
        jarTask.setParameters(jarParameters);

        SubmitTask runTask = new SubmitTask().setSparkJarTask(jarTask);
        if (!this.useTransientCuster()) {
            runTask.setExistingClusterId(this.mClusterId);
        } else {
            ClusterDetails created = this.transientCluster.createOnDatabricks(this.workspace);
            runTask.setNewCluster(created.getSpec());
            this.mClusterId = created.getClusterId();
        }
        LOG.info("clusterId is {}", this.mClusterId);

        List<Library> jarLibraries = this.mLibraries.stream().map(path -> new Library().setJar(path)).toList();
        runTask.setLibraries(jarLibraries);
        runTask.setTaskKey(this.mjobJarName + UUID.randomUUID());
        return runTask;
    }

    /**
     * Builds the access-control list attached to a job or run.
     *
     * <p>The list is empty when {@link #isACLEnabled} is false or no principal is set.
     * Otherwise it holds exactly one entry for the first non-null principal, checked in the
     * order user name, group name, service-principal name, with the permission level named
     * by {@link #mPermissionLevel}.
     *
     * @return a new mutable list with zero or one entry
     */
    protected List<JobAccessControlRequest> createJobControlRequest() {
        ArrayList<JobAccessControlRequest> entries = new ArrayList<JobAccessControlRequest>();
        if (!this.isACLEnabled) {
            return entries;
        }
        JobAccessControlRequest entry = new JobAccessControlRequest();
        if (this.mUserName != null) {
            entry.setUserName(this.mUserName);
        } else if (this.mGroupName != null) {
            entry.setGroupName(this.mGroupName);
        } else if (this.mSPName != null) {
            entry.setServicePrincipalName(this.mSPName);
        } else {
            return entries;
        }
        entry.setPermissionLevel(JobPermissionLevel.valueOf(this.mPermissionLevel));
        entries.add(entry);
        return entries;
    }

    /**
     * Submits a one-time run for the task built by {@code createSubmitTask()}, stores its id
     * in {@link #mRunId} and logs the state returned by {@code get()} on the submit handle.
     *
     * <p>On an {@link IllegalStateException} the cluster referenced by {@link #mClusterId}
     * is deleted before the failure is rethrown. That cleanup is skipped when no cluster id
     * is known, and a failure of the cleanup itself is only logged, so the original error
     * always reaches the caller.
     *
     * @return the handle of the submitted run, or {@code null} when {@link #isCancelled} is set
     * @throws DatabricksLauncherException wrapping an {@link IllegalStateException} or a
     *         {@link TimeoutException} raised while creating or awaiting the run
     */
    protected synchronized Wait<Run, SubmitRunResponse> submitJobAndRun() {
        if (this.isCancelled) {
            return null;
        }
        Wait<Run, SubmitRunResponse> response;
        try {
            SubmitTask task = this.createSubmitTask();
            SubmitRun job = new SubmitRun().setRunName(this.mAppName).setTasks(Arrays.asList(task)).setAccessControlList(this.createJobControlRequest());
            response = this.workspace.jobs().submit(job);
            LOG.info("Job created with ID: {}", response.getResponse().getRunId());
            this.mRunId = response.getResponse().getRunId();
            LOG.info("job state {}", response.get().getState());
        }
        catch (IllegalStateException e) {
            LOG.error("error running job, deleting cluster {}", this.mClusterId);
            if (this.mClusterId != null) {
                try {
                    this.workspace.clusters().delete(this.mClusterId);
                }
                catch (RuntimeException cleanupFailure) {
                    // Never let a failing cleanup hide the error that triggered it.
                    LOG.warn("Could not delete cluster {}", this.mClusterId, cleanupFailure);
                }
            }
            throw new DatabricksLauncherException("IllegalStateException ", e);
        }
        catch (TimeoutException e) {
            throw new DatabricksLauncherException("Timeout exception", e);
        }
        if (this.mRunId != null) {
            GetRunRequest getRunRequest = new GetRunRequest().setRunId(this.mRunId);
            String runPageUrl = this.workspace.jobs().getRun(getRunRequest).getRunPageUrl();
            LOG.info("logs of job : {}", runPageUrl);
        }
        return response;
    }

    /**
     * Creates a job for the task built by {@code createTask()} and triggers it immediately.
     * The job id is stored in {@link #mJobId} and, as in {@code submitJobAndRun()}, the id of
     * the triggered run is stored in {@link #mRunId}.
     *
     * <p>On an {@link IllegalStateException} the cluster referenced by {@link #mClusterId}
     * is deleted before the failure is rethrown. That cleanup is skipped when no cluster id
     * is known, and a failure of the cleanup itself is only logged, so the original error
     * always reaches the caller.
     *
     * @return the handle of the triggered run, or {@code null} when {@link #isCancelled} is set
     * @throws DatabricksLauncherException wrapping an {@link IllegalStateException} or a
     *         {@link TimeoutException} raised while creating the task, the job or the run
     */
    protected synchronized Wait<Run, RunNowResponse> createJobAndRun() {
        if (this.isCancelled) {
            return null;
        }
        Wait<Run, RunNowResponse> runNowResponse;
        try {
            Task task = this.createTask();
            CreateJob job = new CreateJob().setName(this.mAppName).setTasks(Arrays.asList(task)).setAccessControlList(this.createJobControlRequest());
            CreateResponse createResponse = this.workspace.jobs().create(job);
            this.mJobId = createResponse.getJobId();
            LOG.info("Job created with id : {}", this.mJobId);
            RunNow runNowRequest = new RunNow().setJobId(this.mJobId);
            runNowResponse = this.workspace.jobs().runNow(runNowRequest);
            // Keep the run id available on the instance, like the run-submit path does.
            if (runNowResponse.getResponse() != null) {
                this.mRunId = runNowResponse.getResponse().getRunId();
            }
        }
        catch (IllegalStateException e) {
            LOG.error("error running job, deleting cluster {}", this.mClusterId);
            if (this.mClusterId != null) {
                try {
                    this.workspace.clusters().delete(this.mClusterId);
                }
                catch (RuntimeException cleanupFailure) {
                    // Never let a failing cleanup hide the error that triggered it.
                    LOG.warn("Could not delete cluster {}", this.mClusterId, cleanupFailure);
                }
            }
            throw new DatabricksLauncherException("IllegalStateException ", e);
        }
        catch (TimeoutException e) {
            throw new DatabricksLauncherException("Timeout exception", e);
        }
        return runNowResponse;
    }

    /**
     * Sends a restart request for the cluster identified by {@link #mClusterId}.
     * Any exception raised by the request is logged and not propagated.
     */
    protected void restartCluster() {
        try {
            this.workspace.clusters().restart(new RestartCluster().setClusterId(this.mClusterId));
            LOG.info("Cluster {} restart command sent successfully.", this.mClusterId);
        }
        catch (Exception restartFailure) {
            LOG.error("Failed to restart cluster: ", restartFailure);
        }
    }

    /**
     * Tells whether the cluster identified by {@link #mClusterId} is reported as terminated.
     *
     * @return {@code true} only when the reported state is {@code TERMINATED}; {@code false}
     *         for any other state, including a missing (null) state
     */
    protected boolean isClusterTerminated() {
        State state = this.workspace.clusters().get(this.mClusterId).getState();
        // Identity comparison on the enum constant is null-safe, unlike state.equals(...).
        return state == State.TERMINATED;
    }

    /**
     * Maps the final state of a run to a process-style return code.
     *
     * @param lifeCycleState life-cycle state of the run; may be null
     * @param resultState    result state of the run; may be null
     * @return {@code 0} when the run is {@code TERMINATED} with result {@code SUCCESS};
     *         {@code 1} otherwise, after logging both states
     */
    protected int getReturnCodeFromState(RunLifeCycleState lifeCycleState, RunResultState resultState) {
        // Enum identity comparison tolerates null states instead of throwing an NPE.
        if (lifeCycleState == RunLifeCycleState.TERMINATED && resultState == RunResultState.SUCCESS) {
            return 0;
        }
        String lifeCycleName = lifeCycleState == null ? null : lifeCycleState.name();
        String resultName = resultState == null ? null : resultState.name();
        LOG.error("Run terminated with status {} and result {} Check logs on Spark UI for more information", lifeCycleName, resultName);
        return 1;
    }

    /**
     * Tells whether a run life-cycle state counts as finished.
     *
     * @param state life-cycle state to inspect; must not be null
     * @return {@code true} for {@code TERMINATED}, {@code SKIPPED} and {@code INTERNAL_ERROR},
     *         {@code false} for every other state
     */
    protected boolean isJobDone(RunLifeCycleState state) {
        switch (state) {
            case TERMINATED:
            case SKIPPED:
            case INTERNAL_ERROR:
                return true;
            default:
                return false;
        }
    }

    /**
     * Converts the text of a JSON node into a constant of the given enum type.
     *
     * @param clazz enum type to resolve against
     * @param node  JSON node whose text is the constant name; may be null
     * @param <T>   enum type
     * @return the matching constant, or {@code null} when the node is null, has no text, or
     *         its text names no constant of {@code clazz}
     */
    protected <T extends Enum<T>> T jsonToEnum(Class<T> clazz, JsonNode node) {
        if (node == null) {
            return null;
        }
        String constantName = node.asText();
        if (constantName == null) {
            return null;
        }
        try {
            return Enum.valueOf(clazz, constantName);
        }
        catch (IllegalArgumentException unknownConstant) {
            // Text that matches no constant is reported as "no value".
            return null;
        }
    }

    /**
     * Runs the job end to end: uploads the jars, restarts the cluster when neither a
     * transient cluster nor production mode is configured, starts the run through the path
     * selected by {@link #isRunSubmit}, logs the result state returned by {@code get()} on
     * the run handle and, for a transient cluster, deletes that cluster afterwards.
     *
     * <p>When the job was cancelled before submission the run methods return no handle; in
     * that case the method logs the fact and returns instead of failing with an NPE.
     * A {@link TimeoutException} raised by {@code get()} is logged and not propagated.
     *
     * @throws DatabricksLauncherException if creating or submitting the run fails
     * @throws IOException                 if the jar upload step fails
     */
    public void executeJob() throws DatabricksLauncherException, IOException {
        this.lifeCycleState = RunLifeCycleState.PENDING;
        this.uploadJars();
        if (!this.useTransientCuster() && !this.production) {
            this.restartCluster();
        }
        try {
            Wait<Run, ?> runResponse;
            if (this.isRunSubmit) {
                runResponse = this.submitJobAndRun();
            } else {
                runResponse = this.createJobAndRun();
            }
            if (runResponse == null) {
                // Both run methods return null only when the job has been cancelled.
                LOG.info("Job was cancelled before submission, nothing to run");
                return;
            }
            LOG.info("run state is {}", runResponse.get().getState().getResultState());
            if (this.useTransientCuster()) {
                this.workspace.clusters().delete(this.mClusterId);
            }
        }
        catch (TimeoutException e) {
            LOG.error("Timeout ", e);
        }
    }

    /**
     * Tells whether a transient cluster definition is configured.
     *
     * @return {@code true} when {@link #transientCluster} is set
     */
    protected boolean useTransientCuster() {
        return !(this.transientCluster == null);
    }

    /**
     * Creates the {@link WorkspaceClient} used by a {@link DatabricksJob}. The constructors
     * of {@link DatabricksJob} pass either a token (with null client id and secret) or a
     * client id / secret pair (with a null token).
     */
    public static interface WorkspaceClientFactory {
        /**
         * Creates a workspace client for the given host and credentials.
         *
         * @param host         Databricks host
         * @param token        access token, or null when client credentials are supplied
         * @param clientId     client id, or null when a token is supplied
         * @param clientSecret client secret, or null when a token is supplied
         * @return the client to use for all Databricks calls
         */
        public WorkspaceClient createClient(String host, String token, String clientId, String clientSecret);
    }

    /**
     * Default factory building a {@link WorkspaceClient} straight from a {@link DatabricksConfig}.
     * A non-null token selects the {@code "pat"} auth type; otherwise the client id and
     * client secret are set on the configuration.
     */
    public static class DefaultWorkspaceClientFactory
    implements WorkspaceClientFactory {
        @Override
        public WorkspaceClient createClient(String host, String token, String clientId, String clientSecret) {
            DatabricksConfig config = new DatabricksConfig();
            if (token == null) {
                config.setHost(host).setClientId(clientId).setClientSecret(clientSecret);
            } else {
                config.setAuthType("pat").setHost(host).setToken(token);
            }
            return new WorkspaceClient(config);
        }
    }
}

