/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionMergeRequest;
import org.apache.hadoop.hbase.regionserver.SplitRequest;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Private
public class CompactSplitThread
implements CompactionRequestor,
PropagatingConfigurationObserver {
    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
    public static final String LARGE_COMPACTION_THREADS = "hbase.regionserver.thread.compaction.large";
    public static final int LARGE_COMPACTION_THREADS_DEFAULT = 1;
    public static final String SMALL_COMPACTION_THREADS = "hbase.regionserver.thread.compaction.small";
    public static final int SMALL_COMPACTION_THREADS_DEFAULT = 1;
    public static final String SPLIT_THREADS = "hbase.regionserver.thread.split";
    public static final int SPLIT_THREADS_DEFAULT = 1;
    public static final String MERGE_THREADS = "hbase.regionserver.thread.merge";
    public static final int MERGE_THREADS_DEFAULT = 1;
    public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit";
    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
    private final HRegionServer server;
    private final Configuration conf;
    private final ThreadPoolExecutor longCompactions;
    private final ThreadPoolExecutor shortCompactions;
    private final ThreadPoolExecutor splits;
    private final ThreadPoolExecutor mergePool;
    private volatile CompactionThroughputController compactionThroughputController;
    private int regionSplitLimit;

    CompactSplitThread(HRegionServer server) {
        this.server = server;
        this.conf = server.getConfiguration();
        this.regionSplitLimit = this.conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, 1000);
        int largeThreads = Math.max(1, this.conf.getInt(LARGE_COMPACTION_THREADS, 1));
        int smallThreads = this.conf.getInt(SMALL_COMPACTION_THREADS, 1);
        int splitThreads = this.conf.getInt(SPLIT_THREADS, 1);
        Preconditions.checkArgument((largeThreads > 0 && smallThreads > 0 ? 1 : 0) != 0);
        final String n = Thread.currentThread().getName();
        this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(n + "-longCompactions-" + System.currentTimeMillis());
                return t;
            }
        });
        this.longCompactions.setRejectedExecutionHandler(new Rejection());
        this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
                return t;
            }
        });
        this.shortCompactions.setRejectedExecutionHandler(new Rejection());
        this.splits = (ThreadPoolExecutor)Executors.newFixedThreadPool(splitThreads, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(n + "-splits-" + System.currentTimeMillis());
                return t;
            }
        });
        int mergeThreads = this.conf.getInt(MERGE_THREADS, 1);
        this.mergePool = (ThreadPoolExecutor)Executors.newFixedThreadPool(mergeThreads, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(n + "-merges-" + System.currentTimeMillis());
                return t;
            }
        });
        this.compactionThroughputController = CompactionThroughputControllerFactory.create(server, this.conf);
    }

    public String toString() {
        return "compaction_queue=(" + this.longCompactions.getQueue().size() + ":" + this.shortCompactions.getQueue().size() + ")" + ", split_queue=" + this.splits.getQueue().size() + ", merge_queue=" + this.mergePool.getQueue().size();
    }

    public String dumpQueue() {
        StringBuffer queueLists = new StringBuffer();
        queueLists.append("Compaction/Split Queue dump:\n");
        queueLists.append("  LargeCompation Queue:\n");
        BlockingQueue<Runnable> lq = this.longCompactions.getQueue();
        Iterator it = lq.iterator();
        while (it.hasNext()) {
            queueLists.append("    " + ((Runnable)it.next()).toString());
            queueLists.append("\n");
        }
        if (this.shortCompactions != null) {
            queueLists.append("\n");
            queueLists.append("  SmallCompation Queue:\n");
            lq = this.shortCompactions.getQueue();
            it = lq.iterator();
            while (it.hasNext()) {
                queueLists.append("    " + ((Runnable)it.next()).toString());
                queueLists.append("\n");
            }
        }
        queueLists.append("\n");
        queueLists.append("  Split Queue:\n");
        lq = this.splits.getQueue();
        it = lq.iterator();
        while (it.hasNext()) {
            queueLists.append("    " + ((Runnable)it.next()).toString());
            queueLists.append("\n");
        }
        queueLists.append("\n");
        queueLists.append("  Region Merge Queue:\n");
        lq = this.mergePool.getQueue();
        it = lq.iterator();
        while (it.hasNext()) {
            queueLists.append("    " + ((Runnable)it.next()).toString());
            queueLists.append("\n");
        }
        return queueLists.toString();
    }

    public synchronized void requestRegionsMerge(HRegion a, HRegion b, boolean forcible) {
        try {
            this.mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ".  " + this));
            }
        }
        catch (RejectedExecutionException ree) {
            LOG.warn((Object)("Could not execute merge for " + a + "," + b + ", forcible=" + forcible), (Throwable)ree);
        }
    }

    public synchronized boolean requestSplit(HRegion r) {
        byte[] midKey;
        if (this.shouldSplitRegion() && r.getCompactPriority() >= 1 && (midKey = r.checkSplit()) != null) {
            this.requestSplit(r, midKey);
            return true;
        }
        return false;
    }

    public synchronized void requestSplit(HRegion r, byte[] midKey) {
        if (midKey == null) {
            LOG.debug((Object)("Region " + r.getRegionNameAsString() + " not splittable because midkey=null"));
            if (r.shouldForceSplit()) {
                r.clearSplit();
            }
            return;
        }
        try {
            this.splits.execute(new SplitRequest(r, midKey, this.server));
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Split requested for " + r + ".  " + this));
            }
        }
        catch (RejectedExecutionException ree) {
            LOG.info((Object)("Could not execute split for " + r), (Throwable)ree);
        }
    }

    @Override
    public synchronized List<CompactionRequest> requestCompaction(HRegion r, String why) throws IOException {
        return this.requestCompaction(r, why, null);
    }

    @Override
    public synchronized List<CompactionRequest> requestCompaction(HRegion r, String why, List<Pair<CompactionRequest, Store>> requests) throws IOException {
        return this.requestCompaction(r, why, Integer.MIN_VALUE, requests);
    }

    @Override
    public synchronized CompactionRequest requestCompaction(HRegion r, Store s, String why, CompactionRequest request) throws IOException {
        return this.requestCompaction(r, s, why, Integer.MIN_VALUE, request);
    }

    @Override
    public synchronized List<CompactionRequest> requestCompaction(HRegion r, String why, int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
        return this.requestCompactionInternal(r, why, p, requests, true);
    }

    private List<CompactionRequest> requestCompactionInternal(HRegion r, String why, int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
        ArrayList<CompactionRequest> ret = null;
        if (requests == null) {
            ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
            for (Store s : r.getStores().values()) {
                CompactionRequest cr = this.requestCompactionInternal(r, s, why, p, null, selectNow);
                if (!selectNow) continue;
                ret.add(cr);
            }
        } else {
            Preconditions.checkArgument((boolean)selectNow);
            ret = new ArrayList(requests.size());
            for (Pair<CompactionRequest, Store> pair : requests) {
                ret.add(this.requestCompaction(r, (Store)pair.getSecond(), why, p, (CompactionRequest)pair.getFirst()));
            }
        }
        return ret;
    }

    @Override
    public CompactionRequest requestCompaction(HRegion r, Store s, String why, int priority, CompactionRequest request) throws IOException {
        return this.requestCompactionInternal(r, s, why, priority, request, true);
    }

    public synchronized void requestSystemCompaction(HRegion r, String why) throws IOException {
        this.requestCompactionInternal(r, why, Integer.MIN_VALUE, null, false);
    }

    public void requestSystemCompaction(HRegion r, Store s, String why) throws IOException {
        this.requestCompactionInternal(r, s, why, Integer.MIN_VALUE, null, false);
    }

    private synchronized CompactionRequest requestCompactionInternal(HRegion r, Store s, String why, int priority, CompactionRequest request, boolean selectNow) throws IOException {
        if (this.server.isStopped() || r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled()) {
            return null;
        }
        CompactionContext compaction = null;
        if (selectNow && (compaction = this.selectCompaction(r, s, priority, request)) == null) {
            return null;
        }
        long size = selectNow ? compaction.getRequest().getSize() : 0L;
        ThreadPoolExecutor pool = !selectNow && s.throttleCompaction(size) ? this.longCompactions : this.shortCompactions;
        pool.execute(new CompactionRunner(s, r, compaction, pool));
        if (LOG.isDebugEnabled()) {
            String type = pool == this.shortCompactions ? "Small " : "Large ";
            LOG.debug((Object)(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this));
        }
        return selectNow ? compaction.getRequest() : null;
    }

    private CompactionContext selectCompaction(HRegion r, Store s, int priority, CompactionRequest request) throws IOException {
        CompactionContext compaction = s.requestCompaction(priority, request);
        if (compaction == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Not compacting " + r.getRegionNameAsString() + " because compaction request was cancelled"));
            }
            return null;
        }
        assert (compaction.hasSelection());
        if (priority != Integer.MIN_VALUE) {
            compaction.getRequest().setPriority(priority);
        }
        return compaction;
    }

    void interruptIfNecessary() {
        this.splits.shutdown();
        this.mergePool.shutdown();
        this.longCompactions.shutdown();
        this.shortCompactions.shutdown();
    }

    private void waitFor(ThreadPoolExecutor t, String name) {
        boolean done = false;
        while (!done) {
            try {
                done = t.awaitTermination(60L, TimeUnit.SECONDS);
                LOG.info((Object)("Waiting for " + name + " to finish..."));
                if (done) continue;
                t.shutdownNow();
            }
            catch (InterruptedException ie) {
                LOG.warn((Object)("Interrupted waiting for " + name + " to finish..."));
            }
        }
    }

    void join() {
        this.waitFor(this.splits, "Split Thread");
        this.waitFor(this.mergePool, "Merge Thread");
        this.waitFor(this.longCompactions, "Large Compaction Thread");
        this.waitFor(this.shortCompactions, "Small Compaction Thread");
    }

    public int getCompactionQueueSize() {
        return this.longCompactions.getQueue().size() + this.shortCompactions.getQueue().size();
    }

    public int getLargeCompactionQueueSize() {
        return this.longCompactions.getQueue().size();
    }

    public int getSmallCompactionQueueSize() {
        return this.shortCompactions.getQueue().size();
    }

    public int getSplitQueueSize() {
        return this.splits.getQueue().size();
    }

    private boolean shouldSplitRegion() {
        if ((double)this.server.getNumberOfOnlineRegions() > 0.9 * (double)this.regionSplitLimit) {
            LOG.warn((Object)("Total number of regions is approaching the upper limit " + this.regionSplitLimit + ". " + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"));
        }
        return this.regionSplitLimit > this.server.getNumberOfOnlineRegions();
    }

    public int getRegionSplitLimit() {
        return this.regionSplitLimit;
    }

    @Override
    public void onConfigurationChange(Configuration newConf) {
        CompactionThroughputController old;
        int largeThreads = Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, 1));
        if (this.longCompactions.getCorePoolSize() != largeThreads) {
            LOG.info((Object)("Changing the value of hbase.regionserver.thread.compaction.large from " + this.longCompactions.getCorePoolSize() + " to " + largeThreads));
            this.longCompactions.setMaximumPoolSize(largeThreads);
            this.longCompactions.setCorePoolSize(largeThreads);
        }
        int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 1);
        if (this.shortCompactions.getCorePoolSize() != smallThreads) {
            LOG.info((Object)("Changing the value of hbase.regionserver.thread.compaction.small from " + this.shortCompactions.getCorePoolSize() + " to " + smallThreads));
            this.shortCompactions.setMaximumPoolSize(smallThreads);
            this.shortCompactions.setCorePoolSize(smallThreads);
        }
        int splitThreads = newConf.getInt(SPLIT_THREADS, 1);
        if (this.splits.getCorePoolSize() != splitThreads) {
            LOG.info((Object)("Changing the value of hbase.regionserver.thread.split from " + this.splits.getCorePoolSize() + " to " + splitThreads));
            this.splits.setMaximumPoolSize(smallThreads);
            this.splits.setCorePoolSize(smallThreads);
        }
        int mergeThreads = newConf.getInt(MERGE_THREADS, 1);
        if (this.mergePool.getCorePoolSize() != mergeThreads) {
            LOG.info((Object)("Changing the value of hbase.regionserver.thread.merge from " + this.mergePool.getCorePoolSize() + " to " + mergeThreads));
            this.mergePool.setMaximumPoolSize(smallThreads);
            this.mergePool.setCorePoolSize(smallThreads);
        }
        if ((old = this.compactionThroughputController) != null) {
            old.stop("configuration change");
        }
        this.compactionThroughputController = CompactionThroughputControllerFactory.create(this.server, newConf);
        this.conf.reloadConfiguration();
    }

    protected int getSmallCompactionThreadNum() {
        return this.shortCompactions.getCorePoolSize();
    }

    public int getLargeCompactionThreadNum() {
        return this.longCompactions.getCorePoolSize();
    }

    @Override
    public void registerChildren(ConfigurationManager manager) {
    }

    @Override
    public void deregisterChildren(ConfigurationManager manager) {
    }

    @VisibleForTesting
    public CompactionThroughputController getCompactionThroughputController() {
        return this.compactionThroughputController;
    }

    private static class Rejection
    implements RejectedExecutionHandler {
        private Rejection() {
        }

        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
            if (runnable instanceof CompactionRunner) {
                CompactionRunner runner = (CompactionRunner)runnable;
                LOG.debug((Object)("Compaction Rejected: " + runner));
                runner.store.cancelRequestedCompaction(runner.compaction);
            }
        }
    }

    @SuppressWarnings(value={"EQ_COMPARETO_USE_OBJECT_EQUALS"}, justification="Contrived use of compareTo")
    private class CompactionRunner
    implements Runnable,
    Comparable<CompactionRunner> {
        private final Store store;
        private final HRegion region;
        private CompactionContext compaction;
        private int queuedPriority;
        private ThreadPoolExecutor parent;

        public CompactionRunner(Store store, HRegion region, CompactionContext compaction, ThreadPoolExecutor parent) {
            this.store = store;
            this.region = region;
            this.compaction = compaction;
            this.queuedPriority = this.compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
            this.parent = parent;
        }

        public String toString() {
            return this.compaction != null ? "Request = " + this.compaction.getRequest() : "Store = " + this.store.toString() + ", pri = " + this.queuedPriority;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Preconditions.checkNotNull((Object)CompactSplitThread.this.server);
            if (CompactSplitThread.this.server.isStopped() || this.region.getTableDesc() != null && !this.region.getTableDesc().isCompactionEnabled()) {
                return;
            }
            if (this.compaction == null) {
                ThreadPoolExecutor pool;
                int oldPriority = this.queuedPriority;
                this.queuedPriority = this.store.getCompactPriority();
                if (this.queuedPriority > oldPriority) {
                    this.parent.execute(this);
                    return;
                }
                try {
                    this.compaction = CompactSplitThread.this.selectCompaction(this.region, this.store, this.queuedPriority, null);
                }
                catch (IOException ex) {
                    LOG.error((Object)("Compaction selection failed " + this), (Throwable)ex);
                    CompactSplitThread.this.server.checkFileSystem();
                    return;
                }
                if (this.compaction == null) {
                    return;
                }
                assert (this.compaction.hasSelection());
                ThreadPoolExecutor threadPoolExecutor = pool = this.store.throttleCompaction(this.compaction.getRequest().getSize()) ? CompactSplitThread.this.longCompactions : CompactSplitThread.this.shortCompactions;
                if (this.parent != pool) {
                    this.store.cancelRequestedCompaction(this.compaction);
                    this.compaction = null;
                    this.parent = pool;
                    this.parent.execute(this);
                    return;
                }
            }
            assert (this.compaction != null);
            this.compaction.getRequest().beforeExecute();
            try {
                long start = EnvironmentEdgeManager.currentTime();
                boolean completed = this.region.compact(this.compaction, this.store, CompactSplitThread.this.compactionThroughputController);
                long now = EnvironmentEdgeManager.currentTime();
                LOG.info((Object)((completed ? "Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff((long)now, (long)start)));
                if (completed) {
                    if (this.store.getCompactPriority() <= 0) {
                        CompactSplitThread.this.requestSystemCompaction(this.region, this.store, "Recursive enqueue");
                    } else {
                        CompactSplitThread.this.requestSplit(this.region);
                    }
                }
            }
            catch (IOException ex) {
                IOException remoteEx = RemoteExceptionHandler.checkIOException((IOException)ex);
                LOG.error((Object)("Compaction failed " + this), (Throwable)remoteEx);
                if (remoteEx != ex) {
                    LOG.info((Object)("Compaction failed at original callstack: " + this.formatStackTrace(ex)));
                }
                CompactSplitThread.this.server.checkFileSystem();
            }
            catch (Exception ex) {
                LOG.error((Object)("Compaction failed " + this), (Throwable)ex);
                CompactSplitThread.this.server.checkFileSystem();
            }
            finally {
                LOG.debug((Object)("CompactSplitThread Status: " + CompactSplitThread.this));
            }
            this.compaction.getRequest().afterExecute();
        }

        private String formatStackTrace(Exception ex) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            ex.printStackTrace(pw);
            pw.flush();
            return sw.toString();
        }

        @Override
        public int compareTo(CompactionRunner o) {
            int compareVal = this.queuedPriority - o.queuedPriority;
            if (compareVal != 0) {
                return compareVal;
            }
            CompactionContext tc = this.compaction;
            CompactionContext oc = o.compaction;
            return tc == null ? (oc == null ? 0 : 1) : (oc == null ? -1 : tc.getRequest().compareTo(oc.getRequest()));
        }
    }
}

