/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.collision.jobstealing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
import org.apache.ignite.spi.collision.CollisionJobContext;
import org.apache.ignite.spi.collision.CollisionSpi;
import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpiMBean;
import org.apache.ignite.spi.collision.jobstealing.JobStealingDisabled;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;

@IgniteSpiMultipleInstancesSupport(value=true)
@IgniteSpiConsistencyChecked(optional=true)
public class JobStealingCollisionSpi
extends IgniteSpiAdapter
implements CollisionSpi,
JobStealingCollisionSpiMBean {
    public static final int DFLT_MAX_STEALING_ATTEMPTS = 5;
    public static final int DFLT_ACTIVE_JOBS_THRESHOLD = 95;
    public static final long DFLT_MSG_EXPIRE_TIME = 1000L;
    public static final int DFLT_WAIT_JOBS_THRESHOLD = 0;
    public static final int DFLT_JOB_PRIORITY = 0;
    private static final String JOB_STEALING_COMM_TOPIC = "ignite.collision.job.stealing.topic";
    public static final String THIEF_NODE_ATTR = "ignite.collision.thief.node";
    public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR = "ignite.collision.wait.jobs.threshold";
    public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR = "ignite.collision.active.jobs.threshold";
    public static final String STEALING_ATTEMPT_COUNT_ATTR = "ignite.stealing.attempt.count";
    public static final String MAX_STEALING_ATTEMPT_ATTR = "ignite.stealing.max.attempts";
    public static final String MSG_EXPIRE_TIME_ATTR = "ignite.stealing.msg.expire.time";
    public static final String STEALING_PRIORITY_ATTR = "ignite.stealing.priority";
    @LoggerResource
    private IgniteLogger log;
    private volatile int activeJobsThreshold = 95;
    private volatile int waitJobsThreshold = 0;
    private volatile long msgExpireTime = 1000L;
    private volatile int maxStealingAttempts = 5;
    private volatile boolean isStealingEnabled = true;
    @GridToStringInclude
    private Map<String, ? extends Serializable> stealAttrs;
    private volatile int runningNum;
    private volatile int waitingNum;
    private volatile int heldNum;
    private final AtomicInteger totalStolenJobsNum = new AtomicInteger();
    private final ConcurrentMap<UUID, MessageInfo> sndMsgMap = new ConcurrentHashMap8<UUID, MessageInfo>();
    private final ConcurrentMap<UUID, MessageInfo> rcvMsgMap = new ConcurrentHashMap8<UUID, MessageInfo>();
    private final Queue<ClusterNode> nodeQueue = new ConcurrentLinkedDeque8<ClusterNode>();
    private CollisionExternalListener extLsnr;
    private GridLocalEventListener discoLsnr;
    private GridMessageListener msgLsnr;
    private final AtomicInteger stealReqs = new AtomicInteger();
    private Comparator<CollisionJobContext> cmp;

    @Override
    @IgniteSpiConfiguration(optional=true)
    public void setActiveJobsThreshold(int activeJobsThreshold) {
        A.ensure(activeJobsThreshold >= 0, "activeJobsThreshold >= 0");
        this.activeJobsThreshold = activeJobsThreshold;
    }

    @Override
    public int getActiveJobsThreshold() {
        return this.activeJobsThreshold;
    }

    @Override
    @IgniteSpiConfiguration(optional=true)
    public void setWaitJobsThreshold(int waitJobsThreshold) {
        A.ensure(waitJobsThreshold >= 0, "waitJobsThreshold >= 0");
        this.waitJobsThreshold = waitJobsThreshold;
    }

    @Override
    public int getWaitJobsThreshold() {
        return this.waitJobsThreshold;
    }

    @Override
    @IgniteSpiConfiguration(optional=true)
    public void setMessageExpireTime(long msgExpireTime) {
        A.ensure(msgExpireTime > 0L, "messageExpireTime > 0");
        this.msgExpireTime = msgExpireTime;
    }

    @Override
    public long getMessageExpireTime() {
        return this.msgExpireTime;
    }

    @Override
    @IgniteSpiConfiguration(optional=true)
    public void setStealingEnabled(boolean isStealingEnabled) {
        this.isStealingEnabled = isStealingEnabled;
    }

    @Override
    public boolean isStealingEnabled() {
        return this.isStealingEnabled;
    }

    @Override
    @IgniteSpiConfiguration(optional=true)
    public void setMaximumStealingAttempts(int maxStealingAttempts) {
        A.ensure(maxStealingAttempts > 0, "maxStealingAttempts > 0");
        this.maxStealingAttempts = maxStealingAttempts;
    }

    @Override
    public int getMaximumStealingAttempts() {
        return this.maxStealingAttempts;
    }

    @IgniteSpiConfiguration(optional=true)
    public void setStealingAttributes(Map<String, ? extends Serializable> stealAttrs) {
        this.stealAttrs = stealAttrs;
    }

    @Override
    public Map<String, ? extends Serializable> getStealingAttributes() {
        return this.stealAttrs;
    }

    @Override
    public int getCurrentRunningJobsNumber() {
        return this.runningNum;
    }

    @Override
    public int getCurrentHeldJobsNumber() {
        return this.heldNum;
    }

    @Override
    public int getCurrentWaitJobsNumber() {
        return this.waitingNum;
    }

    @Override
    public int getCurrentActiveJobsNumber() {
        return this.runningNum + this.heldNum;
    }

    @Override
    public int getTotalStolenJobsNumber() {
        return this.totalStolenJobsNum.get();
    }

    @Override
    public int getCurrentJobsToStealNumber() {
        return this.stealReqs.get();
    }

    @Override
    public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
        HashMap<String, Object> res = new HashMap<String, Object>(4);
        res.put(this.createSpiAttributeName(WAIT_JOBS_THRESHOLD_NODE_ATTR), this.waitJobsThreshold);
        res.put(this.createSpiAttributeName(ACTIVE_JOBS_THRESHOLD_NODE_ATTR), this.activeJobsThreshold);
        res.put(this.createSpiAttributeName(MAX_STEALING_ATTEMPT_ATTR), this.maxStealingAttempts);
        res.put(this.createSpiAttributeName(MSG_EXPIRE_TIME_ATTR), this.msgExpireTime);
        return res;
    }

    @Override
    public void spiStart(String gridName) throws IgniteSpiException {
        this.assertParameter(this.activeJobsThreshold >= 0, "activeJobsThreshold >= 0");
        this.assertParameter(this.waitJobsThreshold >= 0, "waitJobsThreshold >= 0");
        this.assertParameter(this.msgExpireTime > 0L, "messageExpireTime > 0");
        this.assertParameter(this.maxStealingAttempts > 0, "maxStealingAttempts > 0");
        this.startStopwatch();
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.configInfo("activeJobsThreshold", this.activeJobsThreshold));
            this.log.debug(this.configInfo("waitJobsThreshold", this.waitJobsThreshold));
            this.log.debug(this.configInfo("messageExpireTime", this.msgExpireTime));
            this.log.debug(this.configInfo("maxStealingAttempts", this.maxStealingAttempts));
        }
        this.registerMBean(gridName, this, JobStealingCollisionSpiMBean.class);
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.startInfo());
        }
    }

    @Override
    public void spiStop() throws IgniteSpiException {
        this.unregisterMBean();
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.stopInfo());
        }
    }

    @Override
    public void setExternalCollisionListener(CollisionExternalListener extLsnr) {
        this.extLsnr = extLsnr;
    }

    @Override
    protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        this.discoLsnr = new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                assert (evt instanceof DiscoveryEvent);
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
                UUID evtNodeId = discoEvt.eventNode().id();
                switch (discoEvt.type()) {
                    case 10: {
                        ClusterNode node = JobStealingCollisionSpi.this.getSpiContext().node(evtNodeId);
                        if (node == null) break;
                        JobStealingCollisionSpi.this.nodeQueue.offer(node);
                        JobStealingCollisionSpi.this.sndMsgMap.putIfAbsent(node.id(), new MessageInfo());
                        JobStealingCollisionSpi.this.rcvMsgMap.putIfAbsent(node.id(), new MessageInfo());
                        break;
                    }
                    case 11: 
                    case 12: {
                        Iterator iter = JobStealingCollisionSpi.this.nodeQueue.iterator();
                        while (iter.hasNext()) {
                            ClusterNode nextNode = (ClusterNode)iter.next();
                            if (!nextNode.id().equals(evtNodeId)) continue;
                            iter.remove();
                        }
                        JobStealingCollisionSpi.this.sndMsgMap.remove(evtNodeId);
                        JobStealingCollisionSpi.this.rcvMsgMap.remove(evtNodeId);
                        break;
                    }
                    default: {
                        assert (false) : "Unexpected event: " + evt;
                        break;
                    }
                }
            }
        };
        spiCtx.addLocalEventListener(this.discoLsnr, 12, 10, 11);
        Collection<ClusterNode> rmtNodes = spiCtx.remoteNodes();
        for (ClusterNode node : rmtNodes) {
            UUID id = node.id();
            if (spiCtx.node(id) == null) continue;
            this.sndMsgMap.putIfAbsent(id, new MessageInfo());
            this.rcvMsgMap.putIfAbsent(id, new MessageInfo());
            if (spiCtx.node(id) != null) continue;
            this.sndMsgMap.remove(id);
            this.rcvMsgMap.remove(id);
        }
        this.nodeQueue.addAll(rmtNodes);
        Iterator iter = this.nodeQueue.iterator();
        while (iter.hasNext()) {
            ClusterNode nextNode = (ClusterNode)iter.next();
            if (spiCtx.node(nextNode.id()) != null) continue;
            iter.remove();
        }
        this.msgLsnr = new GridMessageListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onMessage(UUID nodeId, Object msg) {
                CollisionExternalListener tmp;
                int stealReqs0;
                MessageInfo info = (MessageInfo)JobStealingCollisionSpi.this.rcvMsgMap.get(nodeId);
                if (info == null) {
                    if (JobStealingCollisionSpi.this.log.isDebugEnabled()) {
                        JobStealingCollisionSpi.this.log.debug("Ignoring message steal request as discovery event has not yet been received for node: " + nodeId);
                    }
                    return;
                }
                MessageInfo messageInfo = info;
                synchronized (messageInfo) {
                    JobStealingRequest req = (JobStealingRequest)msg;
                    stealReqs0 = JobStealingCollisionSpi.this.stealReqs.addAndGet(req.delta() - info.jobsToSteal());
                    info.reset(req.delta());
                }
                if (JobStealingCollisionSpi.this.log.isDebugEnabled()) {
                    JobStealingCollisionSpi.this.log.debug("Received steal request [nodeId=" + nodeId + ", msg=" + msg + ", stealReqs=" + stealReqs0 + ']');
                }
                if ((tmp = JobStealingCollisionSpi.this.extLsnr) != null) {
                    tmp.onExternalCollision();
                }
            }
        };
        spiCtx.addMessageListener(this.msgLsnr, JOB_STEALING_COMM_TOPIC);
    }

    @Override
    public void onContextDestroyed0() {
        if (this.discoLsnr != null) {
            this.getSpiContext().removeLocalEventListener(this.discoLsnr);
        }
        if (this.msgLsnr != null) {
            this.getSpiContext().removeMessageListener(this.msgLsnr, JOB_STEALING_COMM_TOPIC);
        }
    }

    @Override
    public void onCollision(CollisionContext ctx) {
        assert (ctx != null);
        Collection<CollisionJobContext> activeJobs = ctx.activeJobs();
        Collection<CollisionJobContext> waitJobs = ctx.waitingJobs();
        this.heldNum = ctx.heldJobs().size();
        int rejected = this.checkBusy(waitJobs, activeJobs);
        this.totalStolenJobsNum.addAndGet(rejected);
        if (rejected > 0) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Total count of rejected jobs: " + rejected);
            }
            return;
        }
        if (this.isStealingEnabled) {
            this.checkIdle(waitJobs, activeJobs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int checkBusy(Collection<CollisionJobContext> waitJobs, Collection<CollisionJobContext> activeJobs) {
        int activeSize = activeJobs.size();
        int waitSize = waitJobs.size();
        this.waitingNum = waitJobs.size();
        this.runningNum = activeSize;
        IgniteSpiContext ctx = this.getSpiContext();
        int activated = 0;
        int rejected = 0;
        Collection<CollisionJobContext> waitPriJobs = this.sortJobs(waitJobs, waitSize);
        int activeJobsThreshold0 = this.activeJobsThreshold;
        int waitJobsThreshold0 = this.waitJobsThreshold;
        block9: for (CollisionJobContext waitCtx : waitPriJobs) {
            if (activeJobs.size() < activeJobsThreshold0) {
                ++activated;
                ComputeJobContext computeJobContext = waitCtx.getJobContext();
                synchronized (computeJobContext) {
                    waitCtx.activate();
                    continue;
                }
            }
            if (this.stealReqs.get() <= 0) break;
            if (waitCtx.getJob().getClass().isAnnotationPresent(JobStealingDisabled.class)) continue;
            Integer stealingCnt = (Integer)waitCtx.getJobContext().getAttribute(STEALING_ATTEMPT_COUNT_ATTR);
            if (stealingCnt != null) {
                if (stealingCnt >= this.maxStealingAttempts) {
                    if (!this.log.isDebugEnabled()) continue;
                    this.log.debug("Waiting job exceeded stealing attempts and won't be rejected (will try other jobs on waiting list): " + waitCtx);
                    continue;
                }
            } else {
                stealingCnt = 0;
            }
            int jobsToReject = waitPriJobs.size() - activated - rejected - waitJobsThreshold0;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Jobs to reject count [jobsToReject=" + jobsToReject + ", waitCtx=" + waitCtx + ']');
            }
            if (jobsToReject <= 0) break;
            Integer pri = (Integer)waitCtx.getJobContext().getAttribute(STEALING_PRIORITY_ATTR);
            if (pri == null) {
                pri = 0;
            }
            Iterator iter = this.rcvMsgMap.entrySet().iterator();
            while (iter.hasNext() && this.stealReqs.get() > 0) {
                MessageInfo info;
                Map.Entry entry = iter.next();
                UUID nodeId = (UUID)entry.getKey();
                if (ctx.node(nodeId) == null) {
                    iter.remove();
                    continue;
                }
                MessageInfo messageInfo = info = (MessageInfo)entry.getValue();
                synchronized (messageInfo) {
                    int jobsAsked = info.jobsToSteal();
                    assert (jobsAsked >= 0);
                    if (jobsAsked == 0) {
                        continue;
                    }
                    if (info.expired()) {
                        this.stealReqs.addAndGet(-info.jobsToSteal());
                        info.reset(0);
                        continue;
                    }
                    boolean found = false;
                    for (UUID id : waitCtx.getTaskSession().getTopology()) {
                        if (!id.equals(nodeId)) continue;
                        found = true;
                        break;
                    }
                    if (!found) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Thief node does not belong to task topology [thief=" + nodeId + ", task=" + waitCtx.getTaskSession() + ']');
                        }
                        continue;
                    }
                    if (this.stealReqs.get() <= 0) {
                        continue block9;
                    }
                    ComputeJobContext computeJobContext = waitCtx.getJobContext();
                    synchronized (computeJobContext) {
                        boolean cancel;
                        boolean bl = cancel = waitCtx.getJobContext().getAttribute(THIEF_NODE_ATTR) == null;
                        if (cancel) {
                            int i;
                            waitCtx.getJobContext().setAttribute(THIEF_NODE_ATTR, nodeId);
                            waitCtx.getJobContext().setAttribute(STEALING_ATTEMPT_COUNT_ATTR, stealingCnt + 1);
                            waitCtx.getJobContext().setAttribute(STEALING_PRIORITY_ATTR, pri + 1);
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Will try to reject job due to steal request [ctx=" + waitCtx + ", thief=" + nodeId + ']');
                            }
                            if ((i = this.stealReqs.decrementAndGet()) >= 0 && waitCtx.cancel()) {
                                ++rejected;
                                info.reset(jobsAsked - 1);
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("Rejected job due to steal request [ctx=" + waitCtx + ", nodeId=" + nodeId + ']');
                                }
                            } else {
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("Failed to reject job [i=" + i + ']');
                                }
                                waitCtx.getJobContext().setAttribute(THIEF_NODE_ATTR, null);
                                waitCtx.getJobContext().setAttribute(STEALING_ATTEMPT_COUNT_ATTR, stealingCnt);
                                waitCtx.getJobContext().setAttribute(STEALING_PRIORITY_ATTR, pri);
                                this.stealReqs.incrementAndGet();
                            }
                        }
                    }
                }
            }
        }
        return rejected;
    }

    private Collection<CollisionJobContext> sortJobs(Collection<CollisionJobContext> waitJobs, int waitSize) {
        ArrayList<CollisionJobContext> passiveList = new ArrayList<CollisionJobContext>(waitJobs.size());
        int i = 0;
        for (CollisionJobContext waitJob : waitJobs) {
            passiveList.add(waitJob);
            if (i++ != waitSize) continue;
            break;
        }
        Collections.sort(passiveList, this.comparator());
        return passiveList;
    }

    private Comparator<CollisionJobContext> comparator() {
        if (this.cmp == null) {
            this.cmp = new Comparator<CollisionJobContext>(){

                @Override
                public int compare(CollisionJobContext o1, CollisionJobContext o2) {
                    int p1 = JobStealingCollisionSpi.this.getJobPriority(o1.getJobContext());
                    int p2 = JobStealingCollisionSpi.this.getJobPriority(o2.getJobContext());
                    return Integer.compare(p2, p1);
                }
            };
        }
        return this.cmp;
    }

    private int getJobPriority(ComputeJobContext ctx) {
        Integer p;
        assert (ctx != null);
        try {
            p = (Integer)ctx.getAttribute(STEALING_PRIORITY_ATTR);
        }
        catch (ClassCastException e) {
            U.error(this.log, "Type of job context priority attribute 'ignite.stealing.priority' is not java.lang.Integer (will use default priority) [type=" + ctx.getAttribute(STEALING_PRIORITY_ATTR).getClass() + ", dfltPriority=" + 0 + ']', e);
            p = 0;
        }
        if (p == null) {
            p = 0;
        }
        return p;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkIdle(Collection<CollisionJobContext> waitJobs, Collection<CollisionJobContext> activeJobs) {
        int max = this.waitJobsThreshold + this.activeJobsThreshold;
        if (max < 0) {
            max = Integer.MAX_VALUE;
        }
        int jobsToSteal = max - (waitJobs.size() + activeJobs.size());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Total number of jobs to be stolen: " + jobsToSteal);
        }
        if (jobsToSteal > 0) {
            ClusterNode next;
            int jobsLeft = jobsToSteal;
            int nodeCnt = this.getSpiContext().remoteNodes().size();
            int idx = 0;
            while (jobsLeft > 0 && idx++ < nodeCnt && (next = this.nodeQueue.poll()) != null) {
                if (this.getSpiContext().node(next.id()) == null) continue;
                if (!(F.isEmpty(this.stealAttrs) || next.attributes() != null && U.containsAll(next.attributes(), this.stealAttrs))) {
                    if (!this.log.isDebugEnabled()) continue;
                    this.log.debug("Skip node as it does not have all attributes: " + next.id());
                    continue;
                }
                int delta = 0;
                try {
                    MessageInfo msgInfo = (MessageInfo)this.sndMsgMap.get(next.id());
                    if (msgInfo == null) {
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Failed to find message info for node: " + next.id());
                        continue;
                    }
                    Integer waitThreshold = (Integer)next.attribute(this.createSpiAttributeName(WAIT_JOBS_THRESHOLD_NODE_ATTR));
                    if (waitThreshold == null) {
                        U.error(this.log, "Remote node is not configured with GridJobStealingCollisionSpi and jobs will not be stolen from it (you must stop it and update its configuration to use GridJobStealingCollisionSpi): " + next);
                        continue;
                    }
                    delta = next.metrics().getCurrentWaitingJobs() - waitThreshold;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Maximum number of jobs to steal from node [jobsToSteal=" + delta + ", node=" + next.id() + ']');
                    }
                    if (delta <= 0) continue;
                    MessageInfo messageInfo = msgInfo;
                    synchronized (messageInfo) {
                        block22: {
                            if (msgInfo.expired() || msgInfo.jobsToSteal() <= 0) break block22;
                            jobsLeft -= msgInfo.jobsToSteal();
                            continue;
                        }
                        if (jobsLeft < delta) {
                            delta = jobsLeft;
                        }
                        jobsLeft -= delta;
                        msgInfo.reset(delta);
                    }
                    this.getSpiContext().send(next, new JobStealingRequest(delta), JOB_STEALING_COMM_TOPIC);
                }
                catch (IgniteSpiException e) {
                    U.error(this.log, "Failed to send job stealing message to node: " + next, e);
                    jobsLeft += delta;
                }
                finally {
                    if (this.getSpiContext().node(next.id()) == null) continue;
                    this.nodeQueue.offer(next);
                }
            }
        }
    }

    @Override
    protected List<String> getConsistentAttributeNames() {
        ArrayList<String> attrs = new ArrayList<String>(2);
        attrs.add(this.createSpiAttributeName(MAX_STEALING_ATTEMPT_ATTR));
        attrs.add(this.createSpiAttributeName(MSG_EXPIRE_TIME_ATTR));
        return attrs;
    }

    public String toString() {
        return S.toString(JobStealingCollisionSpi.class, this);
    }

    private class MessageInfo {
        private int jobsToSteal;
        private long ts = U.currentTimeMillis();

        private MessageInfo() {
        }

        int jobsToSteal() {
            assert (Thread.holdsLock(this));
            return this.jobsToSteal;
        }

        boolean expired() {
            assert (Thread.holdsLock(this));
            return this.jobsToSteal > 0 && U.currentTimeMillis() - this.ts >= JobStealingCollisionSpi.this.msgExpireTime;
        }

        void reset(int jobsToSteal) {
            assert (Thread.holdsLock(this));
            this.jobsToSteal = jobsToSteal;
            this.ts = U.currentTimeMillis();
        }

        public String toString() {
            return S.toString(MessageInfo.class, this);
        }
    }
}

