/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.tezplugins;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.tezplugins.ContainerFactory;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
import org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector;
import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hive.common.util.Ref;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapTaskSchedulerService
extends TaskScheduler {
    /** General-purpose logger for this scheduler. */
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
    /** Separate logger, registered under the name "GuaranteedTasks", used for guaranteed-task accounting messages. */
    private static final Logger WM_LOG = LoggerFactory.getLogger((String)"GuaranteedTasks");
    /** Shared comparator instance; the TaskStartComparator type is declared outside this part of the file. */
    private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
    /**
     * Orders {@link Priority} instances by ascending {@link Priority#getPriority()} value.
     * The comparison uses {@link Integer#compare(int, int)} instead of subtraction, because
     * subtracting two ints of opposite sign can overflow and report the wrong ordering.
     */
    private static final Comparator<Priority> PRIORITY_COMPARATOR = new Comparator<Priority>(){

        @Override
        public int compare(Priority o1, Priority o2) {
            return Integer.compare(o1.getPriority(), o2.getPriority());
        }
    };
    /** Callback instance for update operations; UpdateOperationCallback is declared outside this part of the file. */
    private final UpdateOperationCallback UPDATE_CALLBACK = new UpdateOperationCallback();
    /**
     * Published by the constructor (under LlapTaskCommunicator.pluginInitLock) when no
     * communicator instance is available yet to pair with.
     */
    static LlapTaskSchedulerService instance = null;
    /** Configuration parsed from the scheduler context's initial user payload. */
    private final Configuration conf;
    /** Current set of service instances; assigned in start() and by setServiceInstanceSet(). */
    private LlapServiceInstanceSet activeInstances;
    /** Insertion-ordered map of NodeInfo entries keyed by a String (presumably the instance identity - verify against registerAndAddNode). */
    @VisibleForTesting
    final Map<String, NodeInfo> instanceToNodeMap = new LinkedHashMap<String, NodeInfo>();
    /** Tasks grouped by priority, ordered by PRIORITY_COMPARATOR. */
    private final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<Priority, List<TaskInfo>>(PRIORITY_COMPARATOR);
    private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<Object, TaskInfo>();
    private final Map<TezTaskAttemptID, TaskInfo> tasksById = new HashMap<TezTaskAttemptID, TaskInfo>();
    /** Running-task maps; handleUpdateResult() moves a task between them when its guaranteed flag flips. */
    private final TreeMap<Integer, TreeSet<TaskInfo>> guaranteedTasks = new TreeMap<Integer, TreeSet<TaskInfo>>();
    private final TreeMap<Integer, TreeSet<TaskInfo>> speculativeTasks = new TreeMap<Integer, TreeSet<TaskInfo>>();
    /** Plugin RPC endpoint; null unless "llap.plugin.endpoint.enabled" is set in the configuration. */
    private final LlapPluginServerImpl pluginEndpoint;
    @VisibleForTesting
    final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<NodeInfo>();
    @VisibleForTesting
    final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<TaskInfo>();
    private volatile boolean dagRunning = false;
    private final ContainerFactory containerFactory;
    @VisibleForTesting
    final Clock clock;
    /** Single-thread executor that runs nodeEnablerCallable (submitted in start()). */
    private final ListeningExecutorService nodeEnabledExecutor;
    private final NodeEnablerCallable nodeEnablerCallable = new NodeEnablerCallable();
    /** Single-thread executor that runs delayedTaskSchedulerCallable (submitted in start()). */
    private final ListeningExecutorService delayedTaskSchedulerExecutor;
    @VisibleForTesting
    final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
    /** Read/write lock over scheduler state; readLock and writeLock are its two views. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
    private final Lock scheduleLock = new ReentrantLock();
    private final Condition scheduleCondition = this.scheduleLock.newCondition();
    private final AtomicBoolean pendingScheduleInvocations = new AtomicBoolean(false);
    /** Single-thread executor that runs schedulerCallable (submitted in start()). */
    private final ListeningExecutorService schedulerExecutor;
    private final SchedulerCallable schedulerCallable = new SchedulerCallable();
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final AtomicInteger pendingPreemptions = new AtomicInteger(0);
    private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<String, MutableInt>();
    private final NodeBlacklistConf nodeBlacklistConf;
    private final LocalityDelayConf localityDelayConf;
    private final int numSchedulableTasksPerNode;
    /** Delay, in milliseconds, used when scheduling timeoutMonitor. */
    private final long timeout;
    /** Guards timeoutFuture in startTimeoutMonitor(). */
    private final Lock timeoutLock = new ReentrantLock();
    private final ScheduledExecutorService timeoutExecutor;
    private final ScheduledExecutorService scheduledLoggingExecutor;
    private final SchedulerTimeoutMonitor timeoutMonitor;
    private ScheduledFuture<?> timeoutFuture;
    /** Mirrors the most recently scheduled timeoutFuture; the type argument must match the declared type to compile. */
    private final AtomicReference<ScheduledFuture<?>> timeoutFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
    private final AtomicInteger assignedTaskCounter = new AtomicInteger(0);
    private final LlapRegistryService registry = new LlapRegistryService(false);
    private final TezAmRegistryImpl amRegistry;
    /** Futures of the three long-running callables; assigned in start(). */
    private volatile ListenableFuture<Void> nodeEnablerFuture;
    private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
    private volatile ListenableFuture<Void> schedulerFuture;
    @VisibleForTesting
    private final AtomicInteger dagCounter = new AtomicInteger(1);
    @VisibleForTesting
    StatsPerDag dagStats = new StatsPerDag();
    /** Scheduler metrics; null when metrics initialization is skipped by the constructor. */
    private final LlapTaskSchedulerMetrics metrics;
    /** JVM pause monitor; null when metrics initialization is skipped by the constructor. */
    private final JvmPauseMonitor pauseMonitor;
    private final Random random = new Random();
    /** Guaranteed-task totals; updateGuaranteedCount() modifies them while holding writeLock. */
    private int totalGuaranteed = 0;
    private int unusedGuaranteed = 0;
    private final boolean consistentSplits;
    /** Incremented on every updateGuaranteedCount() call; the new value is passed to updateGuaranteedInRegistry(). */
    private long totalGuaranteedVersion = Long.MIN_VALUE;
    private final Object registryUpdateLock = new Object();
    private long tgVersionSent = Long.MIN_VALUE;
    private LlapTaskCommunicator communicator;
    private final String amAppId;
    private final String amHiveSessionId;
    private final int amPort;
    /** Base64-serialized token and its job id; both null unless the plugin endpoint is enabled with security on. */
    private final String serializedToken;
    private final String jobIdForToken;
    /** Guards depsDagId and transitiveOutputs in getDependencyInfo(). */
    private final Object outputsLock = new Object();
    private TezDAGID depsDagId = null;
    private Map<Integer, Set<Integer>> transitiveOutputs;
    /** Created by the constructor only when the daemon-metrics collection interval is positive. */
    private LlapMetricsCollector llapMetricsCollector;
    public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = "llap.plugin.endpoint.enabled";
    /** Preallocated results for host-selection outcomes that carry only a ScheduleResult. */
    private static final SelectHostResult SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY = new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES);
    private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY = new SelectHostResult(ScheduleResult.DELAYED_LOCALITY);
    private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES = new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);

    /**
     * Creates the scheduler with a {@link MonotonicClock} and with metrics initialization requested.
     *
     * @param taskSchedulerContext the Tez scheduler context handed to the three-argument constructor
     */
    public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
        this(taskSchedulerContext, new MonotonicClock(), true);
    }

    /**
     * Builds the scheduler from the configuration carried in the context's initial user payload.
     *
     * <p>The constructor reads its settings, optionally creates the plugin RPC endpoint (with a job
     * token when security is enabled), creates the executors used later by {@code start()}, optionally
     * initializes metrics, and finally pairs itself with {@link LlapTaskCommunicator} under
     * {@code LlapTaskCommunicator.pluginInitLock}.
     *
     * @param taskSchedulerContext the Tez scheduler context
     * @param clock clock stored in {@link #clock}
     * @param initMetrics when true, and the HIVE_IN_TEST setting is not true in the configuration,
     *        the metrics system and the JVM pause monitor are started
     * @throws TezUncheckedException if the user payload cannot be parsed into a configuration
     */
    @VisibleForTesting
    public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock, boolean initMetrics) {
        super(taskSchedulerContext);
        this.clock = clock;
        this.amPort = taskSchedulerContext.getAppClientPort();
        this.amAppId = taskSchedulerContext.getApplicationAttemptId().getApplicationId().toString();
        this.delayedTaskSchedulerCallable = this.createDelayedTaskSchedulerCallable();
        try {
            this.conf = TezUtils.createConfFromUserPayload((UserPayload)taskSchedulerContext.getInitialUserPayload());
        }
        catch (IOException e) {
            throw new TezUncheckedException("Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), (Throwable)e);
        }
        this.amHiveSessionId = HiveConf.getVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVESESSIONID);
        this.consistentSplits = HiveConf.getBoolVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
        // The plugin endpoint is optional; the token fields stay null unless it is enabled AND security is on.
        if (this.conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) {
            JobTokenSecretManager sm = null;
            if (UserGroupInformation.isSecurityEnabled()) {
                // The application id used for the token is synthesized from System.nanoTime() values.
                ApplicationId id = ApplicationId.newInstance((long)System.nanoTime(), (int)((int)(System.nanoTime() % 100000L)));
                Token<JobTokenIdentifier> token = LlapTaskSchedulerService.createAmsToken(id);
                this.serializedToken = LlapTaskSchedulerService.serializeToken(token);
                this.jobIdForToken = token.getService().toString();
                sm = new JobTokenSecretManager();
                sm.addTokenForJob(this.jobIdForToken, token);
            } else {
                this.jobIdForToken = null;
                this.serializedToken = null;
            }
            this.pluginEndpoint = new LlapPluginServerImpl((SecretManager<JobTokenIdentifier>)sm, HiveConf.getIntVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this, HiveConf.getIntVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_PLUGIN_RPC_PORT));
        } else {
            this.jobIdForToken = null;
            this.serializedToken = null;
            this.pluginEndpoint = null;
        }
        this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), taskSchedulerContext.getCustomClusterIdentifier());
        this.nodeBlacklistConf = new NodeBlacklistConf(HiveConf.getTimeVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS, (TimeUnit)TimeUnit.MILLISECONDS), HiveConf.getTimeVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS, (TimeUnit)TimeUnit.MILLISECONDS), HiveConf.getFloatVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR));
        this.numSchedulableTasksPerNode = HiveConf.getIntVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
        long localityDelayMs = HiveConf.getTimeVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, (TimeUnit)TimeUnit.MILLISECONDS);
        this.localityDelayConf = new LocalityDelayConf(localityDelayMs);
        this.timeoutMonitor = new SchedulerTimeoutMonitor();
        // The timeout setting is read in milliseconds, matching the unit used when the monitor is scheduled.
        this.timeout = HiveConf.getTimeVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS, (TimeUnit)TimeUnit.MILLISECONDS);
        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor").build());
        this.timeoutFuture = null;
        this.scheduledLoggingExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread").build());
        // The daemon-metrics collector is only created for a positive collection interval.
        if (HiveConf.getTimeVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, (TimeUnit)TimeUnit.MILLISECONDS) > 0L) {
            this.llapMetricsCollector = new LlapMetricsCollector(this.conf, this.registry);
            this.registry.registerServiceListener((ServiceStateChangeListener)this.llapMetricsCollector);
        }
        String instanceId = HiveConf.getTrimmedVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
        Preconditions.checkNotNull((Object)instanceId, (Object)(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname + " must be defined"));
        // Three single-threaded daemon executors, one per long-running callable submitted in start().
        ExecutorService executorServiceRaw = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
        this.nodeEnabledExecutor = MoreExecutors.listeningDecorator((ExecutorService)executorServiceRaw);
        ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler").build());
        this.delayedTaskSchedulerExecutor = MoreExecutors.listeningDecorator((ExecutorService)delayedTaskSchedulerExecutorRaw);
        ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
        this.schedulerExecutor = MoreExecutors.listeningDecorator((ExecutorService)schedulerExecutorServiceRaw);
        // Metrics and the pause monitor are skipped when the caller opts out or the HIVE_IN_TEST flag is set.
        if (initMetrics && !this.conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, false)) {
            LlapMetricsSystem.initialize((String)"LlapTaskScheduler");
            this.pauseMonitor = new JvmPauseMonitor(this.conf);
            this.pauseMonitor.start();
            String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName();
            String sessionId = this.conf.get("llap.daemon.metrics.sessionid");
            this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId);
        } else {
            this.metrics = null;
            this.pauseMonitor = null;
        }
        String hostsString = HiveConf.getVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
        LOG.info("Running with configuration: hosts={}, numSchedulableTasksPerNode={}, nodeBlacklistConf={}, localityConf={} consistentSplits={}", new Object[]{hostsString, this.numSchedulableTasksPerNode, this.nodeBlacklistConf, this.localityDelayConf, this.consistentSplits});
        String registryName = HiveConf.getVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
        this.amRegistry = TezAmRegistryImpl.create((String)registryName, (Configuration)this.conf, (boolean)true);
        // Pair with the task communicator: if one already exists, link both ways and clear its
        // static slot; otherwise publish this scheduler through the static 'instance' field.
        Object object = LlapTaskCommunicator.pluginInitLock;
        synchronized (object) {
            LlapTaskCommunicator peer = LlapTaskCommunicator.instance;
            if (peer != null) {
                this.setTaskCommunicator(peer);
                peer.setScheduler(this);
                LlapTaskCommunicator.instance = null;
            } else {
                instance = this;
            }
        }
    }

    /**
     * Returns the vertex dependency map for the given DAG id, recomputing it only when the id
     * object differs (by reference) from the one seen on the previous call.
     *
     * <p>When the LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT setting is true the map is not recomputed
     * and whatever value is currently cached is returned.
     *
     * @param depsDagId id of the DAG the caller is asking about
     * @return the cached map of vertex number to transitive output vertex numbers; may be null
     */
    private Map<Integer, Set<Integer>> getDependencyInfo(TezDAGID depsDagId) {
        synchronized (this.outputsLock) {
            if (depsDagId != this.depsDagId) {
                // Remember the new id first, then refresh the cache if the feature is active.
                this.depsDagId = depsDagId;
                boolean preemptIndependent = HiveConf.getBoolVar((Configuration)this.conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT);
                if (!preemptIndependent) {
                    this.transitiveOutputs = LlapTaskSchedulerService.getTransitiveVertexOutputs(this.getContext().getCurrentDagInfo());
                }
            }
            return this.transitiveOutputs;
        }
    }

    /**
     * Computes, for every vertex of the DAG, the set of vertex numbers reachable through its outputs.
     *
     * <p>The direct outputs of each vertex are recorded first. Vertices without outputs seed a work
     * queue; a vertex is finalized only once all of its outputs have been finalized (otherwise it is
     * pushed back behind them), and its accumulated set is then merged into each of its inputs.
     *
     * @param info the current DAG info; must be a {@link DAG} for dependencies to be derivable
     * @return map from vertex number to the numbers of its direct and transitive outputs, or
     *         {@code null} when {@code info} is not a {@link DAG}
     */
    private static Map<Integer, Set<Integer>> getTransitiveVertexOutputs(DagInfo info) {
        if (!(info instanceof DAG)) {
            LOG.warn("DAG info is not a DAG - cannot derive dependencies");
            return null;
        }
        DAG dag = (DAG)info;
        int vc = dag.getVertices().size();
        Map<Integer, Set<Integer>> result = Maps.newHashMapWithExpectedSize(vc);
        LinkedList<TezVertexID> queue = new LinkedList<TezVertexID>();
        // Pass 1: record direct outputs and seed the queue with vertices that have no outputs.
        for (Vertex v : dag.getVertices().values()) {
            Map<Vertex, ?> out = v.getOutputVertices();
            Set<Integer> direct;
            if (out == null) {
                direct = new HashSet<Integer>();
            } else {
                direct = Sets.newHashSetWithExpectedSize(vc);
                for (Vertex outV : out.keySet()) {
                    direct.add(outV.getVertexId().getId());
                }
            }
            result.put(v.getVertexId().getId(), direct);
            if (v.getOutputVerticesCount() == 0) {
                queue.add(v.getVertexId());
            }
        }
        // Pass 2: propagate the sets from outputs towards inputs.
        Set<Integer> processed = Sets.newHashSetWithExpectedSize(vc);
        while (!queue.isEmpty()) {
            TezVertexID id = queue.poll();
            if (processed.contains(id.getId())) {
                continue;
            }
            Vertex v = dag.getVertex(id);
            Map<Vertex, ?> out = v.getOutputVertices();
            if (out != null) {
                // If any output is not finalized yet, requeue this vertex behind those outputs.
                boolean doBacktrack = false;
                for (Vertex outV : out.keySet()) {
                    TezVertexID outId = outV.getVertexId();
                    if (processed.contains(outId.getId())) {
                        continue;
                    }
                    if (!doBacktrack) {
                        queue.addFirst(id);
                        doBacktrack = true;
                    }
                    queue.addFirst(outId);
                }
                if (doBacktrack) {
                    continue;
                }
            }
            int num = id.getId();
            processed.add(num);
            Set<Integer> deps = result.get(num);
            Map<Vertex, ?> in = v.getInputVertices();
            if (in == null) {
                continue;
            }
            for (Vertex inV : in.keySet()) {
                queue.add(inV.getVertexId());
                result.get(inV.getVertexId().getId()).addAll(deps);
            }
        }
        return result;
    }

    /**
     * Creates a job token whose identifier and service are derived from the given application id.
     *
     * @param id application id whose string form becomes the job id of the token identifier
     * @return the new token, or {@code null} when security is not enabled
     */
    private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) {
        if (UserGroupInformation.isSecurityEnabled()) {
            JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
            Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, new JobTokenSecretManager());
            sessionToken.setService(identifier.getJobId());
            return sessionToken;
        }
        return null;
    }

    /**
     * Writes the token into an in-memory buffer and returns the bytes Base64-encoded.
     *
     * @param token the token to serialize
     * @return Base64 string of the token's written form
     * @throws RuntimeException wrapping any {@link IOException} raised while writing the token
     */
    private static String serializeToken(Token<JobTokenIdentifier> token) {
        ByteArrayDataOutput buffer = ByteStreams.newDataOutput();
        try {
            token.write((DataOutput)buffer);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return Base64.encodeBase64String((byte[])buffer.toByteArray());
    }

    /**
     * Applies a new total number of guaranteed task slots.
     *
     * <p>Under the write lock the method computes the delta against the current total and either
     * distributes newly granted slots to tasks (remainder goes to {@code unusedGuaranteed}) or takes
     * slots away, consuming unused slots first and revoking from tasks only for the remainder.
     * After the lock is released the registry is updated and the affected tasks are notified.
     *
     * <p>When the delta is zero, or when a decrease is fully covered by unused slots, the method
     * returns from inside the locked section without touching the registry.
     *
     * @param newTotalGuaranteed the new total of guaranteed slots
     * @throws AssertionError if fewer tasks could be revoked than the decrease requires
     */
    @VisibleForTesting
    void updateGuaranteedCount(int newTotalGuaranteed) {
        long tgVersionForZk;
        ArrayList<TaskInfo> toUpdate = null;
        this.writeLock.lock();
        try {
            int delta = newTotalGuaranteed - this.totalGuaranteed;
            tgVersionForZk = ++this.totalGuaranteedVersion;
            WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed + " (internal version " + tgVersionForZk + "); the delta to adjust by is " + delta);
            if (delta == 0) {
                return;
            }
            this.totalGuaranteed = newTotalGuaranteed;
            if (this.metrics != null) {
                this.metrics.setWmTotalGuaranteed(this.totalGuaranteed);
            }
            if (delta > 0) {
                if (this.unusedGuaranteed == 0) {
                    // No spare slots are lying around, so tasks may be waiting for one.
                    toUpdate = new ArrayList<TaskInfo>();
                    int totalUpdated = this.distributeGuaranteed(delta, null, toUpdate);
                    delta -= totalUpdated;
                    WM_LOG.info("Distributed " + totalUpdated);
                }
                int result = this.unusedGuaranteed += delta;
                if (this.metrics != null) {
                    this.metrics.setWmUnusedGuaranteed(result);
                }
                WM_LOG.info("Setting unused to " + result + " based on remaining delta " + delta);
            } else {
                delta = -delta;
                if (delta <= this.unusedGuaranteed) {
                    // The decrease is fully absorbed by slots nobody was using.
                    int result = this.unusedGuaranteed -= delta;
                    if (this.metrics != null) {
                        this.metrics.setWmUnusedGuaranteed(result);
                    }
                    WM_LOG.info("Setting unused to " + result + " based on full delta " + delta);
                    return;
                }
                // Consume the unused slots BEFORE zeroing the counter; only the remainder has to
                // be revoked from tasks. Zeroing first would lose the credit and over-revoke.
                delta -= this.unusedGuaranteed;
                this.unusedGuaranteed = 0;
                toUpdate = new ArrayList<TaskInfo>();
                int totalUpdated = this.revokeGuaranteed(delta, null, toUpdate);
                if (this.metrics != null) {
                    this.metrics.setWmUnusedGuaranteed(0);
                }
                WM_LOG.info("Setting unused to 0; revoked " + totalUpdated + " / " + delta);
                if (delta != totalUpdated) {
                    throw new AssertionError((Object)("Failed to revoke " + delta + " guaranteed tasks locally"));
                }
            }
        }
        finally {
            this.writeLock.unlock();
        }
        this.updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed);
        if (toUpdate == null) {
            return;
        }
        WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks");
        for (TaskInfo ti : toUpdate) {
            this.checkAndSendGuaranteedStateUpdate(ti);
        }
    }

    /**
     * Sends the task's current guaranteed flag to it unless no message is needed.
     *
     * <p>No message is sent when the last value set on the task already equals the desired one, or
     * when {@code isGuaranteed} is null; in both cases the pending update is closed locally. The
     * comparison of the two boxed flags uses {@code equals} so it does not depend on both values
     * being the same object.
     *
     * @param ti the task to update; it is expected to have a pending update
     */
    @VisibleForTesting
    protected void checkAndSendGuaranteedStateUpdate(TaskInfo ti) {
        boolean newState;
        synchronized (ti) {
            assert (ti.isPendingUpdate);
            boolean alreadyApplied = ti.lastSetGuaranteed != null && ti.lastSetGuaranteed.equals(ti.isGuaranteed);
            if (alreadyApplied || ti.isGuaranteed == null) {
                ti.requestedValue = ti.isGuaranteed;
                this.setUpdateDoneUnderTiLock(ti);
                WM_LOG.info("Not sending update to " + ti.attemptId);
                return;
            }
            newState = ti.isGuaranteed;
        }
        // The message is sent outside the task lock.
        this.sendUpdateMessageAsync(ti, newState);
    }

    /**
     * Marks an update as in flight for the task and records the value being requested.
     * As the name says, callers are expected to hold the task's lock.
     *
     * @param ti the task whose update is starting
     */
    private void setUpdateStartedUnderTiLock(TaskInfo ti) {
        ti.requestedValue = ti.isGuaranteed;
        ti.isPendingUpdate = true;
        if (this.metrics == null) {
            return;
        }
        this.metrics.setWmPendingStarted(ti.requestedValue);
    }

    /**
     * Closes the in-flight update for the task: the requested value becomes the last value set,
     * and the pending markers are cleared. Callers are expected to hold the task's lock.
     *
     * @param ti the task whose update has completed
     */
    private void setUpdateDoneUnderTiLock(TaskInfo ti) {
        ti.isPendingUpdate = false;
        if (this.metrics != null) {
            // The metric is only reported when a value was actually requested.
            if (ti.requestedValue != null) {
                this.metrics.setWmPendingDone(ti.requestedValue);
            }
        }
        ti.lastSetGuaranteed = ti.requestedValue;
        ti.requestedValue = null;
    }

    /**
     * Processes the response to a guaranteed-flag update previously sent to a task.
     *
     * <ul>
     * <li>If {@code isGuaranteed} is null, the pending markers are cleared and nothing else happens.</li>
     * <li>On success, the update is closed; if the desired state changed while the message was in
     *     flight, a new update is started and sent to the same task.</li>
     * <li>On failure, if the desired state no longer matches what was requested, the pending markers
     *     are cleared. Otherwise the task is flipped back to the opposite state in the running-task
     *     maps and the slot is given to (or revoked from) another task, which is then notified.</li>
     * </ul>
     *
     * @param ti the task the response is for; it is expected to have a pending update
     * @param isOk whether the update was applied successfully
     * @throws AssertionError if the task is not found in the running-task map matching its state
     */
    @VisibleForTesting
    protected void handleUpdateResult(TaskInfo ti, boolean isOk) {
        Boolean newStateSameTask = null;
        Boolean newStateAnyTask = null;
        WM_LOG.info("Received response for " + ti.attemptId + ", " + isOk);
        synchronized (ti) {
            assert (ti.isPendingUpdate);
            if (ti.isGuaranteed == null) {
                ti.isPendingUpdate = false;
                ti.requestedValue = null;
                return;
            }
            boolean requestedValue = ti.requestedValue;
            if (isOk) {
                this.setUpdateDoneUnderTiLock(ti);
                if (requestedValue == ti.isGuaranteed) {
                    return;
                }
                // The desired state moved on while the message was in flight; send it again.
                newStateSameTask = ti.isGuaranteed;
                this.setUpdateStartedUnderTiLock(ti);
            } else {
                if (this.metrics != null) {
                    this.metrics.setWmPendingFailed(requestedValue);
                }
                if (requestedValue != ti.isGuaranteed) {
                    ti.isPendingUpdate = false;
                    ti.requestedValue = null;
                    return;
                }
                newStateAnyTask = requestedValue;
            }
        }
        if (newStateSameTask != null) {
            WM_LOG.info("Sending update to the same task in response handling " + ti.attemptId + ", " + newStateSameTask);
            this.sendUpdateMessageAsync(ti, newStateSameTask);
        }
        if (newStateAnyTask == null) {
            return;
        }
        // The update failed: undo it locally and hand the slot change to some other task.
        boolean failedState = newStateAnyTask;
        ArrayList<TaskInfo> toUpdate = new ArrayList<TaskInfo>(1);
        this.writeLock.lock();
        try {
            synchronized (ti) {
                ti.isPendingUpdate = false;
                ti.requestedValue = null;
                // Value comparison of the boxed flags; a null isGuaranteed never matches.
                if (!newStateAnyTask.equals(ti.isGuaranteed)) {
                    return;
                }
                WM_LOG.info("Sending update to a different task in response handling " + ti.attemptId + ", " + newStateAnyTask);
                boolean isRemoved = LlapTaskSchedulerService.removeFromRunningTaskMap(failedState ? this.guaranteedTasks : this.speculativeTasks, ti.task, ti);
                if (!isRemoved) {
                    String error = "Couldn't find the task in the correct map after an update " + ti.task;
                    LOG.error(error);
                    throw new AssertionError((Object)error);
                }
                ti.isGuaranteed = !failedState;
                LlapTaskSchedulerService.addToRunningTasksMap(failedState ? this.speculativeTasks : this.guaranteedTasks, ti);
            }
            int count = failedState ? this.distributeGuaranteed(1, ti, toUpdate) : this.revokeGuaranteed(1, ti, toUpdate);
            assert (count == 1 && toUpdate.size() == 1);
        }
        finally {
            this.writeLock.unlock();
        }
        this.checkAndSendGuaranteedStateUpdate(toUpdate.get(0));
    }

    /**
     * Initializes the registry with the scheduler configuration and, when a plugin endpoint was
     * created, initializes that endpoint as well.
     */
    public void initialize() {
        this.registry.init(this.conf);
        if (this.pluginEndpoint == null) {
            return;
        }
        this.pluginEndpoint.init(this.conf);
    }

    /**
     * Starts the scheduler: the plugin endpoint (if any), the background callables, the registry,
     * and finally the AM registry registration. Everything after the endpoint start runs while
     * holding the write lock.
     *
     * @throws IOException declared by the signature; which call raises it is not visible in this method
     */
    public void start() throws IOException {
        if (this.pluginEndpoint != null) {
            this.pluginEndpoint.start();
        }
        this.writeLock.lock();
        try {
            // One-shot task (schedule, not scheduleAtFixedRate): after 10 seconds, logs the DAG stats if a DAG is running.
            this.scheduledLoggingExecutor.schedule(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    LlapTaskSchedulerService.this.readLock.lock();
                    try {
                        if (LlapTaskSchedulerService.this.dagRunning) {
                            LOG.info("Stats for current dag: {}", (Object)LlapTaskSchedulerService.this.dagStats);
                        }
                    }
                    finally {
                        LlapTaskSchedulerService.this.readLock.unlock();
                    }
                    return null;
                }
            }, 10000L, TimeUnit.MILLISECONDS);
            // Submit the three long-running callables; each future gets a logging callback run on the completing thread.
            this.nodeEnablerFuture = this.nodeEnabledExecutor.submit((Callable)this.nodeEnablerCallable);
            Futures.addCallback(this.nodeEnablerFuture, (FutureCallback)new LoggingFutureCallback("NodeEnablerThread", LOG), (Executor)MoreExecutors.directExecutor());
            this.delayedTaskSchedulerFuture = this.delayedTaskSchedulerExecutor.submit((Callable)this.delayedTaskSchedulerCallable);
            Futures.addCallback(this.delayedTaskSchedulerFuture, (FutureCallback)new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG), (Executor)MoreExecutors.directExecutor());
            this.schedulerFuture = this.schedulerExecutor.submit((Callable)this.schedulerCallable);
            Futures.addCallback(this.schedulerFuture, (FutureCallback)new LoggingFutureCallback("SchedulerThread", LOG), (Executor)MoreExecutors.directExecutor());
            // Start the registry, listen for instance changes, then register every instance it currently reports.
            this.registry.start();
            this.registry.registerStateChangeListener((ServiceInstanceStateChangeListener)new NodeStateChangeListener());
            this.activeInstances = this.registry.getInstances();
            for (LlapServiceInstance inst : this.activeInstances.getAll()) {
                this.registerAndAddNode(new NodeInfo(inst, this.nodeBlacklistConf, this.clock, this.numSchedulableTasksPerNode, this.metrics), inst);
            }
            if (this.amRegistry != null) {
                this.amRegistry.start();
                // -1 is passed as the plugin port when no plugin endpoint exists.
                int pluginPort = this.pluginEndpoint != null ? this.pluginEndpoint.getActualPort() : -1;
                this.amRegistry.register(this.amPort, pluginPort, this.amHiveSessionId, this.serializedToken, this.jobIdForToken, 0, this.amAppId);
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Replaces the set of active service instances used by this scheduler.
     * Exposed for tests only.
     *
     * @param serviceInstanceSet the instance set to use from now on
     */
    @VisibleForTesting
    protected void setServiceInstanceSet(LlapServiceInstanceSet serviceInstanceSet) {
        activeInstances = serviceInstanceSet;
    }

    /**
     * Schedules the timeout monitor, under {@code timeoutLock}, when no timer is pending
     * (none was ever scheduled, or the previous one is done) and there are no active
     * instances. Otherwise only logs why the timer was not started.
     */
    private void startTimeoutMonitor() {
        timeoutLock.lock();
        try {
            boolean noPendingTimer = timeoutFuture == null || timeoutFuture.isDone();
            if (!noPendingTimer || activeInstances.size() != 0) {
                Object futureState = timeoutFuture == null ? "null" : Boolean.valueOf(timeoutFuture.isDone());
                LOG.info("Timeout monitor task not started. Timeout future state: {}, #instances: {}", futureState, (Object)activeInstances.size());
                return;
            }
            timeoutFuture = timeoutExecutor.schedule(timeoutMonitor, timeout, TimeUnit.MILLISECONDS);
            timeoutFutureRef.set(timeoutFuture);
            LOG.info("Scheduled timeout monitor task to run after {} ms", (Object)timeout);
        }
        finally {
            timeoutLock.unlock();
        }
    }

    /**
     * Attempts to cancel the pending timeout monitor, under {@code timeoutLock}.
     *
     * <p>The timer is cancelled (without interrupting) only when one exists and at least one
     * instance is active; on success the shared future reference is cleared. In every case
     * the local {@code timeoutFuture} field is reset to {@code null} before returning.
     */
    private void stopTimeoutMonitor() {
        timeoutLock.lock();
        try {
            boolean cancelled = timeoutFuture != null
                && activeInstances.size() != 0
                && timeoutFuture.cancel(false);
            if (cancelled) {
                timeoutFutureRef.set(null);
                LOG.info("Stopped timeout monitor task");
            } else {
                Object futureState = timeoutFuture == null ? "null" : Boolean.valueOf(timeoutFuture.isDone());
                LOG.info("Timeout monitor task not stopped. Timeout future state: {}, #instances: {}", futureState, (Object)activeInstances.size());
            }
            timeoutFuture = null;
        }
        finally {
            timeoutLock.unlock();
        }
    }

    /**
     * Stops the scheduler service. Only the first invocation does any work: the
     * {@code isStopped} flag is flipped atomically and later calls return immediately.
     *
     * <p>Under the write lock this shuts down, in order: the stats-logging executor, the node
     * enabler, the timeout monitor, the delayed-task scheduler and the main scheduler; then
     * stops the registry, AM registry, plugin endpoint and pause monitor when present, and
     * shuts down the LLAP metrics system when metrics are in use.
     */
    public void shutdown() {
        writeLock.lock();
        try {
            if (isStopped.getAndSet(true)) {
                // Already stopped by an earlier call.
                return;
            }

            scheduledLoggingExecutor.shutdownNow();

            nodeEnablerCallable.shutdown();
            if (nodeEnablerFuture != null) {
                nodeEnablerFuture.cancel(true);
            }
            nodeEnabledExecutor.shutdownNow();

            timeoutExecutor.shutdown();
            if (timeoutFuture != null) {
                timeoutFuture.cancel(true);
                timeoutFuture = null;
            }
            timeoutExecutor.shutdownNow();

            delayedTaskSchedulerCallable.shutdown();
            if (delayedTaskSchedulerFuture != null) {
                delayedTaskSchedulerFuture.cancel(true);
            }
            delayedTaskSchedulerExecutor.shutdownNow();

            schedulerCallable.shutdown();
            if (schedulerFuture != null) {
                schedulerFuture.cancel(true);
            }
            schedulerExecutor.shutdownNow();

            if (registry != null) {
                registry.stop();
            }
            if (amRegistry != null) {
                amRegistry.stop();
            }
            if (pluginEndpoint != null) {
                pluginEndpoint.stop();
            }
            if (pauseMonitor != null) {
                pauseMonitor.stop();
            }
            if (metrics != null) {
                LlapMetricsSystem.shutdown();
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Sums the memory and virtual cores reported by every active instance, under the
     * read lock.
     *
     * @return a {@code Resource} holding the summed memory and vcores of all active instances
     */
    public Resource getTotalResources() {
        int totalMemory = 0;
        int totalVcores = 0;
        readLock.lock();
        try {
            int instanceCount = 0;
            for (LlapServiceInstance instance : activeInstances.getAll()) {
                Resource resource = instance.getResource();
                totalMemory += resource.getMemory();
                totalVcores += resource.getVirtualCores();
                instanceCount++;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, totalVcores={}", new Object[]{instanceCount, totalMemory, totalVcores});
            }
        }
        finally {
            readLock.unlock();
        }
        return Resource.newInstance(totalMemory, totalVcores);
    }

    /**
     * Sums the memory and virtual cores of the active instances that have a known,
     * non-disabled {@code NodeInfo}, under the read lock.
     *
     * @return a {@code Resource} holding the summed memory and vcores of those instances
     */
    public Resource getAvailableResources() {
        int availableMemory = 0;
        int availableVcores = 0;
        readLock.lock();
        try {
            int instanceCount = 0;
            for (LlapServiceInstance instance : activeInstances.getAll()) {
                NodeInfo node = instanceToNodeMap.get(instance.getWorkerIdentity());
                boolean usable = node != null && !node.isDisabled();
                if (usable) {
                    Resource resource = instance.getResource();
                    availableMemory += resource.getMemory();
                    availableVcores += resource.getVirtualCores();
                    instanceCount++;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("GetAvailableResources: numInstancesFound={}, totalMem={}, totalVcores={}", new Object[]{instanceCount, availableMemory, availableVcores});
            }
        }
        finally {
            readLock.unlock();
        }
        return Resource.newInstance(availableMemory, availableVcores);
    }

    /**
     * Returns the number of active instances, read under the read lock.
     *
     * @return the size of the collection returned by {@code activeInstances.getAll()}
     */
    public int getClusterNodeCount() {
        readLock.lock();
        try {
            return activeInstances.getAll().size();
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Resets per-DAG scheduler state once a DAG has completed.
     *
     * <p>Logs the finished DAG's stats, bumps the DAG counter and the completed-DAG metric,
     * then under the write lock: marks no DAG as running, replaces the stats object, moves
     * every guaranteed task into the speculative set for the same key (clearing its guaranteed
     * flag), zeroes the guaranteed counters, increments the guaranteed-count version and
     * resets the DAG-related metrics. After the lock is released, the zeroed guaranteed count
     * is pushed to the registry when the interactive-queue setting is non-blank.
     */
    public void dagComplete() {
        LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats);
        dagCounter.incrementAndGet();
        if (metrics != null) {
            metrics.incrCompletedDagCount();
        }

        long tgVersionForZk;
        writeLock.lock();
        try {
            setDagRunning(false);
            dagStats = new StatsPerDag();

            int pendingCount = 0;
            for (List<TaskInfo> pendingAtPriority : pendingTasks.values()) {
                if (pendingAtPriority != null) {
                    pendingCount += pendingAtPriority.size();
                }
            }

            // Demote every guaranteed task to speculative, keeping it under the same key.
            for (Map.Entry<Integer, TreeSet<TaskInfo>> guaranteedEntry : guaranteedTasks.entrySet()) {
                TreeSet<TaskInfo> speculative = speculativeTasks.get(guaranteedEntry.getKey());
                if (speculative == null) {
                    speculative = new TreeSet<TaskInfo>();
                    speculativeTasks.put(guaranteedEntry.getKey(), speculative);
                }
                for (TaskInfo demoted : guaranteedEntry.getValue()) {
                    synchronized (demoted) {
                        demoted.isGuaranteed = false;
                    }
                    speculative.add(demoted);
                }
            }
            guaranteedTasks.clear();

            // Counted after the demotion so that formerly guaranteed tasks are included.
            int runningCount = 0;
            for (TreeSet<TaskInfo> running : speculativeTasks.values()) {
                if (running != null) {
                    runningCount += running.size();
                }
            }

            unusedGuaranteed = 0;
            totalGuaranteed = 0;
            tgVersionForZk = ++totalGuaranteedVersion;
            if (metrics != null) {
                metrics.setDagId(null);
                metrics.resetWmMetrics();
            }
            LOG.info("DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}", new Object[]{knownTasks.size(), pendingCount, runningCount});
        }
        finally {
            writeLock.unlock();
        }

        String interactiveQueue = conf.get(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();
        if (!StringUtils.isEmpty((CharSequence)interactiveQueue)) {
            updateGuaranteedInRegistry(tgVersionForZk, 0);
        }
    }

    /**
     * Publishes a new total guaranteed count to the AM registry, unless an update with the
     * same or a newer version has already been sent. Does nothing when there is no AM
     * registry. An {@code IOException} from the registry is logged and ignored, in which
     * case the sent version is left unchanged.
     *
     * @param tgVersionForZk version number associated with this update
     * @param newTotalGuaranteed the guaranteed count to publish
     */
    private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) {
        if (amRegistry == null) {
            return;
        }
        synchronized (registryUpdateLock) {
            boolean isNewer = tgVersionForZk > tgVersionSent;
            if (isNewer) {
                try {
                    amRegistry.updateGuaranteed(newTotalGuaranteed);
                    tgVersionSent = tgVersionForZk;
                }
                catch (IOException ex) {
                    LOG.error("Failed to update guaranteed count in registry; ignoring", (Throwable)ex);
                }
            }
        }
    }

    /**
     * Not supported by this scheduler: only logs that the request was ignored.
     *
     * @param nodeId the node that was asked to be blacklisted (unused)
     */
    public void blacklistNode(NodeId nodeId) {
        LOG.info("BlacklistNode not supported");
    }

    /**
     * Not supported by this scheduler: only logs that the request was ignored.
     *
     * @param nodeId the node that was asked to be removed from the blacklist (unused)
     */
    public void unblacklistNode(NodeId nodeId) {
        LOG.info("unBlacklistNode not supported");
    }

    /**
     * Accepts an allocation request that carries host/rack locality hints.
     *
     * <p>Builds a {@code TaskInfo} for the request, then under the write lock records the DAG
     * id in the metrics if no DAG was running yet, marks a DAG as running and registers the
     * request in the per-DAG stats. The task is then added to the pending list and a
     * scheduling run is triggered.
     *
     * @param task the task to schedule; must be accepted by {@code getTaskAttemptId}
     * @param capability resources requested for the task
     * @param hosts preferred hosts
     * @param racks preferred racks
     * @param priority scheduling priority of the task
     * @param containerSignature not used by this method
     * @param clientCookie opaque value stored with the task
     */
    public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, Object containerSignature, Object clientCookie) {
        TezTaskAttemptID attemptId = getTaskAttemptId(task);
        TaskInfo pending = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime(), attemptId);
        LOG.info("Received allocateRequest. task={}, priority={}, capability={}, hosts={}", new Object[]{task, priority, capability, Arrays.toString(hosts)});

        writeLock.lock();
        try {
            boolean firstRequestOfDag = !dagRunning;
            if (firstRequestOfDag && metrics != null && attemptId != null) {
                metrics.setDagId(attemptId.getTaskID().getVertexID().getDAGId().toString());
            }
            setDagRunning(true);
            dagStats.registerTaskRequest(hosts, racks);
        }
        finally {
            writeLock.unlock();
        }

        addPendingTask(pending);
        trySchedulingPendingTasks();
    }

    /**
     * Accepts an allocation request that names a container instead of locality hints.
     *
     * <p>Builds a {@code TaskInfo} with no requested hosts or racks, then under the write lock
     * records the DAG id in the metrics if no DAG was running yet, marks a DAG as running and
     * registers a locality-free request in the per-DAG stats. The task is then added to the
     * pending list and a scheduling run is triggered.
     *
     * @param task the task to schedule; must be accepted by {@code getTaskAttemptId}
     * @param capability resources requested for the task
     * @param containerId container named by the caller; only logged here
     * @param priority scheduling priority of the task
     * @param containerSignature not used by this method
     * @param clientCookie opaque value stored with the task
     */
    public void allocateTask(Object task, Resource capability, ContainerId containerId, Priority priority, Object containerSignature, Object clientCookie) {
        TezTaskAttemptID attemptId = getTaskAttemptId(task);
        TaskInfo pending = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime(), attemptId);
        LOG.info("Received allocateRequest. task={}, priority={}, capability={}, containerId={}", new Object[]{task, priority, capability, containerId});

        writeLock.lock();
        try {
            boolean firstRequestOfDag = !dagRunning;
            if (firstRequestOfDag && metrics != null && attemptId != null) {
                metrics.setDagId(attemptId.getTaskID().getVertexID().getDAGId().toString());
            }
            setDagRunning(true);
            dagStats.registerTaskRequest(null, null);
        }
        finally {
            writeLock.unlock();
        }

        addPendingTask(pending);
        trySchedulingPendingTasks();
    }

    /**
     * Extracts the attempt id from a task object.
     *
     * @param task the task handed to the scheduler
     * @return the id of the task attempt
     * @throws AssertionError if {@code task} is not a {@code TaskAttempt}
     */
    protected TezTaskAttemptID getTaskAttemptId(Object task) {
        if (!(task instanceof TaskAttempt)) {
            throw new AssertionError((Object)"LLAP plugin can only schedule task attempts");
        }
        return ((TaskAttempt)task).getID();
    }

    /**
     * Handles the end of a task: unregisters it, updates node bookkeeping, hands a freed
     * guaranteed slot to another task where possible, and reports the container as completed.
     *
     * <p>All scheduler-state changes happen under the write lock, which is released in a
     * {@code finally} block so that an exception raised anywhere in the locked region (for
     * example a failed assertion or an error while updating node state) cannot leave the
     * lock held.
     *
     * @param task the task being deallocated
     * @param taskSucceeded whether the task finished successfully
     * @param endReason why the attempt ended; may be {@code null}
     * @param diagnostics not used by this method
     * @return {@code false} if the task is unknown or was never given a container;
     *         {@code true} once the container has been reported as released and completed
     */
    public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing deallocateTask for task={}, taskSucceeded={}, endReason={}", new Object[]{task, taskSucceeded, endReason});
        }
        boolean isEarlyExit = false;
        TaskInfo toUpdate = null;
        TaskInfo taskInfo;
        this.writeLock.lock();
        try {
            taskInfo = this.unregisterTask(task);
            if (taskInfo == null) {
                LOG.error("Could not determine ContainerId for task: " + task + " . Could have hit a race condition. Ignoring." + " The query may hang since this \"unknown\" container is now taking up a slot permanently");
                return false;
            }
            boolean isGuaranteedFreed = false;
            synchronized (taskInfo) {
                if (taskInfo.isGuaranteed == null) {
                    WM_LOG.error("Task appears to have been deallocated twice: " + task + " There may be inconsistencies in guaranteed task counts.");
                } else {
                    if (this.metrics != null) {
                        this.metrics.setWmTaskFinished(taskInfo.isGuaranteed, taskInfo.isPendingUpdate);
                    }
                    isGuaranteedFreed = taskInfo.isGuaranteed;
                    // Nulling the flag is what lets a second deallocation of the same task be
                    // detected by the check above, so the guaranteed slot is not freed twice.
                    taskInfo.isGuaranteed = null;
                }
            }
            if (taskInfo.containerId == null) {
                if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
                    LOG.error("Task: " + task + " assigned, but could not find the corresponding containerId." + " The query may hang since this \"unknown\" container is now taking up a slot permanently");
                } else {
                    LOG.info("Ignoring deallocate request for task " + task + " which hasn't been assigned to a container");
                    this.removePendingTask(taskInfo);
                }
                if (isGuaranteedFreed) {
                    // The freed guaranteed slot must still be redistributed on this early exit;
                    // the resulting update is sent from the finally block, after unlocking.
                    toUpdate = this.distributeGuaranteedOnTaskCompletion();
                    isEarlyExit = true;
                }
                return false;
            }
            NodeInfo nodeInfo = taskInfo.assignedNode;
            assert (nodeInfo != null);
            LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", new Object[]{taskInfo.task, taskInfo.getState(), endReason});
            if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
                this.unregisterPendingPreemption(taskInfo.assignedNode.getHost());
                nodeInfo.registerUnsuccessfulTaskEnd(true);
                if (nodeInfo.isDisabled()) {
                    this.queueNodeForReEnablement(nodeInfo);
                }
                this.trySchedulingPendingTasks();
            } else if (taskSucceeded) {
                nodeInfo.registerTaskSuccess();
                if (nodeInfo.isDisabled()) {
                    this.queueNodeForReEnablement(nodeInfo);
                }
                this.trySchedulingPendingTasks();
            } else {
                nodeInfo.registerUnsuccessfulTaskEnd(false);
                if (endReason != null && EnumSet.of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR).contains(endReason)) {
                    if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
                        this.dagStats.registerCommFailure(taskInfo.assignedNode.getHost());
                    } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) {
                        this.dagStats.registerTaskRejected(taskInfo.assignedNode.getHost());
                    }
                }
                if (endReason != null && endReason == TaskAttemptEndReason.NODE_FAILED) {
                    LOG.info("Task {} ended on {} with a NODE_FAILED message. A message should come in from the registry to disable this node unless this was a temporary communication failure", task, (Object)nodeInfo.toShortString());
                }
                boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
                this.disableNode(nodeInfo, commFailure);
            }
            if (isGuaranteedFreed) {
                toUpdate = this.distributeGuaranteedOnTaskCompletion();
            }
        }
        finally {
            this.writeLock.unlock();
            if (isEarlyExit) {
                // NOTE(review): toUpdate can be null here (nothing took the freed slot), and this
                // call is not null-guarded like the one below - confirm the callee accepts null.
                this.checkAndSendGuaranteedStateUpdate(toUpdate);
            }
        }
        if (toUpdate != null) {
            assert (!isEarlyExit);
            this.checkAndSendGuaranteedStateUpdate(toUpdate);
        }
        this.getContext().containerBeingReleased(taskInfo.containerId);
        this.getContext().containerCompleted(taskInfo.task, ContainerStatus.newInstance((ContainerId)taskInfo.containerId, (ContainerState)ContainerState.COMPLETE, (String)"", (int)0));
        return true;
    }

    /**
     * Handles a notification that a task attempt has started. The attempt is looked up under
     * the write lock; an unknown attempt is logged and ignored, otherwise
     * {@code handleUpdateResult(info, true)} is invoked after the lock is released.
     *
     * @param attemptId id of the attempt that started
     */
    public void notifyStarted(TezTaskAttemptID attemptId) {
        TaskInfo started;
        writeLock.lock();
        try {
            started = tasksById.get(attemptId);
            if (started == null) {
                WM_LOG.warn("Unknown task start notification " + attemptId);
                return;
            }
        }
        finally {
            writeLock.unlock();
        }
        handleUpdateResult(started, true);
    }

    /**
     * Reports whether a task attempt currently holds a guaranteed slot, and records that
     * value as the task's requested value.
     *
     * @param attemptId id of the attempt being queried
     * @return {@code false} if the attempt is unknown or its guaranteed flag is {@code null};
     *         otherwise the current value of the flag
     */
    boolean isInitialGuaranteed(TezTaskAttemptID attemptId) {
        TaskInfo info;
        readLock.lock();
        try {
            info = tasksById.get(attemptId);
        }
        finally {
            readLock.unlock();
        }
        if (info == null) {
            WM_LOG.warn("Status requested for an unknown task " + attemptId);
            return false;
        }
        synchronized (info) {
            if (info.isGuaranteed == null) {
                return false;
            }
            assert (info.lastSetGuaranteed == null);
            info.requestedValue = info.isGuaranteed;
            return info.isGuaranteed;
        }
    }

    /**
     * Tries to hand one freed guaranteed slot to another task. When no task takes it, the
     * unused-guaranteed counter is incremented (and mirrored to the metrics) instead.
     *
     * @return the task whose guaranteed state needs to be sent out, or {@code null} if
     *         {@code distributeGuaranteed} put nothing in the update list
     */
    private TaskInfo distributeGuaranteedOnTaskCompletion() {
        ArrayList<TaskInfo> toUpdate = new ArrayList<TaskInfo>(1);
        int updatedCount = distributeGuaranteed(1, null, toUpdate);
        assert (updatedCount <= 1);
        if (updatedCount == 0) {
            int unusedNow = ++unusedGuaranteed;
            if (metrics != null) {
                metrics.setWmUnusedGuaranteed(unusedNow);
            }
            WM_LOG.info("Returning the unused duck; unused is now " + unusedNow);
        }
        if (!toUpdate.isEmpty()) {
            assert (toUpdate.size() == 1);
            return toUpdate.get(0);
        }
        return null;
    }

    /**
     * Ignores the request: container deallocation is only logged at debug level.
     *
     * @param containerId the container the caller asked to deallocate
     * @return always {@code null}
     */
    public Object deallocateContainer(ContainerId containerId) {
        boolean debugEnabled = LOG.isDebugEnabled();
        if (debugEnabled) {
            LOG.debug("Ignoring deallocateContainer for containerId: {}", (Object)containerId);
        }
        return null;
    }

    /**
     * Intentionally a no-op in this scheduler.
     */
    public void setShouldUnregister() {
    }

    /**
     * Reports whether the scheduler has unregistered.
     *
     * @return always {@code true}
     */
    public boolean hasUnregistered() {
        // Nothing is tracked here; the answer is a constant.
        return true;
    }

    /**
     * Picks a node for a pending task, under the read lock.
     *
     * <p>Selection proceeds in this order:
     * <ol>
     *   <li>If hosts were requested, return the first node on any requested host that can
     *       accept a task.</li>
     *   <li>If the request should still wait for locality and at least one requested host is
     *       expected to become usable, return the "delayed for locality" result.</li>
     *   <li>If there are no instances at all, return the "delayed for resources" result.</li>
     *   <li>If no hosts were requested, or the first requested host is not among the known
     *       nodes, pick a random node with free capacity.</li>
     *   <li>Otherwise walk the node list circularly, starting just after the first requested
     *       host, and return the first node that can accept a task; if none can, return the
     *       "delayed for resources" result.</li>
     * </ol>
     *
     * @param request the task to place
     * @return the selected node, or one of the shared "delayed" results
     */
    private SelectHostResult selectHost(TaskInfo request) {
        Object[] requestedHosts = request.requestedHosts;
        String requestedHostsDebugStr = Arrays.toString(requestedHosts);
        if (LOG.isDebugEnabled()) {
            LOG.debug("selectingHost for task={} on hosts={}", request.task, (Object)requestedHostsDebugStr);
        }
        long schedulerAttemptTime = clock.getTime();
        boolean hasRequestedHosts = requestedHosts != null && requestedHosts.length > 0;

        readLock.lock();
        try {
            boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
            LOG.debug("ShouldDelayForLocality={} for task={} on hosts={}", new Object[]{shouldDelayForLocality, request.task, requestedHostsDebugStr});

            if (hasRequestedHosts) {
                boolean requestedHostsWillBecomeAvailable = false;
                for (int hostIdx = 0; hostIdx < requestedHosts.length; ++hostIdx) {
                    Object host = requestedHosts[hostIdx];
                    Set<LlapServiceInstance> hostInstances = activeInstances.getByHost((String)host);
                    if (hostInstances.isEmpty()) {
                        continue;
                    }
                    for (LlapServiceInstance inst : hostInstances) {
                        NodeInfo candidate = instanceToNodeMap.get(inst.getWorkerIdentity());
                        if (candidate == null) {
                            LOG.warn("Null NodeInfo when attempting to get host with worker {}, and host {}", (Object)inst, host);
                            continue;
                        }
                        if (candidate.canAcceptTask()) {
                            LOG.info("Assigning {} when looking for {}. local=true FirstRequestedHost={}, #prefLocations={}", new Object[]{candidate.toShortString(), host, hostIdx == 0, requestedHosts.length});
                            return new SelectHostResult(candidate);
                        }
                        if (!shouldDelayForLocality) {
                            continue;
                        }
                        if (request.shouldForceLocality()) {
                            requestedHostsWillBecomeAvailable = true;
                            continue;
                        }
                        // A node that is disabled after a communication failure and re-enables
                        // only after the locality timeout is not worth waiting for.
                        boolean tooLateToWaitFor = candidate.getEnableTime() > request.getLocalityDelayTimeout()
                            && candidate.isDisabled()
                            && candidate.hadCommFailure();
                        if (tooLateToWaitFor) {
                            LOG.debug("Host={} will not become available within requested timeout", (Object)candidate);
                        } else {
                            requestedHostsWillBecomeAvailable = true;
                        }
                    }
                }
                if (shouldDelayForLocality) {
                    if (requestedHostsWillBecomeAvailable) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Delaying local allocation for [" + request.task + "] when trying to allocate on [" + requestedHostsDebugStr + "]" + ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" + request.getLocalityDelayTimeout());
                        }
                        return SELECT_HOST_RESULT_DELAYED_LOCALITY;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Skipping local allocation for [" + request.task + "] when trying to allocate on [" + requestedHostsDebugStr + "] since none of these hosts are part of the known list");
                    }
                }
            }

            // Build the full node list (null placeholders keep positions stable for inactive
            // or unknown instances) together with the subset that has free capacity.
            Collection<LlapServiceInstance> allInstances = consistentSplits ? activeInstances.getAllInstancesOrdered(true) : activeInstances.getAll();
            List<NodeInfo> allNodes = new ArrayList<NodeInfo>(allInstances.size());
            List<NodeInfo> nodesWithFreeSlots = new ArrayList<NodeInfo>();
            for (LlapServiceInstance inst : allInstances) {
                NodeInfo known = inst instanceof InactiveServiceInstance ? null : instanceToNodeMap.get(inst.getWorkerIdentity());
                allNodes.add(known);
                if (known != null && known.canAcceptTask()) {
                    nodesWithFreeSlots.add(known);
                }
            }
            if (allNodes.isEmpty()) {
                return SELECT_HOST_RESULT_DELAYED_RESOURCES;
            }

            if (!hasRequestedHosts) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No-locality requested. Selecting a random host for task={}", request.task);
                }
                return randomSelection(nodesWithFreeSlots);
            }

            Object firstRequestedHost = requestedHosts[0];
            int requestedHostIdx = -1;
            for (int i = 0; i < allNodes.size(); ++i) {
                NodeInfo node = allNodes.get(i);
                if (node != null && node.getHost().equals(firstRequestedHost)) {
                    requestedHostIdx = i;
                    break;
                }
            }
            if (requestedHostIdx == -1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Requested node [{}] in consistent order does not exist. Falling back to random selection for request {}", firstRequestedHost, (Object)request);
                }
                return randomSelection(nodesWithFreeSlots);
            }

            // Circular walk that starts right after the first requested host and ends on it.
            int nodeCount = allNodes.size();
            for (int offset = 1; offset <= nodeCount; ++offset) {
                NodeInfo node = allNodes.get((requestedHostIdx + offset) % nodeCount);
                if (node == null || !node.canAcceptTask()) {
                    continue;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Assigning {} in consistent order when looking for first requested host, from #hosts={}, requestedHosts={}", new Object[]{node.toShortString(), allNodes.size(), requestedHostsDebugStr});
                }
                return new SelectHostResult(node);
            }
            return SELECT_HOST_RESULT_DELAYED_RESOURCES;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Picks a node at random from the nodes that have free capacity.
     *
     * @param nodesWithFreeSlots candidate nodes; may be empty
     * @return a result wrapping the chosen node, or the shared "delayed for resources" result
     *         when the list is empty
     */
    private SelectHostResult randomSelection(List<NodeInfo> nodesWithFreeSlots) {
        if (nodesWithFreeSlots.isEmpty()) {
            return SELECT_HOST_RESULT_DELAYED_RESOURCES;
        }
        int chosenIdx = random.nextInt(nodesWithFreeSlots.size());
        NodeInfo chosen = nodesWithFreeSlots.get(chosenIdx);
        if (LOG.isInfoEnabled()) {
            LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts=null", (Object)chosen.toShortString(), (Object)nodesWithFreeSlots.size());
        }
        return new SelectHostResult(chosen);
    }

    /**
     * Registers the DAG with a node through the communicator and adds the node.
     *
     * <p>The node is added directly here when there is no communicator, or when
     * {@code registerDag} returns {@code false}. When it returns {@code true}, adding the node
     * is left to the {@code RegisterDagCallback} passed to the communicator (presumably it
     * calls back into this scheduler; the callback is defined elsewhere).
     *
     * @param node scheduler-side state for the node
     * @param serviceInstance the registry instance backing the node
     */
    private void registerAndAddNode(NodeInfo node, LlapServiceInstance serviceInstance) {
        boolean addNow = communicator == null
            || !communicator.registerDag(node, new RegisterDagCallback(node, serviceInstance));
        if (addNow) {
            addNode(node, serviceInstance);
        }
    }

    /**
     * Makes a node known to the scheduler: stops the timeout monitor if one is pending and
     * instances exist, reports the node as running to the context, records it in the
     * instance-to-node map, refreshes the node-count metric and triggers a scheduling run.
     *
     * @param node scheduler-side state for the node
     * @param serviceInstance the registry instance backing the node
     */
    private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) {
        boolean timerPending = activeInstances.size() != 0 && timeoutFutureRef.get() != null;
        if (timerPending) {
            LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
            stopTimeoutMonitor();
        }

        NodeReport runningReport = LlapTaskSchedulerService.constructNodeReport(serviceInstance, true);
        getContext().nodesUpdated(Collections.singletonList(runningReport));
        instanceToNodeMap.put(node.getNodeIdentity(), node);

        if (metrics != null) {
            metrics.setClusterNodeCount(activeInstances.size());
        }
        LOG.info("Adding new node: {}. TotalNodeCount={}. activeInstances.size={}", new Object[]{node, instanceToNodeMap.size(), activeInstances.size()});
        trySchedulingPendingTasks();
    }

    /**
     * Re-enables a disabled node, under the write lock, provided the registry still lists an
     * instance for it; otherwise only logs that the node was skipped.
     *
     * @param nodeInfo the node to re-enable
     */
    private void reenableDisabledNode(NodeInfo nodeInfo) {
        writeLock.lock();
        try {
            LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString());
            boolean stillRegistered = activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null;
            if (!stillRegistered) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Not re-enabling node: {}, since it is not present in the RegistryActiveNodeList", (Object)nodeInfo.toShortString());
                }
                return;
            }
            nodeInfo.enableNode();
            if (metrics != null) {
                metrics.setDisabledNodeCount(disabledNodesQueue.size());
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Re-queues a node that is currently in the disabled-nodes queue: it is removed, its
     * expiry information is reset, and it is added back. Nodes not in the queue are left
     * untouched.
     *
     * @param nodeInfo the node to re-queue
     */
    private void queueNodeForReEnablement(NodeInfo nodeInfo) {
        boolean wasQueued = disabledNodesQueue.remove(nodeInfo);
        if (!wasQueued) {
            return;
        }
        LOG.info("Queueing node for re-enablement: {}", (Object)nodeInfo.toShortString());
        nodeInfo.resetExpireInformation();
        disabledNodesQueue.add(nodeInfo);
    }

    /**
     * Disables a node, under the write lock: marks it disabled, adds it to the disabled-nodes
     * queue, refreshes the disabled-node metric and triggers a scheduling run. A
     * {@code null} or already-disabled node is ignored, with a debug message.
     *
     * @param nodeInfo the node to disable; may be {@code null}
     * @param isCommFailure whether the node is being disabled because of a communication failure
     */
    private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) {
        writeLock.lock();
        try {
            if (nodeInfo == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignoring disableNode invocation for null NodeInfo");
                }
                return;
            }
            if (nodeInfo.isDisabled()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Node: " + nodeInfo.toShortString() + " already disabled, or invalid. Not doing anything.");
                }
                return;
            }
            nodeInfo.disableNode(isCommFailure);
            disabledNodesQueue.add(nodeInfo);
            if (metrics != null) {
                metrics.setDisabledNodeCount(disabledNodesQueue.size());
            }
            trySchedulingPendingTasks();
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Builds a {@code NodeReport} for a service instance.
     *
     * @param serviceInstance the instance whose host, RPC port and services address are reported
     * @param healthy {@code true} to report the node as {@code RUNNING}, {@code false} for {@code LOST}
     * @return the report; the remaining fields are filled with nulls, zeroes and an empty string
     */
    private static NodeReport constructNodeReport(LlapServiceInstance serviceInstance, boolean healthy) {
        NodeId nodeId = NodeId.newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort());
        NodeState state = healthy ? NodeState.RUNNING : NodeState.LOST;
        return NodeReport.newInstance(nodeId, state, serviceInstance.getServicesAddress(), null, null, null, 0, "", 0L);
    }

    /**
     * Adds a task to the pending list for its priority, under the write lock, creating that
     * list on first use. The task is also recorded in the known-task map (only if absent) and
     * in the attempt-id map, and the pending-task metric is incremented.
     *
     * @param taskInfo the task to queue
     */
    private void addPendingTask(TaskInfo taskInfo) {
        writeLock.lock();
        try {
            List<TaskInfo> samePriority = pendingTasks.get(taskInfo.priority);
            if (samePriority == null) {
                samePriority = new LinkedList<TaskInfo>();
                pendingTasks.put(taskInfo.priority, samePriority);
            }
            samePriority.add(taskInfo);

            knownTasks.putIfAbsent(taskInfo.task, taskInfo);
            tasksById.put(taskInfo.attemptId, taskInfo);

            if (metrics != null) {
                metrics.incrPendingTasksCount();
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("PendingTasksInfo={}", (Object)constructPendingTaskCountsLogMessage());
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a task from the pending list kept for its priority, under the write lock.
     * A warning is logged when there is no list for that priority, the list is empty, or the task
     * is not in it. The list itself is left in {@code pendingTasks} even if it becomes empty.
     *
     * @param taskInfo the task to remove
     */
    private void removePendingTask(TaskInfo taskInfo) {
        this.writeLock.lock();
        try {
            Priority priority = taskInfo.priority;
            List<TaskInfo> bucket = this.pendingTasks.get(priority);
            boolean removed = bucket != null && !bucket.isEmpty() && bucket.remove(taskInfo);
            if (!removed) {
                LOG.warn("Could not find task: " + taskInfo.task + " in pending list, at priority: " + priority);
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a task as running.
     * Under the task's own monitor the task is flagged as having a pending update, its
     * {@code requestedValue} is set to its current guaranteed flag, the start metric is reported
     * (when metrics are configured) and {@code setUpdateStartedUnderTiLock} is invoked. Then, under
     * the write lock, the task is added to {@code guaranteedTasks} or {@code speculativeTasks}
     * according to the flag value captured earlier, and the pending-task metric is decremented.
     *
     * @param taskInfo the task that has just been assigned
     */
    @VisibleForTesting
    protected void registerRunningTask(TaskInfo taskInfo) {
        final boolean guaranteed;
        synchronized (taskInfo) {
            assert !taskInfo.isPendingUpdate;
            guaranteed = taskInfo.isGuaranteed;
            taskInfo.isPendingUpdate = true;
            taskInfo.requestedValue = taskInfo.isGuaranteed;
            if (this.metrics != null) {
                this.metrics.setWmTaskStarted(taskInfo.requestedValue);
            }
            this.setUpdateStartedUnderTiLock(taskInfo);
        }

        TreeMap<Integer, TreeSet<TaskInfo>> targetMap;
        if (guaranteed) {
            targetMap = this.guaranteedTasks;
        } else {
            targetMap = this.speculativeTasks;
        }

        this.writeLock.lock();
        try {
            WM_LOG.info("Registering " + taskInfo.attemptId + "; " + taskInfo.isGuaranteed);
            LlapTaskSchedulerService.addToRunningTasksMap(targetMap, taskInfo);
            if (this.metrics != null) {
                this.metrics.decrPendingTasksCount();
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Looks up the bookkeeping record for a task in {@code knownTasks}.
     *
     * @param task the task object used as the lookup key
     * @return the value stored for {@code task}, or {@code null} if the lookup finds nothing
     */
    @VisibleForTesting
    protected TaskInfo getTaskInfo(Object task) {
        TaskInfo known = (TaskInfo)this.knownTasks.get(task);
        return known;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forgets a task, under the write lock.
     * The task is removed from {@code knownTasks} and {@code tasksById}. If its state is
     * {@code ASSIGNED} it must also be present in either {@code speculativeTasks} or
     * {@code guaranteedTasks}; it is removed from whichever map holds it.
     *
     * @param task the task object used as the key in {@code knownTasks}
     * @return the removed record, or {@code null} when the task was not known
     * @throws IllegalStateException if the task was {@code ASSIGNED} but found in neither running map
     */
    private TaskInfo unregisterTask(Object task) {
        this.writeLock.lock();
        try {
            TaskInfo taskInfo = (TaskInfo)this.knownTasks.remove(task);
            if (taskInfo == null) {
                LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task);
                return null;
            }
            this.tasksById.remove(taskInfo.attemptId);
            WM_LOG.info("Unregistering " + taskInfo.attemptId + "; " + taskInfo.isGuaranteed);
            if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
                // Speculative map first, guaranteed map only if the first lookup missed.
                boolean removed = LlapTaskSchedulerService.removeFromRunningTaskMap(this.speculativeTasks, task, taskInfo)
                    || LlapTaskSchedulerService.removeFromRunningTaskMap(this.guaranteedTasks, task, taskInfo);
                // Guava's Preconditions templates substitute only "%s"; the SLF4J-style "{}" used
                // previously was emitted literally, with the task appended in square brackets.
                Preconditions.checkState(removed, "runningTasks should contain an entry if the task was in running state. Caused by task: %s", task);
            }
            return taskInfo;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Adds a task to the set kept for its priority in the given running-task map.
     * The set is created with {@code TASK_INFO_COMPARATOR} the first time a priority is seen.
     *
     * @param runningTasks the map to update, keyed by integer priority
     * @param taskInfo the task to add
     */
    private static void addToRunningTasksMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks, TaskInfo taskInfo) {
        int level = taskInfo.priority.getPriority();
        TreeSet<TaskInfo> bucket = runningTasks.get(level);
        if (bucket == null) {
            bucket = new TreeSet<TaskInfo>(TASK_INFO_COMPARATOR);
            runningTasks.put(level, bucket);
        }
        bucket.add(taskInfo);
    }

    /**
     * Removes a task from the set kept for its priority in the given running-task map.
     * When the removal leaves the set empty, the priority entry is dropped from the map as well.
     *
     * @param runningTasks the map to update, keyed by integer priority
     * @param task not used by this method
     * @param taskInfo the task to remove
     * @return {@code true} if the task was present and removed; {@code false} if the priority has
     *         no set or the set did not contain the task
     */
    private static boolean removeFromRunningTaskMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks, Object task, TaskInfo taskInfo) {
        int level = taskInfo.priority.getPriority();
        TreeSet<TaskInfo> bucket = runningTasks.get(level);
        if (bucket == null) {
            return false;
        }
        boolean removed = bucket.remove(taskInfo);
        if (bucket.isEmpty()) {
            runningTasks.remove(level);
        }
        return removed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs one scheduling pass over {@code pendingTasks} while holding the write lock.
     * Priority levels are visited in the iteration order of {@code pendingTasks}; within a level,
     * tasks are visited in list order. Each task is handed to {@code scheduleTask}:
     * <ul>
     *   <li>{@code SCHEDULED}: the task is removed from its pending list.</li>
     *   <li>{@code INADEQUATE_TOTAL_RESOURCES}: the timeout monitor is started.</li>
     *   <li>{@code DELAYED_LOCALITY}: the task may be put on the delayed queue, and its requested
     *       hosts (if any) become the candidate hosts for preemption.</li>
     * </ul>
     * For every task that was not scheduled, at most one preemption is requested: on the candidate
     * hosts if none of them already has a pending preemption, or on any host if there are no
     * candidate hosts and no preemption is pending at all. A non-scheduled result other than
     * {@code DELAYED_LOCALITY} ends the walk of the current level, and any level that was not fully
     * scheduled ends the whole pass. After the lock is released, a task recorded in
     * {@code downgradedTask} by {@code scheduleTask} is passed to
     * {@code checkAndSendGuaranteedStateUpdate}.
     *
     * @throws InterruptedException declared by the signature; no statement in this method throws it directly
     */
    @VisibleForTesting
    protected void schedulePendingTasks() throws InterruptedException {
        // Out-parameter filled in by scheduleTask when it downgrades a guaranteed task.
        Ref downgradedTask = new Ref(null);
        this.writeLock.lock();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ScheduleRun: {}", (Object)this.constructPendingTaskCountsLogMessage());
            }
            Iterator<Map.Entry<Priority, List<TaskInfo>>> pendingIterator = this.pendingTasks.entrySet().iterator();
            // Total resources are read once and reused for every task in this pass.
            Resource totalResource = this.getTotalResources();
            while (pendingIterator.hasNext()) {
                Map.Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
                List<TaskInfo> taskListAtPriority = entry.getValue();
                Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
                boolean scheduledAllAtPriority = true;
                while (taskIter.hasNext()) {
                    Object[] potentialHosts;
                    TaskInfo taskInfo = taskIter.next();
                    // Exactly one earlier attempt means this is the task's second try: count it as delayed.
                    if (taskInfo.getNumPreviousAssignAttempts() == 1) {
                        this.dagStats.registerDelayedAllocation();
                    }
                    taskInfo.triedAssigningTask();
                    ScheduleResult scheduleResult = this.scheduleTask(taskInfo, totalResource, (Ref<TaskInfo>)downgradedTask);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("ScheduleResult for Task: {} = {}", (Object)taskInfo, (Object)scheduleResult);
                    }
                    if (scheduleResult == ScheduleResult.SCHEDULED) {
                        // Use the iterator so the list can be modified while it is being walked.
                        taskIter.remove();
                        continue;
                    }
                    if (scheduleResult == ScheduleResult.INADEQUATE_TOTAL_RESOURCES) {
                        LOG.info("Inadequate total resources before scheduling pending tasks. Signalling scheduler timeout monitor thread to start timer.");
                        this.startTimeoutMonitor();
                    }
                    // Only a locality-delayed task with a non-empty host list gets host-restricted preemption.
                    if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
                        this.maybeAddToDelayedTaskQueue(taskInfo);
                        potentialHosts = taskInfo.requestedHosts;
                        if (potentialHosts == null || potentialHosts.length == 0) {
                            potentialHosts = null;
                        }
                    } else {
                        potentialHosts = null;
                    }
                    if (potentialHosts != null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Attempting to preempt on requested host for task={}, potentialHosts={}", (Object)taskInfo, (Object)Arrays.toString(potentialHosts));
                        }
                        // A single candidate host with an outstanding preemption vetoes a new request.
                        boolean shouldPreempt = true;
                        for (Object host : potentialHosts) {
                            MutableInt pendingHostPreemptions = this.pendingPreemptionsPerHost.get(host);
                            if (pendingHostPreemptions == null || pendingHostPreemptions.intValue() <= 0) continue;
                            shouldPreempt = false;
                            LOG.debug("Not preempting for task={}. Found an existing preemption request on host={}, pendingPreemptionCount={}", new Object[]{taskInfo.task, host, pendingHostPreemptions.intValue()});
                            break;
                        }
                        if (shouldPreempt) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}", new Object[]{taskInfo.task, Arrays.toString(potentialHosts), this.pendingPreemptions.get()});
                            }
                            this.preemptTasks(entry.getKey().getPriority(), LlapTaskSchedulerService.vertexNum(taskInfo), 1, (String[])potentialHosts);
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists", taskInfo.task, (Object)Arrays.toString(potentialHosts));
                        }
                    } else {
                        LOG.debug("Attempting to preempt on any host for task={}, pendingPreemptions={}", taskInfo.task, (Object)this.pendingPreemptions.get());
                        // Without candidate hosts, preempt only when no preemption is outstanding anywhere.
                        if (this.pendingPreemptions.get() == 0) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Attempting to preempt for task={}, priority={} on any available host", taskInfo.task, (Object)taskInfo.priority);
                            }
                            this.preemptTasks(entry.getKey().getPriority(), LlapTaskSchedulerService.vertexNum(taskInfo), 1, null);
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Skipping preemption since there are {} pending preemption request. For task={}", (Object)this.pendingPreemptions.get(), (Object)taskInfo);
                        }
                    }
                    scheduledAllAtPriority = false;
                    // Locality-delayed tasks do not block the rest of this level; any other failure does.
                    if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) continue;
                    break;
                }
                // Drop the priority level once every task at it has been scheduled.
                if (taskListAtPriority.isEmpty()) {
                    pendingIterator.remove();
                }
                if (scheduledAllAtPriority) continue;
                LOG.debug("Unable to schedule all requests at priority={}. Skipping subsequent priority levels", (Object)entry.getKey());
                break;
            }
        }
        finally {
            this.writeLock.unlock();
        }
        // The state update for a downgraded task is issued only after the write lock is released.
        if (downgradedTask.value != null) {
            WM_LOG.info("Downgrading " + ((TaskInfo)downgradedTask.value).attemptId);
            this.checkAndSendGuaranteedStateUpdate((TaskInfo)downgradedTask.value);
        }
    }

    /**
     * Returns the numeric id of the vertex a task attempt belongs to, obtained by walking
     * attempt id, task id and vertex id in turn.
     *
     * @param taskInfo the task whose vertex id is wanted
     * @return the value of {@code getId()} on the task's vertex id
     */
    private static int vertexNum(TaskInfo taskInfo) {
        return taskInfo.getAttemptId()
            .getTaskID()
            .getVertexID()
            .getId();
    }

    /**
     * Builds a one-line summary of the pending-task state for logging.
     * The message lists the number of priority levels, a {@code [p=...,c=...]} pair per level,
     * the total number of pending tasks, and the size of {@code delayedTaskQueue}. A level whose
     * list is {@code null} is counted as zero.
     *
     * @return the formatted summary
     */
    private String constructPendingTaskCountsLogMessage() {
        StringBuilder message = new StringBuilder();
        message.append("numPriorityLevels=").append(this.pendingTasks.size()).append(". ");

        int total = 0;
        for (Map.Entry<Priority, List<TaskInfo>> level : this.pendingTasks.entrySet()) {
            List<TaskInfo> tasks = level.getValue();
            int count = 0;
            if (tasks != null) {
                count = tasks.size();
            }
            message.append("[p=").append(level.getKey().toString()).append(",c=").append(count).append("]");
            total += count;
        }

        message.append(". totalPendingTasks=").append(total);
        message.append(". delayedTaskQueueSize=").append(this.delayedTaskQueue.size());
        return message.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tries to place a single task on a node.
     * <ol>
     *   <li>If total memory is not positive, the result carried by
     *       {@code SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY} is returned.</li>
     *   <li>{@code selectHost} picks a node; any result other than {@code SCHEDULED} is returned as is.</li>
     *   <li>The task is marked guaranteed either by consuming one unit of {@code unusedGuaranteed}
     *       or, failing that, when {@code findGuaranteedToReallocate} succeeds.</li>
     *   <li>A container is created, and under the write lock the assignment is recorded on the
     *       task, in the statistics and on the node.</li>
     *   <li>The context is told about the allocation after the lock acquired here is released.</li>
     * </ol>
     *
     * @param taskInfo the task to schedule
     * @param totalResource total resources available; must not be {@code null}
     * @param downgradedTask out-parameter handed to {@code findGuaranteedToReallocate}
     * @return the outcome of host selection, or the inadequate-capacity result
     */
    private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource, Ref<TaskInfo> downgradedTask) {
        Preconditions.checkNotNull(totalResource, "totalResource can not be null");
        if (totalResource.getMemory() <= 0) {
            return LlapTaskSchedulerService.SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY.scheduleResult;
        }

        SelectHostResult hostSelection = this.selectHost(taskInfo);
        ScheduleResult outcome = hostSelection.scheduleResult;
        if (outcome != ScheduleResult.SCHEDULED) {
            return outcome;
        }

        if (this.unusedGuaranteed > 0) {
            boolean alreadyGuaranteed;
            synchronized (taskInfo) {
                assert !taskInfo.isPendingUpdate;
                alreadyGuaranteed = taskInfo.isGuaranteed;
                taskInfo.isGuaranteed = true;
            }
            if (alreadyGuaranteed) {
                WM_LOG.error("The task had guaranteed flag set before scheduling: " + taskInfo);
            } else {
                int remainingUnused = --this.unusedGuaranteed;
                if (this.metrics != null) {
                    this.metrics.setWmUnusedGuaranteed(remainingUnused);
                }
                WM_LOG.info("Using an unused duck for " + taskInfo.attemptId + "; unused is now " + remainingUnused);
            }
        } else if (this.findGuaranteedToReallocate(taskInfo, downgradedTask)) {
            synchronized (taskInfo) {
                assert !taskInfo.isPendingUpdate;
                taskInfo.isGuaranteed = true;
            }
        }

        NodeInfo target = hostSelection.nodeInfo;
        Container container = this.containerFactory.createContainer(target.getResourcePerExecutor(), taskInfo.priority, target.getHost(), target.getRpcPort(), target.getServiceAddress());

        this.writeLock.lock();
        try {
            this.assignedTaskCounter.incrementAndGet();
            LOG.info("Assigned #{}, task={} on node={}, to container={}", new Object[]{this.assignedTaskCounter.get(), taskInfo, target.toShortString(), container.getId()});
            this.dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, target.getHost());
            taskInfo.setAssignmentInfo(target, container.getId(), this.clock.getTime());
            this.registerRunningTask(taskInfo);
            target.registerTaskScheduled();
        }
        finally {
            this.writeLock.unlock();
        }

        this.getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
        return outcome;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selects up to {@code numTasksToPreempt} running tasks to preempt and asks the context to
     * preempt their containers.
     * Selection happens under the write lock: speculative tasks are considered first, and
     * guaranteed tasks only for whatever part of the quota is still unfilled. The
     * {@code preemptContainer} calls are issued after the lock acquired here has been released.
     *
     * @param forPriority priority of the task that needs room
     * @param forVertex vertex number of the task that needs room
     * @param numTasksToPreempt maximum number of tasks to preempt
     * @param potentialHosts hosts to restrict preemption to, or {@code null} for any host
     */
    private void preemptTasks(int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts) {
        Set<String> preemptHosts = null;
        List<TaskInfo> victims = null;
        this.writeLock.lock();
        try {
            victims = this.preemptTasksFromMap(this.speculativeTasks, forPriority, forVertex, numTasksToPreempt, potentialHosts, preemptHosts, victims);
            int stillNeeded = numTasksToPreempt;
            if (victims != null) {
                stillNeeded -= victims.size();
            }
            if (stillNeeded > 0) {
                victims = this.preemptTasksFromMap(this.guaranteedTasks, forPriority, forVertex, stillNeeded, potentialHosts, preemptHosts, victims);
            }
        }
        finally {
            this.writeLock.unlock();
        }
        if (victims == null) {
            return;
        }
        for (TaskInfo victim : victims) {
            LOG.info("Preempting task {}", victim);
            this.getContext().preemptContainer(victim.containerId);
        }
    }

    /**
     * Picks tasks to preempt from one running-task map.
     * Priority levels are walked from the highest integer key downwards, and only levels whose key
     * is strictly greater than {@code forPriority} are considered; the first level that fails that
     * test ends the search. A task is skipped when a host restriction applies and its node's host
     * is not in it, or when dependency information exists for {@code forVertex} and does not list
     * the task's vertex. Each selected task is marked preempted, counted in the statistics,
     * registered as a pending preemption for its host, and removed from the map. Levels left empty
     * are dropped.
     *
     * @param runningTasks the map to take victims from
     * @param forPriority priority of the task that needs room
     * @param forVertex vertex number of the task that needs room
     * @param numTasksToPreempt maximum number of tasks to select in this call
     * @param potentialHosts hosts to restrict preemption to, or {@code null}
     * @param preemptHosts host set to use; built from {@code potentialHosts} when {@code null}
     * @param preemptedTaskList list to append to; created on first selection when {@code null}
     * @return the list passed in (or the newly created one); {@code null} if none was passed and
     *         nothing was selected
     */
    private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks, int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts, Set<String> preemptHosts, List<TaskInfo> preemptedTaskList) {
        Iterator<Map.Entry<Integer, TreeSet<TaskInfo>>> levels = runningTasks.descendingMap().entrySet().iterator();
        int selected = 0;
        while (levels.hasNext() && selected < numTasksToPreempt) {
            Map.Entry<Integer, TreeSet<TaskInfo>> level = levels.next();
            if (level.getKey() <= forPriority) {
                LOG.debug("No tasks qualify as killable to schedule tasks at priority {}. Current priority={}", forPriority, level.getKey());
                break;
            }
            if (potentialHosts != null && preemptHosts == null) {
                preemptHosts = Sets.newHashSet(potentialHosts);
            }
            TreeSet<TaskInfo> atLevel = level.getValue();
            Iterator<TaskInfo> candidates = atLevel.iterator();
            while (candidates.hasNext() && selected < numTasksToPreempt) {
                TaskInfo candidate = candidates.next();
                boolean hostAllowed = preemptHosts == null || preemptHosts.contains(candidate.assignedNode.getHost());
                if (!hostAllowed) {
                    continue;
                }
                Map<Integer, Set<Integer>> depInfo = this.getDependencyInfo(candidate.attemptId.getTaskID().getVertexID().getDAGId());
                Set<Integer> vertexDepInfo = depInfo == null ? null : depInfo.get(forVertex);
                if (depInfo != null && vertexDepInfo == null) {
                    LOG.warn("Cannot find info for " + forVertex + " " + depInfo);
                }
                if (vertexDepInfo != null && !vertexDepInfo.contains(LlapTaskSchedulerService.vertexNum(candidate))) {
                    continue;
                }
                ++selected;
                LOG.info("preempting {} for task at priority {} with potentialHosts={}", new Object[]{candidate, forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts)});
                candidate.setPreemptedInfo(this.clock.getTime());
                if (preemptedTaskList == null) {
                    preemptedTaskList = new LinkedList<TaskInfo>();
                }
                this.dagStats.registerTaskPreempted(candidate.assignedNode.getHost());
                preemptedTaskList.add(candidate);
                this.registerPendingPreemption(candidate.assignedNode.getHost());
                candidates.remove();
            }
            if (atLevel.isEmpty()) {
                levels.remove();
            }
        }
        return preemptedTaskList;
    }

    /**
     * Promotes up to {@code count} tasks from {@code speculativeTasks} to guaranteed, one priority
     * level at a time in the map's iteration order, delegating each level to
     * {@code handleUpdateForSinglePriorityLevel}.
     *
     * @param count number of guaranteed slots to hand out
     * @param failedUpdate passed through to the per-level handler; may be {@code null}
     * @param toUpdate list the per-level handler appends to
     * @return how many slots were actually handed out
     */
    private int distributeGuaranteed(int count, TaskInfo failedUpdate, List<TaskInfo> toUpdate) {
        WM_LOG.info("Distributing " + count + " among " + this.speculativeTasks.size() + " levels" + (failedUpdate == null ? "" : "; on failure"));
        int left = count;
        for (Iterator<Map.Entry<Integer, TreeSet<TaskInfo>>> levels = this.speculativeTasks.entrySet().iterator(); left > 0 && levels.hasNext(); ) {
            left = this.handleUpdateForSinglePriorityLevel(left, levels, failedUpdate, toUpdate, true);
        }
        return count - left;
    }

    /**
     * Demotes up to {@code count} tasks from {@code guaranteedTasks} to speculative, one priority
     * level at a time starting from the highest integer key, delegating each level to
     * {@code handleUpdateForSinglePriorityLevel}.
     *
     * @param count number of guaranteed slots to take back
     * @param failedUpdate passed through to the per-level handler; may be {@code null}
     * @param toUpdate list the per-level handler appends to
     * @return how many slots were actually taken back
     */
    private int revokeGuaranteed(int count, TaskInfo failedUpdate, List<TaskInfo> toUpdate) {
        WM_LOG.info("Revoking " + count + " from " + this.guaranteedTasks.size() + " levels" + (failedUpdate == null ? "" : "; on failure"));
        int left = count;
        for (Iterator<Map.Entry<Integer, TreeSet<TaskInfo>>> levels = this.guaranteedTasks.descendingMap().entrySet().iterator(); left > 0 && levels.hasNext(); ) {
            left = this.handleUpdateForSinglePriorityLevel(left, levels, failedUpdate, toUpdate, false);
        }
        return count - left;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks for a guaranteed task that can give up its guarantee in favour of {@code candidate}.
     * Levels of {@code guaranteedTasks} are walked from the highest integer key downwards; the
     * search stops as soon as a level's key is not strictly greater than the candidate's priority
     * value. The last task of the first non-empty qualifying level is demoted: its guaranteed flag
     * is cleared under its own monitor and it is moved to {@code speculativeTasks}. If that task
     * has no update pending, an update is started and the task is stored in {@code toUpdate}.
     *
     * @param candidate the task that wants a guarantee
     * @param toUpdate out-parameter receiving the demoted task when an update must be sent
     * @return {@code true} if a task was demoted, {@code false} otherwise
     */
    private boolean findGuaranteedToReallocate(TaskInfo candidate, Ref<TaskInfo> toUpdate) {
        Iterator<Map.Entry<Integer, TreeSet<TaskInfo>>> levels = this.guaranteedTasks.descendingMap().entrySet().iterator();
        while (levels.hasNext()) {
            Map.Entry<Integer, TreeSet<TaskInfo>> level = levels.next();
            int levelPriority = level.getKey();
            TreeSet<TaskInfo> atLevel = level.getValue();
            if (levelPriority <= candidate.priority.getPriority()) {
                return false;
            }
            TaskInfo demoted = atLevel.pollLast();
            boolean found = demoted != null;
            if (found) {
                synchronized (demoted) {
                    assert demoted.isGuaranteed;
                    demoted.isGuaranteed = false;
                    if (!demoted.isPendingUpdate) {
                        this.setUpdateStartedUnderTiLock(demoted);
                        toUpdate.value = demoted;
                    }
                }
                LlapTaskSchedulerService.addToRunningTasksMap(this.speculativeTasks, demoted);
            }
            // Drop the level through the iterator once it holds no more tasks.
            if (atLevel.isEmpty()) {
                levels.remove();
            }
            if (found) {
                return true;
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flips the guaranteed flag for tasks at the next priority level produced by {@code iterator}
     * and moves them to the opposite running-task map.
     * With {@code newValue == true} tasks are taken in the set's ascending order and moved to
     * {@code guaranteedTasks}; with {@code false} they are taken in descending order and moved to
     * {@code speculativeTasks}. {@code failedUpdate} is skipped during the walk and handled last,
     * and only if it belongs to this level and quota remains. Tasks that already have an update
     * pending are moved but not added to {@code toUpdate}.
     * <p>
     * {@code failedUpdate} is removed from this level's set directly, and an emptied level is
     * dropped through {@code iterator.remove()} only after that. Removing it through the map could
     * delete the map entry behind the caller's live iterator and make the caller's next
     * {@code iterator.next()} throw {@code ConcurrentModificationException}. This relies on
     * {@code iterator} walking the map the tasks are moved out of ({@code speculativeTasks} when
     * {@code newValue} is true, {@code guaranteedTasks} otherwise), as {@code distributeGuaranteed}
     * and {@code revokeGuaranteed} do.
     *
     * @param remainingCount number of flag changes still allowed
     * @param iterator iterator over the source map's entries; advanced by exactly one entry
     * @param failedUpdate task whose previous update failed, or {@code null}
     * @param toUpdate list receiving tasks for which an update was started
     * @param newValue the guaranteed flag value to assign
     * @return the quota left after processing this level
     */
    private int handleUpdateForSinglePriorityLevel(int remainingCount, Iterator<Map.Entry<Integer, TreeSet<TaskInfo>>> iterator, TaskInfo failedUpdate, List<TaskInfo> toUpdate, boolean newValue) {
        Map.Entry<Integer, TreeSet<TaskInfo>> entry = iterator.next();
        TreeSet<TaskInfo> atPriority = entry.getValue();
        WM_LOG.info("At priority " + entry.getKey() + " observing " + entry.getValue().size());
        Iterator<TaskInfo> atPriorityIter = newValue ? atPriority.iterator() : atPriority.descendingIterator();
        TreeMap<Integer, TreeSet<TaskInfo>> toMap = newValue ? this.guaranteedTasks : this.speculativeTasks;
        while (atPriorityIter.hasNext() && remainingCount > 0) {
            TaskInfo taskInfo = atPriorityIter.next();
            if (taskInfo == failedUpdate) {
                // Handled separately below so that it is only picked when quota is left over.
                continue;
            }
            atPriorityIter.remove();
            synchronized (taskInfo) {
                assert taskInfo.isGuaranteed != newValue;
                taskInfo.isGuaranteed = newValue;
                if (!taskInfo.isPendingUpdate) {
                    this.setUpdateStartedUnderTiLock(taskInfo);
                    WM_LOG.info("Adding " + taskInfo.attemptId + " to update");
                    toUpdate.add(taskInfo);
                } else {
                    WM_LOG.info("Not adding " + taskInfo.attemptId + " to update - already pending");
                }
            }
            LlapTaskSchedulerService.addToRunningTasksMap(toMap, taskInfo);
            --remainingCount;
        }
        if (failedUpdate != null && entry.getKey().intValue() == failedUpdate.priority.getPriority() && remainingCount > 0) {
            // Remove from this level's set rather than through the map: a map-level removal of an
            // emptied level would invalidate the caller's iterator.
            atPriority.remove(failedUpdate);
            synchronized (failedUpdate) {
                assert failedUpdate.isGuaranteed != newValue;
                failedUpdate.isGuaranteed = newValue;
                this.setUpdateStartedUnderTiLock(failedUpdate);
            }
            WM_LOG.info("Adding failed " + failedUpdate.attemptId + " to update");
            toUpdate.add(failedUpdate);
            LlapTaskSchedulerService.addToRunningTasksMap(toMap, failedUpdate);
            --remainingCount;
        }
        // Drop the level once, after all removals, and always through the iterator.
        if (atPriority.isEmpty()) {
            iterator.remove();
        }
        return remainingCount;
    }

    /**
     * Records one more outstanding preemption, under the write lock.
     * The global counter, the metric (when configured) and the per-host counter are all
     * incremented; the per-host counter is created at zero on first use.
     *
     * @param host the host the preempted task was running on
     */
    private void registerPendingPreemption(String host) {
        this.writeLock.lock();
        try {
            this.pendingPreemptions.incrementAndGet();
            if (this.metrics != null) {
                this.metrics.incrPendingPreemptionTasksCount();
            }
            MutableInt hostCounter = this.pendingPreemptionsPerHost.get(host);
            if (hostCounter == null) {
                hostCounter = new MutableInt(0);
                this.pendingPreemptionsPerHost.put(host, hostCounter);
            }
            hostCounter.increment();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Records that one outstanding preemption has completed, under the write lock.
     * The global counter, the metric (when configured) and the per-host counter are decremented.
     *
     * @param host the host the preemption was registered for
     * @throws NullPointerException if no per-host counter exists for {@code host}
     */
    private void unregisterPendingPreemption(String host) {
        this.writeLock.lock();
        try {
            this.pendingPreemptions.decrementAndGet();
            if (this.metrics != null) {
                this.metrics.decrPendingPreemptionTasksCount();
            }
            MutableInt hostCounter = Preconditions.checkNotNull(this.pendingPreemptionsPerHost.get(host));
            hostCounter.decrement();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Puts a task on {@code delayedTaskQueue} unless it forces locality or is already flagged as
     * being in the queue. The task's in-queue flag is set before it is added.
     *
     * @param taskInfo the locality-delayed task
     */
    private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
        if (taskInfo.shouldForceLocality() || taskInfo.isInDelayedQueue()) {
            return;
        }
        taskInfo.setInDelayedQueue(true);
        this.delayedTaskQueue.add(taskInfo);
    }

    /**
     * Stores the DAG-running flag and, when metrics are configured, publishes it as 1 or 0.
     *
     * @param running the new value of the flag
     */
    private void setDagRunning(boolean running) {
        this.dagRunning = running;
        if (this.metrics == null) {
            return;
        }
        int gaugeValue = running ? 1 : 0;
        this.metrics.setDagRunning(gaugeValue);
    }

    /**
     * Creates a new {@code DelayedTaskSchedulerCallable}.
     *
     * @return a fresh callable instance
     */
    @VisibleForTesting
    DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
        DelayedTaskSchedulerCallable callable = new DelayedTaskSchedulerCallable();
        return callable;
    }

    /**
     * Requests a scheduling run.
     * While holding {@code scheduleLock}, the {@code pendingScheduleInvocations} flag is set and
     * {@code scheduleCondition} is signalled once.
     */
    private void trySchedulingPendingTasks() {
        this.scheduleLock.lock();
        try {
            // Set the flag before signalling so the request is visible to whoever is woken.
            this.pendingScheduleInvocations.set(true);
            this.scheduleCondition.signal();
        } finally {
            this.scheduleLock.unlock();
        }
    }

    /**
     * Applies a query update request.
     * Only the guaranteed task count is examined; when the request carries one it is forwarded to
     * {@code updateGuaranteedCount}, otherwise the request is ignored.
     *
     * @param request the update request
     */
    public void updateQuery(LlapPluginProtocolProtos.UpdateQueryRequestProto request) {
        if (!request.hasGuaranteedTaskCount()) {
            return;
        }
        this.updateGuaranteedCount(request.getGuaranteedTaskCount());
    }

    /**
     * Stores the communicator that {@code sendUpdateMessageAsync} uses to send guaranteed-state
     * updates.
     *
     * @param communicator the communicator to use
     */
    void setTaskCommunicator(LlapTaskCommunicator communicator) {
        this.communicator = communicator;
    }

    /**
     * Logs and starts a guaranteed-state update for a task through the communicator, passing
     * {@code UPDATE_CALLBACK} and the task itself along with the request.
     *
     * @param ti the task to update
     * @param newState the guaranteed state to send
     */
    protected void sendUpdateMessageAsync(TaskInfo ti, boolean newState) {
        String logLine = "Sending message to " + ti.attemptId + ": " + newState;
        WM_LOG.info(logLine);
        this.communicator.startUpdateGuaranteed(ti.attemptId, ti.assignedNode, newState, this.UPDATE_CALLBACK, ti);
    }

    /**
     * Returns the current value of {@code unusedGuaranteed}. No lock is taken for the read.
     *
     * @return the number of unused guaranteed slots
     */
    @VisibleForTesting
    int getUnusedGuaranteedCount() {
        int unused = this.unusedGuaranteed;
        return unused;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reconciles the guaranteed state reported for a task attempt with the scheduler's own view.
     * The task is looked up by attempt id under the write lock; an unknown attempt is logged and
     * ignored. Under the task's monitor the report is then ignored when an update is already
     * pending, when the task has no guaranteed value, when the report repeats the last recorded
     * one, or when it agrees with the scheduler's value. Otherwise an update is started and the
     * scheduler's value is sent to the task.
     *
     * @param attemptId the attempt the report is about
     * @param isGuaranteed the guaranteed state reported for that attempt
     */
    public void taskInfoUpdated(TezTaskAttemptID attemptId, boolean isGuaranteed) {
        TaskInfo ti;
        this.writeLock.lock();
        try {
            ti = this.tasksById.get(attemptId);
            if (ti == null) {
                WM_LOG.warn("Unknown task from heartbeat " + attemptId);
                return;
            }
        }
        finally {
            this.writeLock.unlock();
        }

        boolean newState;
        synchronized (ti) {
            if (ti.isPendingUpdate || ti.isGuaranteed == null) {
                return;
            }
            boolean repeatedReport = ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == isGuaranteed;
            if (repeatedReport) {
                return;
            }
            ti.lastSetGuaranteed = isGuaranteed;
            if (isGuaranteed == ti.isGuaranteed) {
                return;
            }
            newState = ti.isGuaranteed;
            this.setUpdateStartedUnderTiLock(ti);
        }

        WM_LOG.info("Sending an update based on inconsistent state from heartbeat for " + attemptId + ", " + newState);
        this.sendUpdateMessageAsync(ti, newState);
    }

    /**
     * Immutable holder for the node locality delay setting.
     */
    @VisibleForTesting
    static final class LocalityDelayConf {
        private final long nodeLocalityDelay;

        /**
         * @param nodeLocalityDelay the delay value to hold
         */
        public LocalityDelayConf(long nodeLocalityDelay) {
            this.nodeLocalityDelay = nodeLocalityDelay;
        }

        /**
         * @return the delay value given at construction
         */
        public long getNodeLocalityDelay() {
            return nodeLocalityDelay;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("LocalityDelayConf{nodeLocalityDelay=");
            text.append(nodeLocalityDelay).append('}');
            return text.toString();
        }
    }

    /**
     * Immutable holder for the minimum delay, maximum delay and backoff factor used when
     * blacklisting nodes.
     */
    private static final class NodeBlacklistConf {
        private final long minDelay;
        private final long maxDelay;
        private final float backoffFactor;

        /**
         * @param minDelay the minimum delay value
         * @param maxDelay the maximum delay value
         * @param backoffFactor the backoff factor value
         */
        public NodeBlacklistConf(long minDelay, long maxDelay, float backoffFactor) {
            this.minDelay = minDelay;
            this.maxDelay = maxDelay;
            this.backoffFactor = backoffFactor;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("NodeBlacklistConf{");
            text.append("minDelay=").append(minDelay);
            text.append(", maxDelay=").append(maxDelay);
            text.append(", backoffFactor=").append(backoffFactor);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Outcome of host selection: either a chosen node with result {@code SCHEDULED}, or a bare
     * result with no node.
     */
    private static class SelectHostResult {
        final NodeInfo nodeInfo;
        final ScheduleResult scheduleResult;

        /**
         * Creates a successful result for the given node; the schedule result is {@code SCHEDULED}.
         */
        SelectHostResult(NodeInfo nodeInfo) {
            this(nodeInfo, ScheduleResult.SCHEDULED);
        }

        /**
         * Creates a result that carries only a schedule result; the node is {@code null}.
         */
        SelectHostResult(ScheduleResult scheduleResult) {
            this(null, scheduleResult);
        }

        // Single place where both fields are assigned.
        private SelectHostResult(NodeInfo nodeInfo, ScheduleResult scheduleResult) {
            this.nodeInfo = nodeInfo;
            this.scheduleResult = scheduleResult;
        }
    }

    /**
     * Orders tasks by descending {@code startTime}, breaking ties by descending {@code uniqueId}.
     * Two tasks compare as equal only when both values match.
     */
    private static class TaskStartComparator
    implements Comparator<TaskInfo> {
        private TaskStartComparator() {
        }

        @Override
        public int compare(TaskInfo o1, TaskInfo o2) {
            // Operands are swapped so that larger values sort first.
            int byStartTime = Long.compare(o2.startTime, o1.startTime);
            if (byStartTime != 0) {
                return byStartTime;
            }
            return Long.compare(o2.uniqueId, o1.uniqueId);
        }
    }

    /**
     * Scheduler-side record for a single task request.
     *
     * <p>Holds the immutable request parameters supplied at construction, a
     * locality-delay deadline derived from them, and mutable assignment /
     * preemption state that is filled in later through the synchronized setters.
     *
     * <p>Implements {@link Delayed}: the delay is measured against
     * {@code localityDelayTimeout} using the supplied {@code clock}, and
     * {@link #compareTo(Delayed)} orders instances by that same deadline.
     */
    @VisibleForTesting
    static class TaskInfo
    implements Delayed {
        /** Source of {@link #uniqueId}; shared by all instances. */
        static final AtomicLong ID_GEN = new AtomicLong(0L);
        final long uniqueId;
        final LocalityDelayConf localityDelayConf;
        final Clock clock;
        final Object task;
        final Object clientCookie;
        final Priority priority;
        final Resource capability;
        final String[] requestedHosts;
        final String[] requestedRacks;
        final long requestTime;
        /**
         * Deadline after which locality is no longer waited for.
         * {@code Long.MAX_VALUE} means "never give up on locality", {@code 0} means "do not wait".
         */
        final long localityDelayTimeout;
        long startTime;
        long preemptTime;
        ContainerId containerId;
        NodeInfo assignedNode;
        private State state = State.PENDING;
        boolean inDelayedQueue = false;
        private final TezTaskAttemptID attemptId;
        private Boolean isGuaranteed = false;
        /** {@code null} until a value has been recorded. */
        private Boolean lastSetGuaranteed = null;
        private Boolean requestedValue = null;
        private boolean isPendingUpdate = false;
        private int numAssignAttempts = 0;

        /**
         * Creates a task record in the {@link State#PENDING} state and assigns it the next unique id.
         *
         * @param localityDelayConf supplies the node locality delay used to derive {@code localityDelayTimeout}
         * @param clock             time source used by {@link #getDelay(TimeUnit)}
         * @param task              the task object; also used by {@link #equals(Object)} and {@link #hashCode()}
         * @param clientCookie      opaque value stored as-is
         * @param priority          request priority
         * @param capability        requested resource
         * @param hosts             requested hosts, stored as-is (may be {@code null})
         * @param racks             requested racks, stored as-is (may be {@code null})
         * @param requestTime       time of the request, in the same unit as the locality delay
         * @param id                task attempt id
         */
        public TaskInfo(LocalityDelayConf localityDelayConf, Clock clock, Object task, Object clientCookie, Priority priority, Resource capability, String[] hosts, String[] racks, long requestTime, TezTaskAttemptID id) {
            this.localityDelayConf = localityDelayConf;
            this.clock = clock;
            this.task = task;
            this.clientCookie = clientCookie;
            this.priority = priority;
            this.capability = capability;
            this.requestedHosts = hosts;
            this.requestedRacks = racks;
            this.requestTime = requestTime;
            this.localityDelayTimeout = TaskInfo.computeLocalityDelayTimeout(localityDelayConf.getNodeLocalityDelay(), requestTime);
            this.uniqueId = ID_GEN.getAndIncrement();
            this.attemptId = id;
        }

        /**
         * Maps the configured locality delay to an absolute deadline:
         * {@code -1} becomes {@code Long.MAX_VALUE}, {@code 0} stays {@code 0},
         * anything else is added to {@code requestTime}.
         */
        private static long computeLocalityDelayTimeout(long nodeLocalityDelay, long requestTime) {
            if (nodeLocalityDelay == -1L) {
                return Long.MAX_VALUE;
            }
            if (nodeLocalityDelay == 0L) {
                return 0L;
            }
            return requestTime + nodeLocalityDelay;
        }

        /** Records the node, container and start time, and moves the task to {@link State#ASSIGNED}. */
        synchronized void setAssignmentInfo(NodeInfo nodeInfo, ContainerId containerId, long startTime) {
            this.assignedNode = nodeInfo;
            this.containerId = containerId;
            this.startTime = startTime;
            this.state = State.ASSIGNED;
        }

        /** Moves the task to {@link State#PREEMPTED} and records when that happened. */
        synchronized void setPreemptedInfo(long preemptTime) {
            this.state = State.PREEMPTED;
            this.preemptTime = preemptTime;
        }

        synchronized void setInDelayedQueue(boolean val) {
            this.inDelayedQueue = val;
        }

        /** Increments the counter returned by {@link #getNumPreviousAssignAttempts()}. */
        synchronized void triedAssigningTask() {
            ++this.numAssignAttempts;
        }

        synchronized int getNumPreviousAssignAttempts() {
            return this.numAssignAttempts;
        }

        synchronized State getState() {
            return this.state;
        }

        synchronized boolean isInDelayedQueue() {
            return this.inDelayedQueue;
        }

        /** @return {@code true} while the locality deadline is still after {@code schedulerAttemptTime} */
        boolean shouldDelayForLocality(long schedulerAttemptTime) {
            return this.localityDelayTimeout > schedulerAttemptTime;
        }

        /** @return {@code true} when the deadline is {@code Long.MAX_VALUE}, i.e. the configured delay was {@code -1} */
        boolean shouldForceLocality() {
            return this.localityDelayTimeout == Long.MAX_VALUE;
        }

        long getLocalityDelayTimeout() {
            return this.localityDelayTimeout;
        }

        /** Two instances are equal when both their unique ids and their task objects are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TaskInfo other = (TaskInfo)o;
            return this.uniqueId == other.uniqueId && this.task.equals(other.task);
        }

        @Override
        public int hashCode() {
            int result = (int)(this.uniqueId ^ (this.uniqueId >>> 32));
            return 31 * result + this.task.hashCode();
        }

        @Override
        public String toString() {
            // The assignedNode segment carries its own leading separator so it does not run into containerId.
            String assignedNodeDesc = this.assignedNode != null ? ", assignedNode=" + this.assignedNode.toShortString() : "";
            return "TaskInfo{task=" + this.task + ", priority=" + this.priority + ", startTime=" + this.startTime + ", containerId=" + this.containerId + assignedNodeDesc + ", uniqueId=" + this.uniqueId + ", localityDelayTimeout=" + this.localityDelayTimeout + '}';
        }

        /** @return time remaining until {@code localityDelayTimeout}, converted from milliseconds to {@code unit} */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.localityDelayTimeout - this.clock.getTime(), TimeUnit.MILLISECONDS);
        }

        /**
         * Orders by ascending {@code localityDelayTimeout}.
         *
         * @throws ClassCastException if {@code o} is not a {@code TaskInfo}
         */
        @Override
        public int compareTo(Delayed o) {
            TaskInfo other = (TaskInfo)o;
            return Long.compare(this.localityDelayTimeout, other.localityDelayTimeout);
        }

        @VisibleForTesting
        boolean isGuaranteed() {
            return this.isGuaranteed;
        }

        /**
         * @return the last recorded value, or {@code false} when none has been recorded yet
         *         (the backing field starts out {@code null}, which cannot be unboxed)
         */
        @VisibleForTesting
        boolean getLastSetGuaranteed() {
            return Boolean.TRUE.equals(this.lastSetGuaranteed);
        }

        @VisibleForTesting
        boolean isUpdateInProgress() {
            return this.isPendingUpdate;
        }

        TezTaskAttemptID getAttemptId() {
            return this.attemptId;
        }

        /** Lifecycle state; starts at {@code PENDING} and is changed by the assignment / preemption setters. */
        static enum State {
            PENDING,
            ASSIGNED,
            PREEMPTED;

        }
    }

    /**
     * Plain counters describing allocation activity, plus per-host allocation counts.
     *
     * <p>The counters are ordinary {@code int} fields updated with {@code ++}; this class
     * does no synchronization of its own.
     */
    @VisibleForTesting
    static class StatsPerDag {
        int numRequestedAllocations = 0;
        int numRequestsWithLocation = 0;
        int numRequestsWithoutLocation = 0;
        int numTotalAllocations = 0;
        int numLocalAllocations = 0;
        int numNonLocalAllocations = 0;
        int numAllocationsNoLocalityRequest = 0;
        int numRejectedTasks = 0;
        int numCommFailures = 0;
        int numDelayedAllocations = 0;
        int numPreemptedTasks = 0;
        /** Host -> number of allocations that landed on one of the hosts the request asked for. */
        Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new HashMap<String, AtomicInteger>();
        /** Host -> number of allocations of any kind. */
        Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<String, AtomicInteger>();

        StatsPerDag() {
        }

        /** Renders every counter and both per-host maps on a single line. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("NumPreemptedTasks=").append(this.numPreemptedTasks).append(", ");
            sb.append("NumRequestedAllocations=").append(this.numRequestedAllocations).append(", ");
            sb.append("NumRequestsWithlocation=").append(this.numRequestsWithLocation).append(", ");
            sb.append("NumLocalAllocations=").append(this.numLocalAllocations).append(",");
            sb.append("NumNonLocalAllocations=").append(this.numNonLocalAllocations).append(",");
            sb.append("NumTotalAllocations=").append(this.numTotalAllocations).append(",");
            // Tracked by registerTaskAllocated; reported here so the three allocation buckets add up to the total.
            sb.append("NumAllocationsNoLocalityRequest=").append(this.numAllocationsNoLocalityRequest).append(", ");
            sb.append("NumRequestsWithoutLocation=").append(this.numRequestsWithoutLocation).append(", ");
            sb.append("NumRejectedTasks=").append(this.numRejectedTasks).append(", ");
            sb.append("NumCommFailures=").append(this.numCommFailures).append(", ");
            sb.append("NumDelayedAllocations=").append(this.numDelayedAllocations).append(", ");
            sb.append("LocalityBasedAllocationsPerHost=").append(this.localityBasedNumAllocationsPerHost).append(", ");
            sb.append("NumAllocationsPerHost=").append(this.numAllocationsPerHost);
            return sb.toString();
        }

        /**
         * Counts a request, bucketed by whether any host was requested.
         *
         * @param requestedHosts hosts named by the request; {@code null} or empty counts as "without location"
         * @param requestedRacks not used
         */
        void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) {
            ++this.numRequestedAllocations;
            if (StatsPerDag.hasHosts(requestedHosts)) {
                ++this.numRequestsWithLocation;
            } else {
                ++this.numRequestsWithoutLocation;
            }
        }

        /**
         * Counts an allocation as local, non-local, or "no locality requested", and
         * updates the per-host maps.
         *
         * @param requestedHosts hosts named by the request; {@code null} or empty means no locality was requested
         * @param requestedRacks not used
         * @param allocatedHost  host the task was actually placed on
         */
        void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks, String allocatedHost) {
            if (!StatsPerDag.hasHosts(requestedHosts)) {
                ++this.numAllocationsNoLocalityRequest;
            } else if (Arrays.asList(requestedHosts).contains(allocatedHost)) {
                // Arrays.asList is a view over the array, so this is a plain scan with no copy.
                ++this.numLocalAllocations;
                this._registerAllocationInHostMap(allocatedHost, this.localityBasedNumAllocationsPerHost);
            } else {
                ++this.numNonLocalAllocations;
            }
            ++this.numTotalAllocations;
            this._registerAllocationInHostMap(allocatedHost, this.numAllocationsPerHost);
        }

        /** @param host not used */
        void registerTaskPreempted(String host) {
            ++this.numPreemptedTasks;
        }

        /** @param host not used */
        void registerCommFailure(String host) {
            ++this.numCommFailures;
        }

        /** @param host not used */
        void registerTaskRejected(String host) {
            ++this.numRejectedTasks;
        }

        void registerDelayedAllocation() {
            ++this.numDelayedAllocations;
        }

        /** @return {@code true} when the array is non-null and has at least one element */
        private static boolean hasHosts(String[] hosts) {
            return hosts != null && hosts.length != 0;
        }

        /** Increments the counter for {@code host} in {@code hostMap}, creating it on first use. */
        private void _registerAllocationInHostMap(String host, Map<String, AtomicInteger> hostMap) {
            AtomicInteger val = hostMap.get(host);
            if (val == null) {
                val = new AtomicInteger(0);
                hostMap.put(host, val);
            }
            val.incrementAndGet();
        }
    }

    /**
     * Scheduler-side view of one service instance: its schedulable capacity, the
     * number of tasks currently scheduled on it, and its disabled / back-off state.
     *
     * <p>Implements {@link Delayed} against {@code expireTimeMillis}, so instances
     * are ordered by the time at which their disablement expires.
     */
    @VisibleForTesting
    static class NodeInfo
    implements Delayed {
        private final NodeBlacklistConf blacklistConf;
        LlapServiceInstance serviceInstance;
        private final Clock clock;
        long expireTimeMillis = -1L;
        private long numSuccessfulTasks = 0L;
        private long numSuccessfulTasksAtLastBlacklist = -1L;
        float cumulativeBackoffFactor = 1.0f;
        private boolean hadCommFailure = false;
        private boolean disabled = false;
        private int numScheduledTasks = 0;
        private int numSchedulableTasks;
        private final LlapTaskSchedulerMetrics metrics;
        private Resource resourcePerExecutor;
        private String shortStringBase;

        /**
         * @param serviceInstance         instance this node represents
         * @param blacklistConf           min/max delay and back-off factor used by {@link #disableNode(boolean)}
         * @param clock                   time source
         * @param numSchedulableTasksConf {@code 0} to derive capacity from the instance, otherwise the capacity
         *                                to use as-is ({@code -1} is treated as unlimited by the accept check)
         * @param metrics                 optional metrics sink; may be {@code null}
         * @throws IllegalArgumentException if {@code numSchedulableTasksConf} is below {@code -1}
         */
        NodeInfo(LlapServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf, LlapTaskSchedulerMetrics metrics) {
            Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
            this.blacklistConf = blacklistConf;
            this.clock = clock;
            this.metrics = metrics;
            this.updateLlapServiceInstance(serviceInstance, numSchedulableTasksConf);
        }

        String getNodeIdentity() {
            return this.serviceInstance.getWorkerIdentity();
        }

        String getHost() {
            return this.serviceInstance.getHost();
        }

        int getRpcPort() {
            return this.serviceInstance.getRpcPort();
        }

        String getServiceAddress() {
            return this.serviceInstance.getServicesAddress();
        }

        public Resource getResourcePerExecutor() {
            return this.resourcePerExecutor;
        }

        /**
         * Replaces the backing service instance and recomputes per-executor resources,
         * schedulable capacity and the cached short-string prefix. The change in
         * capacity is pushed to the metrics sink when one is present.
         */
        void updateLlapServiceInstance(LlapServiceInstance serviceInstance, int numSchedulableTasksConf) {
            this.serviceInstance = serviceInstance;
            int numVcores = serviceInstance.getResource().getVirtualCores();
            int memoryPerInstance = serviceInstance.getResource().getMemory();
            int memoryPerExecutor = (int)((double)memoryPerInstance / (double)numVcores);
            this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
            int previousCapacity = this.numSchedulableTasks;
            if (numSchedulableTasksConf != 0) {
                // Explicit capacity supplied by configuration.
                this.numSchedulableTasks = numSchedulableTasksConf;
                LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
            } else {
                // Derive capacity from the instance: vcores plus its advertised wait-queue size.
                String queueSizeProperty = (String)serviceInstance.getProperties().get("hive.llap.daemon.task.scheduler.enabled.wait.queue.size");
                if (queueSizeProperty == null) {
                    queueSizeProperty = (String)serviceInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
                }
                LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}", serviceInstance, serviceInstance.getResource().getVirtualCores(), queueSizeProperty, serviceInstance.getResource().getMemory());
                int pendingQueueCapacity = queueSizeProperty == null ? 0 : Integer.parseInt(queueSizeProperty);
                this.numSchedulableTasks = numVcores + pendingQueueCapacity;
            }
            if (this.metrics != null) {
                this.metrics.incrSchedulableTasksCount(this.numSchedulableTasks - previousCapacity);
            }
            this.shortStringBase = this.setupShortStringBase();
        }

        /** Clears the expiry time and the comm-failure flag; leaves {@code disabled} untouched. */
        void resetExpireInformation() {
            this.expireTimeMillis = -1L;
            this.hadCommFailure = false;
        }

        void enableNode() {
            this.resetExpireInformation();
            this.disabled = false;
        }

        /**
         * Marks the node disabled and computes when the disablement expires.
         * The delay is {@code minDelay * cumulativeBackoffFactor}, capped at {@code maxDelay};
         * the factor grows only when no task has succeeded since the previous disablement.
         */
        void disableNode(boolean commFailure) {
            long duration = this.blacklistConf.minDelay;
            long currentTime = this.clock.getTime();
            this.hadCommFailure = commFailure;
            this.disabled = true;
            if (this.numSuccessfulTasksAtLastBlacklist == this.numSuccessfulTasks) {
                // Nothing succeeded since the last disablement: keep backing off.
                this.cumulativeBackoffFactor *= this.blacklistConf.backoffFactor;
            } else {
                // Something succeeded in between: start over.
                this.cumulativeBackoffFactor = 1.0f;
            }
            long delayTime = Math.min((long)((float)duration * this.cumulativeBackoffFactor), this.blacklistConf.maxDelay);
            if (LOG.isInfoEnabled()) {
                LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", this.toShortString(), delayTime, commFailure);
            }
            this.expireTimeMillis = currentTime + delayTime;
            this.numSuccessfulTasksAtLastBlacklist = this.numSuccessfulTasks;
        }

        void registerTaskScheduled() {
            ++this.numScheduledTasks;
            if (this.metrics == null) {
                return;
            }
            this.metrics.incrRunningTasksCount();
            this.metrics.decrSchedulableTasksCount();
        }

        void registerTaskSuccess() {
            ++this.numSuccessfulTasks;
            --this.numScheduledTasks;
            if (this.metrics == null) {
                return;
            }
            this.metrics.incrSuccessfulTasksCount();
            this.metrics.decrRunningTasksCount();
            this.metrics.incrSchedulableTasksCount();
        }

        void registerUnsuccessfulTaskEnd(boolean wasPreempted) {
            --this.numScheduledTasks;
            if (this.metrics == null) {
                return;
            }
            this.metrics.decrRunningTasksCount();
            this.metrics.incrSchedulableTasksCount();
            if (wasPreempted) {
                this.metrics.incrPreemptedTasksCount();
            }
        }

        long getEnableTime() {
            return this.expireTimeMillis;
        }

        public boolean isDisabled() {
            return this.disabled;
        }

        boolean hadCommFailure() {
            return this.hadCommFailure;
        }

        /** Accepts when healthy and either capacity is unlimited ({@code -1}) or a slot is free. */
        boolean _canAccepInternal() {
            if (this.hadCommFailure || this.disabled) {
                return false;
            }
            return this.numSchedulableTasks == -1 || this.numSchedulableTasks - this.numScheduledTasks > 0;
        }

        boolean canAcceptTask() {
            boolean accepts = this._canAccepInternal();
            if (LOG.isTraceEnabled()) {
                LOG.trace(this.constructCanAcceptLogResult(accepts));
            }
            return accepts;
        }

        String constructCanAcceptLogResult(boolean result) {
            return new StringBuilder()
                .append("Node[")
                .append(this.serviceInstance.getHost())
                .append(":")
                .append(this.serviceInstance.getRpcPort())
                .append(", ")
                .append(this.serviceInstance.getWorkerIdentity())
                .append("]: ")
                .append("canAcceptTask=").append(result)
                .append(", numScheduledTasks=").append(this.numScheduledTasks)
                .append(", numSchedulableTasks=").append(this.numSchedulableTasks)
                .append(", hadCommFailure=").append(this.hadCommFailure)
                .append(", disabled=").append(this.disabled)
                .toString();
        }

        /** @return time remaining until {@code expireTimeMillis}, converted from milliseconds to {@code unit} */
        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMillis = this.expireTimeMillis - this.clock.getTime();
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        /** Orders by ascending {@code expireTimeMillis}. */
        @Override
        public int compareTo(Delayed o) {
            NodeInfo other = (NodeInfo)o;
            return Long.compare(this.expireTimeMillis, other.expireTimeMillis);
        }

        /** Builds the host/port/identity prefix reused by {@link #toShortString()}. */
        private String setupShortStringBase() {
            StringBuilder base = new StringBuilder();
            base.append("{").append(this.serviceInstance.getHost());
            base.append(":").append(this.serviceInstance.getRpcPort());
            base.append(", id=").append(this.getNodeIdentity());
            return base.toString();
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("NodeInfo{instance=").append(this.serviceInstance);
            sb.append(", expireTimeMillis=").append(this.expireTimeMillis);
            sb.append(", numSuccessfulTasks=").append(this.numSuccessfulTasks);
            sb.append(", numSuccessfulTasksAtLastBlacklist=").append(this.numSuccessfulTasksAtLastBlacklist);
            sb.append(", cumulativeBackoffFactor=").append(this.cumulativeBackoffFactor);
            sb.append(", numSchedulableTasks=").append(this.numSchedulableTasks);
            sb.append(", numScheduledTasks=").append(this.numScheduledTasks);
            sb.append(", disabled=").append(this.disabled);
            sb.append(", commFailures=").append(this.hadCommFailure);
            sb.append('}');
            return sb.toString();
        }

        /** Compact description: cached identity prefix followed by the current counters and flags. */
        private String toShortString() {
            int availableSlots = this.numSchedulableTasks - this.numScheduledTasks;
            return this.shortStringBase + ", canAcceptTask=" + this._canAccepInternal() + ", st=" + this.numScheduledTasks + ", ac=" + availableSlots + ", commF=" + this.hadCommFailure + ", disabled=" + this.disabled + "}";
        }
    }

    private class SchedulerCallable
    implements Callable<Void> {
        private AtomicBoolean isShutdown = new AtomicBoolean(false);

        private SchedulerCallable() {
        }

        @Override
        public Void call() throws Exception {
            while (!this.isShutdown.get() && !Thread.currentThread().isInterrupted()) {
                LlapTaskSchedulerService.this.scheduleLock.lock();
                try {
                    while (!LlapTaskSchedulerService.this.pendingScheduleInvocations.get()) {
                        LlapTaskSchedulerService.this.scheduleCondition.await();
                    }
                }
                catch (InterruptedException e) {
                    if (this.isShutdown.get()) {
                        LOG.info("Scheduler thread interrupted after shutdown");
                        break;
                    }
                    LOG.warn("Scheduler thread interrupted without being shutdown");
                    throw new RuntimeException("Scheduler thread interrupted without being shutdown", e);
                }
                finally {
                    LlapTaskSchedulerService.this.scheduleLock.unlock();
                }
                LlapTaskSchedulerService.this.pendingScheduleInvocations.set(false);
                try {
                    LlapTaskSchedulerService.this.schedulePendingTasks();
                }
                catch (InterruptedException ie) {
                    if (this.isShutdown.get()) {
                        return null;
                    }
                    LOG.error("Scheduler thread was interrupte without shutdown and will now exit", (Throwable)ie);
                    throw ie;
                }
                catch (Throwable t) {
                    LOG.error("Fatal error: scheduler thread has failed and will now exit", t);
                    throw t instanceof Exception ? (Exception)t : new Exception(t);
                }
            }
            return null;
        }

        public void shutdown() {
            this.isShutdown.set(true);
        }
    }

    /**
     * Task that reports a SERVICE_UNAVAILABLE error through the scheduler context.
     * A failure while reporting is logged and not rethrown.
     */
    private class SchedulerTimeoutMonitor
    implements Runnable {
        private final Logger LOG = LoggerFactory.getLogger(SchedulerTimeoutMonitor.class);

        private SchedulerTimeoutMonitor() {
        }

        @Override
        public void run() {
            this.LOG.info("Reporting SERVICE_UNAVAILABLE error as no instances are running");
            try {
                DagInfo dagAtReportTime = LlapTaskSchedulerService.this.getContext().getCurrentDagInfo();
                LlapTaskSchedulerService.this.getContext().reportError((ServicePluginError)ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "No LLAP Daemons are running", dagAtReportTime);
            }
            catch (Exception e) {
                // Look the DAG up again for the log line; there may be none.
                DagInfo currentDagInfo = LlapTaskSchedulerService.this.getContext().getCurrentDagInfo();
                Object dagName = "";
                if (currentDagInfo != null) {
                    dagName = currentDagInfo.getName();
                }
                this.LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}", dagName, (Object)e);
            }
        }
    }

    /**
     * Loop that takes nodes off the disabled-nodes queue as they become available,
     * re-enables each one and triggers a scheduling attempt. Runs until
     * {@link #shutdown()} is called or the thread is interrupted.
     */
    private class NodeEnablerCallable
    implements Callable<Void> {
        private final AtomicBoolean isShutdown = new AtomicBoolean(false);
        /** Upper bound, in milliseconds, on a single wait for the queue. */
        private static final long POLL_TIMEOUT = 10000L;

        private NodeEnablerCallable() {
        }

        /**
         * @return always {@code null}
         * @throws RuntimeException when interrupted without a prior shutdown
         */
        @Override
        public Void call() {
            while (!(this.isShutdown.get() || Thread.currentThread().isInterrupted())) {
                try {
                    NodeInfo expired = (NodeInfo)LlapTaskSchedulerService.this.disabledNodesQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
                    if (expired != null) {
                        LlapTaskSchedulerService.this.reenableDisabledNode(expired);
                        LlapTaskSchedulerService.this.trySchedulingPendingTasks();
                    }
                }
                catch (InterruptedException e) {
                    if (!this.isShutdown.get()) {
                        LOG.warn("NodeEnabler thread interrupted without being shutdown");
                        throw new RuntimeException("NodeEnabler thread interrupted without being shutdown", e);
                    }
                    LOG.info("NodeEnabler thread interrupted after shutdown");
                    return null;
                }
            }
            return null;
        }

        /** Sets the shutdown flag; it does not interrupt the thread itself. */
        public void shutdown() {
            this.isShutdown.set(true);
        }
    }

    /**
     * Loop that takes tasks off the delayed-task queue, clears their
     * in-delayed-queue flag and triggers a scheduling attempt for those still
     * pending. Runs until {@link #shutdown()} is called or the thread is interrupted.
     */
    @VisibleForTesting
    class DelayedTaskSchedulerCallable
    implements Callable<Void> {
        private final AtomicBoolean isShutdown = new AtomicBoolean(false);

        DelayedTaskSchedulerCallable() {
        }

        /**
         * @return always {@code null}
         * @throws RuntimeException when interrupted without a prior shutdown
         */
        @Override
        public Void call() {
            while (!(this.isShutdown.get() || Thread.currentThread().isInterrupted())) {
                try {
                    TaskInfo evicted = this.getNextTask();
                    evicted.setInDelayedQueue(false);
                    this.processEvictedTask(evicted);
                }
                catch (InterruptedException e) {
                    if (!this.isShutdown.get()) {
                        LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown");
                        throw new RuntimeException("DelayedTaskScheduler thread interrupted without being shutdown", e);
                    }
                    LOG.info("DelayedTaskScheduler thread interrupted after shutdown");
                    return null;
                }
            }
            return null;
        }

        /** Sets the shutdown flag; it does not interrupt the thread itself. */
        public void shutdown() {
            this.isShutdown.set(true);
        }

        /** Blocks until the delayed-task queue yields its next element. */
        public TaskInfo getNextTask() throws InterruptedException {
            Object next = LlapTaskSchedulerService.this.delayedTaskQueue.take();
            return (TaskInfo)next;
        }

        /** Triggers a scheduling attempt when {@link #shouldScheduleTask(TaskInfo)} says so. */
        public void processEvictedTask(TaskInfo taskInfo) {
            if (!this.shouldScheduleTask(taskInfo)) {
                return;
            }
            LlapTaskSchedulerService.this.trySchedulingPendingTasks();
        }

        /** @return {@code true} only while the task is still in the PENDING state */
        public boolean shouldScheduleTask(TaskInfo taskInfo) {
            TaskInfo.State current = taskInfo.getState();
            return current == TaskInfo.State.PENDING;
        }
    }

    /**
     * Result codes for a scheduling attempt.
     *
     * NOTE(review): nothing in this part of the file produces or consumes these
     * values, so the per-constant notes below are read off the names only and
     * should be confirmed against the scheduling code.
     */
    private static enum ScheduleResult {
        // presumably: the task was placed on a node
        SCHEDULED,
        // presumably: placement deferred while waiting for a preferred host
        DELAYED_LOCALITY,
        // presumably: placement deferred because no capacity is currently free
        DELAYED_RESOURCES,
        // presumably: the cluster as a whole cannot satisfy the request
        INADEQUATE_TOTAL_RESOURCES;

    }

    /**
     * Registry callback that keeps the scheduler's node bookkeeping in step with
     * service instances being created, updated and removed.
     */
    private class NodeStateChangeListener
    implements ServiceInstanceStateChangeListener<LlapServiceInstance> {
        private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);

        private NodeStateChangeListener() {
        }

        /** Wraps the new instance in a {@link NodeInfo} and hands it to {@code registerAndAddNode}. */
        public void onCreate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
            this.LOG.info("Added node with identity: {} as a result of registry callback", serviceInstance.getWorkerIdentity());
            NodeInfo nodeInfo = new NodeInfo(serviceInstance, LlapTaskSchedulerService.this.nodeBlacklistConf, LlapTaskSchedulerService.this.clock, LlapTaskSchedulerService.this.numSchedulableTasksPerNode, LlapTaskSchedulerService.this.metrics);
            LlapTaskSchedulerService.this.registerAndAddNode(nodeInfo, serviceInstance);
        }

        /**
         * Refreshes the tracked {@link NodeInfo} for the instance. An update for an
         * identity that is not in {@code instanceToNodeMap} is logged and ignored
         * rather than failing the callback with a {@code NullPointerException}.
         */
        public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
            String workerIdentity = serviceInstance.getWorkerIdentity();
            NodeInfo nodeInfo = LlapTaskSchedulerService.this.instanceToNodeMap.get(workerIdentity);
            if (nodeInfo == null) {
                this.LOG.warn("Ignoring update for node with identity: {} since it is not currently tracked", workerIdentity);
                return;
            }
            nodeInfo.updateLlapServiceInstance(serviceInstance, LlapTaskSchedulerService.this.numSchedulableTasksPerNode);
            this.LOG.info("Updated node with identity: {} as a result of registry callback", workerIdentity);
        }

        /**
         * Sends a node report for the removed instance, drops it from
         * {@code instanceToNodeMap}, refreshes the node-count metric and starts the
         * timeout monitor when no active instances remain.
         */
        public void onRemove(LlapServiceInstance serviceInstance, int ephSeqVersion) {
            NodeReport nodeReport = LlapTaskSchedulerService.constructNodeReport(serviceInstance, false);
            this.LOG.info("Sending out nodeReport for onRemove: {}", nodeReport);
            LlapTaskSchedulerService.this.getContext().nodesUpdated(Collections.singletonList(nodeReport));
            LlapTaskSchedulerService.this.instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
            this.LOG.info("Removed node with identity: {} due to RegistryNotification. currentActiveInstances={}", serviceInstance.getWorkerIdentity(), LlapTaskSchedulerService.this.activeInstances.size());
            if (LlapTaskSchedulerService.this.metrics != null) {
                LlapTaskSchedulerService.this.metrics.setClusterNodeCount(LlapTaskSchedulerService.this.activeInstances.size());
            }
            if (LlapTaskSchedulerService.this.activeInstances.size() == 0) {
                this.LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer.");
                LlapTaskSchedulerService.this.startTimeoutMonitor();
            }
        }
    }

    /**
     * Callback for a DAG-registration call made on behalf of a node. The node is
     * added to the scheduler whether the registration succeeded or failed; only
     * the log line differs.
     */
    private final class RegisterDagCallback
    implements LlapTaskCommunicator.OperationCallback<LlapDaemonProtocolProtos.QueryIdentifierProto, Void> {
        private final LlapServiceInstance llapServiceInstance;
        private final NodeInfo nodeInfo;

        RegisterDagCallback(NodeInfo nodeInfo, LlapServiceInstance llapServiceInstance) {
            this.nodeInfo = nodeInfo;
            this.llapServiceInstance = llapServiceInstance;
        }

        /** Logs the registered application / DAG ids, then adds the node. */
        @Override
        public void setDone(Void v, LlapDaemonProtocolProtos.QueryIdentifierProto result) {
            StringBuilder message = new StringBuilder();
            message.append("Dag with appId=").append(result.getApplicationIdString());
            message.append(" dagId=").append(result.getDagIndex());
            message.append(" registered successfully for node ").append(this.nodeInfo.getHost());
            LOG.info(message.toString());
            LlapTaskSchedulerService.this.addNode(this.nodeInfo, this.llapServiceInstance);
        }

        /** Logs the failure, then adds the node anyway. */
        @Override
        public void setError(Void v, Throwable t) {
            String message = "Error registering dag for node " + this.nodeInfo.getHost();
            LOG.warn(message, t);
            LlapTaskSchedulerService.this.addNode(this.nodeInfo, this.llapServiceInstance);
        }
    }

    /**
     * Callback for an update operation on a task; forwards the outcome to
     * {@code handleUpdateResult}, reporting any error as a {@code false} result.
     */
    private final class UpdateOperationCallback
    implements LlapTaskCommunicator.OperationCallback<Boolean, TaskInfo> {
        private UpdateOperationCallback() {
        }

        /** Passes the reported result through unchanged. */
        @Override
        public void setDone(TaskInfo ctx, Boolean result) {
            LlapTaskSchedulerService.this.handleUpdateResult(ctx, result);
        }

        /** The throwable itself is not forwarded; only the failed outcome is. */
        @Override
        public void setError(TaskInfo ctx, Throwable t) {
            boolean updateSucceeded = false;
            LlapTaskSchedulerService.this.handleUpdateResult(ctx, updateSucceeded);
        }
    }
}

