/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.contentpump;

import com.marklogic.contentpump.Command;
import com.marklogic.contentpump.ConfigConstants;
import com.marklogic.contentpump.ContentPump;
import com.marklogic.contentpump.LocalJob;
import com.marklogic.contentpump.LocalJobRunner;
import com.marklogic.contentpump.MultithreadedMapper;
import com.marklogic.mapreduce.utilities.InternalUtilities;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultItem;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.MLCloudRequestException;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.ServerConnectionException;
import com.marklogic.xcc.types.XSInteger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;

public class ThreadManager
implements ConfigConstants {
    public static final Log LOG = LogFactory.getLog(ThreadManager.class);
    public static final String SERVER_MAX_THREADS_QUERY = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\nlet $f := fn:function-lookup(xs:QName('hadoop:get-port-max-threads'),0)\nreturn if (exists($f)) then $f() else 0";
    private LocalJob job;
    private Command cmd;
    private Configuration conf;
    private final ScheduledExecutorService scheduler;
    private ThreadPoolExecutor pool;
    private List<LocalJobRunner.LocalMapTask> taskList = new ArrayList<LocalJobRunner.LocalMapTask>();
    private List<Future<Object>> taskFutureList = new ArrayList<Future<Object>>();
    private int minThreads = 1;
    private int curServerThreads;
    private int idleServerThreads;
    private List<Integer> randomIndexes = new ArrayList<Integer>();
    private int newServerThreads;
    private boolean restrictHosts = false;
    private int threadCount;
    private int threadsPerSplit;
    private int maxThreads;
    private double maxThreadPercentage = 1.0;
    private int pollingInitDelay = 1;
    private int pollingPeriod = 1;

    public ThreadManager(LocalJob job) {
        this.job = job;
        this.conf = job.getConfiguration();
        this.minThreads = this.conf.getInt("mapreduce.marklogic.minthreads", this.minThreads);
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void parseCmdlineOptions(CommandLine cmdline, Command cmd) {
        this.cmd = cmd;
        if (cmdline.hasOption("thread_count")) {
            this.threadCount = Integer.parseInt(cmdline.getOptionValue("thread_count"));
        }
        if (cmdline.hasOption("thread_count_per_split")) {
            this.threadsPerSplit = Integer.parseInt(cmdline.getOptionValue("thread_count_per_split"));
        }
        if (cmdline.hasOption("max_threads")) {
            this.maxThreads = Integer.parseInt(cmdline.getOptionValue("max_threads"));
        }
        if (cmdline.hasOption("max_thread_percentage")) {
            this.maxThreadPercentage = (double)Integer.parseInt(cmdline.getOptionValue("max_thread_percentage")) / 100.0;
        }
        if (cmdline.hasOption("polling_init_delay")) {
            this.pollingInitDelay = Integer.parseInt(cmdline.getOptionValue("polling_init_delay"));
        }
        if (cmdline.hasOption("polling_period")) {
            this.pollingPeriod = Integer.parseInt(cmdline.getOptionValue("polling_period"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void queryServerMaxThreads(ContentSource cs) throws RequestException {
        block7: {
            if (this.threadCount != 0) {
                this.newServerThreads = this.threadCount;
                return;
            }
            Session session = null;
            ResultSequence result = null;
            try {
                session = cs.newSession();
                AdhocQuery query = session.newAdhocQuery(SERVER_MAX_THREADS_QUERY);
                RequestOptions options = new RequestOptions();
                options.setDefaultXQueryVersion("1.0-ml");
                query.setOptions(options);
                result = session.submitRequest((Request)query);
                if (result.hasNext()) {
                    ResultItem item = result.next();
                    this.newServerThreads = (int)(this.maxThreadPercentage * (double)((XSInteger)item.getItem()).asPrimitiveInt());
                    break block7;
                }
                throw new IllegalStateException("Failed to query server max threads");
            }
            finally {
                if (result != null) {
                    result.close();
                }
                if (session != null) {
                    session.close();
                }
            }
        }
    }

    public void runThreadPoller() {
        ScheduledFuture<?> handler = this.scheduler.scheduleWithFixedDelay(new ThreadPoller(), this.pollingInitDelay, this.pollingPeriod, POLLING_TIME_UNIT);
    }

    public boolean runAutoScaling() {
        return this.restrictHosts && this.threadCount == 0 && this.threadsPerSplit == 0;
    }

    public ThreadPoolExecutor initThreadPool() {
        int numThreads;
        if (this.threadCount != 0) {
            numThreads = this.threadCount;
        } else {
            numThreads = this.newServerThreads;
            if (numThreads == 0) {
                numThreads = 4;
            }
        }
        numThreads = Math.max(numThreads, this.minThreads);
        if (this.maxThreads > 0) {
            numThreads = Math.min(numThreads, this.maxThreads);
        }
        if (numThreads > 1) {
            this.pool = (ThreadPoolExecutor)Executors.newFixedThreadPool(numThreads);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Initial thread pool size: " + numThreads));
                if (this.runAutoScaling()) {
                    LOG.debug((Object)"Thread pool will auto-scale based on available server threads.");
                } else {
                    LOG.debug((Object)"Thread pool is fixed and will not auto-scale.");
                }
            }
        }
        this.curServerThreads = numThreads;
        return this.pool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getActiveTaskCounts() {
        int count = 0;
        List<LocalJobRunner.LocalMapTask> list = this.taskList;
        synchronized (list) {
            for (LocalJobRunner.LocalMapTask task : this.taskList) {
                if (task.isTaskDone()) {
                    if (task.getThreadCount() <= 0) continue;
                    this.idleServerThreads += task.getThreadCount();
                    task.setThreadCount(0);
                    continue;
                }
                ++count;
            }
        }
        return count;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void scaleOutThreadPool(int activeTaskCounts) {
        if (this.maxThreads > 0 && this.newServerThreads > this.maxThreads) {
            LOG.info((Object)("Thread count has reached the maximum value: " + this.maxThreads + " , and the thread pool will not further scale out."));
            this.newServerThreads = this.maxThreads;
        } else {
            LOG.info((Object)("Thread pool is scaling-out. New thread pool size: " + this.newServerThreads));
        }
        ThreadPoolExecutor threadPoolExecutor = this.pool;
        synchronized (threadPoolExecutor) {
            this.pool.setMaximumPoolSize(this.newServerThreads);
            this.pool.setCorePoolSize(this.newServerThreads);
        }
        for (int i = 0; i < this.taskList.size(); ++i) {
            LocalJobRunner.LocalMapTask task = this.taskList.get(i);
            if (task.getMapperClass() != MultithreadedMapper.class) continue;
            if (task.getThreadCount() == 0) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": 0, since this task is already completed."));
                continue;
            }
            int deltaTaskThreads = this.assignThreads(this.randomIndexes.get(i), activeTaskCounts, this.newServerThreads - this.curServerThreads + this.idleServerThreads, false);
            int newTaskThreads = deltaTaskThreads + task.getThreadCount();
            task.setThreadCount(newTaskThreads);
            ((MultithreadedMapper)task.getMapper()).setThreadCount(newTaskThreads);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": " + newTaskThreads));
            }
            if (task.isTaskDone()) continue;
            try {
                ((MultithreadedMapper)task.getMapper()).createRunners(deltaTaskThreads);
                continue;
            }
            catch (ClassNotFoundException e) {
                LOG.error((Object)"MapRunner class not found", (Throwable)e);
                continue;
            }
            catch (IOException | InterruptedException e) {
                LOG.error((Object)e.getMessage(), (Throwable)e);
            }
        }
    }

    public void scaleInThreadPool(int activeTaskCounts) {
        LOG.info((Object)("Thread pool is scaling-in. New thread pool size: " + this.newServerThreads));
        for (int i = 0; i < this.taskList.size(); ++i) {
            LocalJobRunner.LocalMapTask task = this.taskList.get(i);
            if (task.getMapperClass() != MultithreadedMapper.class) continue;
            if (task.getThreadCount() == 0) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": 0, since this task is already completed."));
                continue;
            }
            int deltaTaskThreads = this.assignThreads(this.randomIndexes.get(i), activeTaskCounts, this.curServerThreads - this.newServerThreads - this.idleServerThreads, false);
            int newTaskThreads = task.getThreadCount() - deltaTaskThreads;
            if (newTaskThreads < this.minThreads) {
                LOG.info((Object)("Thread count has reached minimum value: " + this.minThreads + " and the thread pool will not further scale in."));
                newTaskThreads = this.minThreads;
            }
            task.setThreadCount(newTaskThreads);
            ((MultithreadedMapper)task.getMapper()).setThreadCount(newTaskThreads);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": " + newTaskThreads));
            }
            if (task.isTaskDone()) continue;
            ((MultithreadedMapper)task.getMapper()).stopRunners(deltaTaskThreads);
        }
        this.pool.setCorePoolSize(this.newServerThreads);
        this.pool.setMaximumPoolSize(this.newServerThreads);
    }

    public void assignIdleThreads(int activeTaskCounts) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Assigning idle threads to each LocalMapTask. Idle threadcounts: " + this.idleServerThreads));
        }
        for (int i = 0; i < this.taskList.size(); ++i) {
            LocalJobRunner.LocalMapTask task = this.taskList.get(i);
            if (task.isTaskDone()) {
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": 0, since this task is already completed."));
                continue;
            }
            int deltaTaskThreads = this.assignThreads(this.randomIndexes.get(i), activeTaskCounts, this.idleServerThreads, false);
            if (task.getMapperClass() != MultithreadedMapper.class) continue;
            int newTaskThreads = deltaTaskThreads + task.getThreadCount();
            task.setThreadCount(newTaskThreads);
            ((MultithreadedMapper)task.getMapper()).setThreadCount(newTaskThreads);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Running with MultithreadedMapper. New thread count for split #" + i + ": " + newTaskThreads));
            }
            if (deltaTaskThreads == 0) continue;
            try {
                ((MultithreadedMapper)task.getMapper()).createRunners(deltaTaskThreads);
                continue;
            }
            catch (ClassNotFoundException e) {
                LOG.error((Object)"MapRunner class not found", (Throwable)e);
                continue;
            }
            catch (IOException | InterruptedException e) {
                LOG.error((Object)e.getMessage(), (Throwable)e);
            }
        }
    }

    public void shutdownThreadPool() throws InterruptedException, ExecutionException {
        for (Future<Object> f : this.taskFutureList) {
            f.get();
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
            while (!this.scheduler.awaitTermination(1L, TimeUnit.HOURS)) {
            }
        }
        if (this.pool != null) {
            this.pool.shutdown();
            while (!this.pool.awaitTermination(1L, TimeUnit.HOURS)) {
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void submitTask(LocalJobRunner.LocalMapTask task, int index, int splitCount) throws Exception {
        int taskThreads = this.assignThreads(index, splitCount, this.newServerThreads, true);
        Class mapperClass = this.job.getMapperClass();
        Class<? extends Mapper<?, ?, ?, ?>> runtimeMapperClass = this.job.getMapperClass();
        if (taskThreads != this.threadsPerSplit) {
            runtimeMapperClass = this.cmd.getRuntimeMapperClass(this.job, mapperClass, this.threadsPerSplit);
            if (runtimeMapperClass != mapperClass) {
                task.setMapperClass(runtimeMapperClass);
            }
            if (runtimeMapperClass == MultithreadedMapper.class) {
                task.setThreadCount(taskThreads);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Running with MultithreadedMapper. Initial thread count for split #" + index + ": " + taskThreads));
                }
            }
        }
        this.taskList.add(task);
        if (runtimeMapperClass == MultithreadedMapper.class) {
            ThreadPoolExecutor threadPoolExecutor = this.pool;
            synchronized (threadPoolExecutor) {
                this.taskFutureList.add(this.pool.submit(task));
                this.pool.wait();
            }
        } else {
            this.pool.submit(task);
        }
    }

    private int assignThreads(int splitIndex, int splitCount, int totalThreads, boolean initialize) {
        if (this.threadsPerSplit > 0) {
            return this.threadsPerSplit;
        }
        if (splitCount == 1) {
            return totalThreads;
        }
        if (splitCount * this.minThreads > totalThreads) {
            if (!initialize) {
                if (splitIndex < totalThreads) {
                    return this.minThreads;
                }
                return 0;
            }
            return this.minThreads;
        }
        if (splitIndex % totalThreads < totalThreads % splitCount) {
            return totalThreads / splitCount + 1;
        }
        return totalThreads / splitCount;
    }

    private void prepareRandomIndexes(int splitCount) {
        this.randomIndexes.clear();
        for (int i = 0; i < splitCount; ++i) {
            this.randomIndexes.add(i);
        }
        Collections.shuffle(this.randomIndexes);
    }

    public void setRestrictHosts(boolean newRestrictHosts) {
        this.restrictHosts = newRestrictHosts;
    }

    class ThreadPoller
    implements Runnable {
        private int pollingRetry;
        private int pollingSleepTime;
        private final int MAX_RETRIES = 5;
        private final int MIN_SLEEP_TIME = 500;

        ThreadPoller() {
        }

        @Override
        public void run() {
            if (ContentPump.shutdown) {
                return;
            }
            if (!ThreadManager.this.runAutoScaling()) {
                return;
            }
            boolean succeeded = false;
            boolean isRetryable = false;
            this.pollingRetry = 0;
            this.pollingSleepTime = 500;
            while (this.pollingRetry < 5) {
                String[] hosts;
                if (this.pollingRetry > 0 && LOG.isDebugEnabled()) {
                    LOG.debug((Object)"Retrying querying available server max threads.");
                }
                for (String host : hosts = ThreadManager.this.conf.getStrings("mapreduce.marklogic.output.host")) {
                    try {
                        ContentSource cs = InternalUtilities.getOutputContentSource(ThreadManager.this.conf, host);
                        ThreadManager.this.queryServerMaxThreads(cs);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("New available server threads: " + ThreadManager.this.newServerThreads));
                        }
                        succeeded = true;
                        break;
                    }
                    catch (Exception e) {
                        if (e instanceof RequestException) {
                            if (e instanceof ServerConnectionException) {
                                isRetryable = true;
                                LOG.warn((Object)("ServerConnectionException:" + e.getMessage() + " . Unable to connect to " + host + " to query available server max threads."));
                                continue;
                            }
                            if (e instanceof MLCloudRequestException) {
                                isRetryable |= ((MLCloudRequestException)e).isRetryable();
                                LOG.warn((Object)("MLCloudRequestException:" + e.getMessage()));
                                continue;
                            }
                            isRetryable = true;
                            LOG.warn((Object)("RequestException:" + e.getMessage()));
                            continue;
                        }
                        isRetryable = true;
                        LOG.warn((Object)("Exception:" + e.getMessage()));
                    }
                }
                if (succeeded) break;
                if (++this.pollingRetry < 5 && isRetryable) {
                    this.sleep();
                    continue;
                }
                LOG.error((Object)"Exceed max querying retry. Unable to queryavailable server max threads.");
                ThreadManager.this.job.setJobState(JobStatus.State.FAILED);
                return;
            }
            int activeTaskCounts = ThreadManager.this.getActiveTaskCounts();
            ThreadManager.this.prepareRandomIndexes(activeTaskCounts);
            if (ThreadManager.this.curServerThreads < ThreadManager.this.newServerThreads) {
                ThreadManager.this.scaleOutThreadPool(activeTaskCounts);
            } else if (ThreadManager.this.curServerThreads > ThreadManager.this.newServerThreads) {
                ThreadManager.this.scaleInThreadPool(activeTaskCounts);
            } else if (ThreadManager.this.idleServerThreads > 0) {
                ThreadManager.this.assignIdleThreads(activeTaskCounts);
            } else {
                return;
            }
            ThreadManager.this.curServerThreads = ThreadManager.this.newServerThreads;
            ThreadManager.this.idleServerThreads = 0;
        }

        private void sleep() {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Sleeping before retrying...sleepTime= " + this.pollingSleepTime + "ms"));
                }
                InternalUtilities.sleep(this.pollingSleepTime);
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.pollingSleepTime *= 2;
        }
    }
}

