/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.jetbrains.annotations.Nullable;

public class GridDhtPartitionDemander {
    /** Cache context this demander works with. */
    private final GridCacheContext<?, ?> cctx;
    /** Logger, obtained from the cache context in the constructor. */
    private final IgniteLogger log;
    /** Optional entry filter consulted by {@code preloadEntry}; {@code null} means every entry is accepted. */
    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
    /** Future returned by {@link #syncFuture()}; completed in the constructor when rebalancing is disabled. */
    @GridToStringInclude
    private final GridFutureAdapter syncFut = new GridFutureAdapter();
    /** Current rebalance future; replaced by {@code addAssignments} whenever a new rebalance round is started. */
    @GridToStringInclude
    private volatile RebalanceFuture rebalanceFut;
    /** Timeout object of the most recently scheduled delayed rebalance, or {@code null} when none is pending. */
    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference();
    /** Exchange future recorded by the latest {@code onTopologyChanged} call. */
    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
    /** Lock passed to the constructor; not referenced by the code visible in this part of the file. */
    @Deprecated
    private final ReadWriteLock demandLock;
    /** Counter used to number {@code DemandWorker} instances created for the old rebalancing API. */
    @Deprecated
    private final AtomicInteger dmIdx = new AtomicInteger();
    /** Most recently created old-API demand worker, if any. */
    @Deprecated
    private volatile DemandWorker worker;
    /** Rebalance topic per listener index, built in the constructor from the rebalance thread pool size. */
    private final Map<Integer, Object> rebalanceTopics;
    /** Flag handed to every {@code RebalanceFuture} created by {@code addAssignments}. */
    private final AtomicBoolean startedEvtSent = new AtomicBoolean();
    /** Flag handed to every {@code RebalanceFuture} created by {@code addAssignments}. */
    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();

    /**
     * Creates the demander for the given cache.
     * <p>
     * When rebalancing is disabled for the cache, or the local node is a client node, both the
     * initial rebalance future and the sync future are completed immediately.
     *
     * @param cctx Cache context, must not be {@code null}.
     * @param demandLock Demand lock (only stored).
     */
    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
        assert cctx != null;

        this.cctx = cctx;
        this.demandLock = demandLock;
        log = cctx.logger(getClass());

        boolean rebalanceDisabled = !cctx.rebalanceEnabled() || cctx.kernalContext().clientNode();

        RebalanceFuture initialFut = new RebalanceFuture();
        rebalanceFut = initialFut;

        if (rebalanceDisabled) {
            // Nothing will ever be rebalanced here, so nobody should wait on these futures.
            initialFut.onDone(true);
            syncFut.onDone();
        }

        // One ordered-message topic per rebalance listener index.
        Map<Integer, Object> topics = new HashMap<Integer, Object>();
        int topicIdx = 0;
        while (topicIdx < cctx.gridConfig().getRebalanceThreadPoolSize()) {
            topics.put(topicIdx, GridCachePartitionExchangeManager.rebalanceTopic(topicIdx));
            topicIdx++;
        }
        rebalanceTopics = topics;
    }

    /**
     * Start hook. Intentionally empty: nothing is initialized here.
     */
    void start() {
    }

    /**
     * Stops the demander: cancels the current rebalance future (force-completing it with
     * {@code false} if cancellation throws), cancels the old-API worker if one exists and
     * clears the remembered exchange future and timeout object references.
     */
    void stop() {
        try {
            rebalanceFut.cancel();
        }
        catch (Exception ignored) {
            // Cancellation failed; make sure the future is completed anyway.
            rebalanceFut.onDone(false);
        }

        DemandWorker oldApiWorker = worker;
        if (oldApiWorker != null) {
            oldApiWorker.cancel();
        }

        lastExchangeFut = null;
        lastTimeoutObj.set(null);
    }

    /**
     * @return The sync future held by this demander.
     */
    IgniteInternalFuture<?> syncFuture() {
        return syncFut;
    }

    /**
     * @return The current rebalance future.
     */
    IgniteInternalFuture<Boolean> rebalanceFuture() {
        return rebalanceFut;
    }

    /**
     * Sets the predicate that {@code preloadEntry} applies to every incoming entry.
     *
     * @param preloadPred Entry filter; {@code null} means all entries are accepted.
     */
    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        this.preloadPred = preloadPred;
    }

    /**
     * Forces rebalancing based on the last recorded exchange future.
     * <p>
     * Any pending delayed-rebalance timeout object is removed first. If no exchange future has
     * been recorded yet the request is ignored and a finished future holding {@code true} is
     * returned.
     *
     * @return Future completed with the result (or failure) of the forced rebalance.
     */
    IgniteInternalFuture<Boolean> forceRebalance() {
        GridTimeoutObject pendingTimeout = lastTimeoutObj.getAndSet(null);
        if (pendingTimeout != null) {
            cctx.time().removeTimeoutObject(pendingTimeout);
        }

        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;

        if (exchFut == null) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring force rebalance request (no topology event happened yet).");
            }
            return new GridFinishedFuture<Boolean>(true);
        }

        if (log.isDebugEnabled()) {
            log.debug("Forcing rebalance event for future: " + exchFut);
        }

        final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<Boolean>();

        // Wait for the exchange to complete, then ask the exchange manager to force the rebalance
        // and relay its outcome into the future handed back to the caller.
        exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>(){

            @Override
            public void apply(IgniteInternalFuture<AffinityTopologyVersion> exchDone) {
                IgniteInternalFuture<Boolean> forcedFut = GridDhtPartitionDemander.this.cctx.shared().exchange().forceRebalance(exchFut);
                forcedFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>(){

                    @Override
                    public void apply(IgniteInternalFuture<Boolean> forcedDone) {
                        try {
                            resFut.onDone(forcedDone.get());
                        }
                        catch (Exception e) {
                            resFut.onDone(e);
                        }
                    }
                });
            }
        });

        return resFut;
    }

    /**
     * Checks whether the given rebalance future is outdated.
     *
     * @param fut Rebalance future to check.
     * @return {@code true} if the current affinity topology version differs from the future's
     *      version, or if the future is no longer the current rebalance future.
     */
    private boolean topologyChanged(RebalanceFuture fut) {
        AffinityTopologyVersion curTopVer = cctx.affinity().affinityTopologyVersion();

        if (!curTopVer.equals(fut.topologyVersion())) {
            return true;
        }

        return fut != rebalanceFut;
    }

    /**
     * Records a preload event for a partition, taking node, type and timestamp from the
     * discovery event.
     *
     * @param part Partition.
     * @param type Event type.
     * @param discoEvt Discovery event, must not be {@code null}.
     */
    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
        assert discoEvt != null;

        cctx.events().addPreloadEvent(
            part,
            type,
            discoEvt.eventNode(),
            discoEvt.type(),
            discoEvt.timestamp());
    }

    /**
     * Remembers the latest exchange future; it is later used by forced and delayed rebalancing.
     *
     * @param lastFut Last exchange future.
     */
    void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
        lastExchangeFut = lastFut;
    }

    /**
     * Registers new partition assignments and prepares the rebalance round for them.
     * <p>
     * If the configured rebalance delay is zero, or the rebalance is forced, a new
     * {@code RebalanceFuture} replaces the current one. The previous future is cancelled, or,
     * if it is still the initial one, completed with the new future's result. Cancelled or
     * empty assignments finish the new future right away and {@code null} is returned.
     * Otherwise a {@link Runnable} is returned that sends the initial demand requests when run.
     * <p>
     * If the delay is positive and the rebalance is not forced, a timeout object is scheduled
     * that forces the rebalance after the delay; {@code null} is returned.
     *
     * @param assigns Partition assignments.
     * @param force {@code true} if the rebalance is forced; must match {@code forcedRebFut != null}.
     * @param cnt Counter passed to the new rebalance future.
     * @param next Task to run after this rebalance completes with {@code true}, may be {@code null}.
     * @param forcedRebFut Future to complete with the outcome of a forced rebalance, or {@code null}.
     * @return Task that starts the rebalance, or {@code null} if there is nothing to run now.
     */
    Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, int cnt, final Runnable next, final @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
        if (log.isDebugEnabled()) {
            log.debug("Adding partition assignments: " + assigns);
        }

        assert force == (forcedRebFut != null);

        long delay = cctx.config().getRebalanceDelay();

        if (delay == 0L || force) {
            final RebalanceFuture prevFut = rebalanceFut;
            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);

            if (prevFut.isInitial()) {
                // The initial future is finished together with the first real rebalance round.
                fut.listen(new CI1<IgniteInternalFuture<Boolean>>(){

                    @Override
                    public void apply(IgniteInternalFuture<Boolean> completed) {
                        prevFut.onDone(completed.result());
                    }
                });
            } else {
                prevFut.cancel();
            }

            if (forcedRebFut != null) {
                // Relay the outcome of this round to whoever requested the forced rebalance.
                fut.listen(new CI1<IgniteInternalFuture<Boolean>>(){

                    @Override
                    public void apply(IgniteInternalFuture<Boolean> completed) {
                        try {
                            forcedRebFut.onDone(completed.get());
                        }
                        catch (Exception e) {
                            forcedRebFut.onDone(e);
                        }
                    }
                });
            }

            rebalanceFut = fut;
            fut.sendRebalanceStartedEvent();

            if (assigns.cancelled()) {
                if (log.isDebugEnabled()) {
                    log.debug("Rebalancing skipped due to cancelled assignments.");
                }
                fut.onDone(false);
                fut.sendRebalanceFinishedEvent();
                return null;
            }

            if (assigns.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("Rebalancing skipped due to empty assignments.");
                }
                fut.onDone(true);
                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
                fut.sendRebalanceFinishedEvent();
                return null;
            }

            return new Runnable(){

                @Override
                public void run() {
                    try {
                        if (next != null) {
                            // Chain the next task; it only runs when this round succeeded.
                            fut.listen(new CI1<IgniteInternalFuture<Boolean>>(){

                                @Override
                                public void apply(IgniteInternalFuture<Boolean> f) {
                                    try {
                                        if (f.get().booleanValue()) {
                                            next.run();
                                        }
                                    }
                                    catch (IgniteCheckedException e) {
                                        if (GridDhtPartitionDemander.this.log.isDebugEnabled()) {
                                            GridDhtPartitionDemander.this.log.debug(e.getMessage());
                                        }
                                    }
                                }
                            });
                        }
                        GridDhtPartitionDemander.this.requestPartitions(fut, assigns);
                    }
                    catch (IgniteCheckedException e) {
                        ClusterTopologyCheckedException topCause = e.getCause(ClusterTopologyCheckedException.class);
                        if (topCause == null) {
                            GridDhtPartitionDemander.this.log.error("Failed to send initial demand request to node.", e);
                        } else {
                            GridDhtPartitionDemander.this.log.warning("Failed to send initial demand request to node. " + e.getMessage());
                        }
                        fut.cancel();
                    }
                    catch (Throwable th) {
                        GridDhtPartitionDemander.this.log.error("Runtime error caught during initial demand request sending.", th);
                        fut.cancel();
                        // Errors are propagated; other runtime failures end with the cancellation above.
                        if (th instanceof Error) {
                            throw th;
                        }
                    }
                }
            };
        }

        if (delay > 0L) {
            GridTimeoutObject prevTimeout = lastTimeoutObj.get();
            if (prevTimeout != null) {
                cctx.time().removeTimeoutObject(prevTimeout);
            }

            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
            assert exchFut != null : "Delaying rebalance process without topology event.";

            GridTimeoutObject delayedRebalance = new GridTimeoutObjectAdapter(delay){

                @Override
                public void onTimeout() {
                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>(){

                        @Override
                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
                            GridDhtPartitionDemander.this.cctx.shared().exchange().forceRebalance(exchFut);
                        }
                    });
                }
            };

            lastTimeoutObj.set(delayedRebalance);
            cctx.time().addTimeoutObject(delayedRebalance);
        }

        return null;
    }

    /**
     * Sends the initial demand requests for all assigned partitions.
     * <p>
     * If the topology has already changed for the given future, the future is cancelled and
     * nothing is sent. Otherwise all partitions are first registered with the future. Then, for
     * each supplier node at or above {@code GridDhtPreloader.REBALANCING_VER_2_SINCE}, the
     * partitions are spread round-robin over the rebalance listeners and one ordered demand
     * message is sent per non-empty listener. Older nodes are served through a
     * {@code DemandWorker}.
     *
     * @param fut Rebalance future of the current round.
     * @param assigns Assignments (supplier node to demand message).
     * @throws IgniteCheckedException If a demand request could not be sent.
     */
    private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
        if (topologyChanged(fut)) {
            fut.cancel();
            return;
        }

        // Register every partition with the future before anything is sent.
        for (Map.Entry assignment : assigns.entrySet()) {
            ClusterNode supplier = (ClusterNode)assignment.getKey();
            GridDhtPartitionDemandMessage demand = (GridDhtPartitionDemandMessage)assignment.getValue();
            fut.appendPartitions(supplier.id(), demand.partitions());
        }

        for (Map.Entry assignment : assigns.entrySet()) {
            ClusterNode node = (ClusterNode)assignment.getKey();
            CacheConfiguration cfg = cctx.config();
            Collection parts = (Collection)((T2)fut.remaining.get(node.id())).get2();
            GridDhtPartitionDemandMessage d = (GridDhtPartitionDemandMessage)assignment.getValue();

            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) < 0) {
                // Supplier only understands the old protocol: hand the demand to a worker.
                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
                d.timeout(cctx.config().getRebalanceTimeout());
                d.workerId(0);
                worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
                worker.run(node, d);
                continue;
            }

            U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");

            int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();

            ArrayList perLsnrParts = new ArrayList(lsnrCnt);
            for (int i = 0; i < lsnrCnt; ++i) {
                perLsnrParts.add(new HashSet());
            }

            // Spread partitions over the listeners round-robin.
            int next = 0;
            for (Iterator it = parts.iterator(); it.hasNext(); ++next) {
                ((Set)perLsnrParts.get(next % lsnrCnt)).add(it.next());
            }

            for (int lsnrIdx = 0; lsnrIdx < lsnrCnt; ++lsnrIdx) {
                Set chunk = (Set)perLsnrParts.get(lsnrIdx);
                if (chunk.isEmpty()) {
                    continue;
                }

                GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, (Collection)chunk);
                initD.topic(rebalanceTopics.get(lsnrIdx));
                initD.updateSequence(fut.updateSeq);
                initD.timeout(cctx.config().getRebalanceTimeout());

                // The done-check and the send happen under the future's monitor.
                synchronized (fut) {
                    if (!fut.isDone()) {
                        cctx.io().sendOrderedMessage(node, rebalanceTopics.get(lsnrIdx), initD, cctx.ioPolicy(), initD.timeout());
                    }
                }

                if (log.isDebugEnabled()) {
                    log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + lsnrIdx + ", partitions count=" + chunk.size() + " (" + partitionsList((Collection)chunk) + ")]");
                }
            }
        }
    }

    /**
     * Renders partition numbers as a compact, sorted, comma-separated list in which runs of
     * consecutive numbers are collapsed into {@code first-last} ranges, e.g. {@code "1-3, 5"}.
     *
     * @param c Partition numbers, in any order.
     * @return Compact representation; an empty string for an empty collection.
     */
    private String partitionsList(Collection<Integer> c) {
        ArrayList<Integer> sorted = new ArrayList<Integer>(c);
        Collections.sort(sorted);

        StringBuilder out = new StringBuilder();

        // -1 marks "no range opened yet".
        int rangeStart = -1;
        int last = -1;
        int size = sorted.size();

        for (int i = 0; i < size; i++) {
            int cur = sorted.get(i);

            if (rangeStart == -1) {
                rangeStart = cur;
                last = cur;
            }

            if (last < cur - 1) {
                // Gap found: flush the range collected so far and open a new one.
                out.append(rangeStart);
                if (rangeStart != last) {
                    out.append("-").append(last);
                }
                out.append(", ");
                rangeStart = cur;
            }

            if (i == size - 1) {
                // Last element: flush whatever range is still open.
                out.append(rangeStart);
                if (rangeStart != cur) {
                    out.append("-").append(cur);
                }
            }

            last = cur;
        }

        return out.toString();
    }

    /**
     * Processes a supply message received for the current rebalance round.
     * <p>
     * The message is ignored when the sender is no longer known, when its update sequence does
     * not match the current rebalance future, or when the topology has changed. A class-loading
     * error reported by the supplier cancels rebalancing from that node.
     * <p>
     * Otherwise every supplied partition is processed:
     * <ul>
     * <li>partitions that do not belong to the local node are marked done;</li>
     * <li>local partitions that are not {@code MOVING} are skipped (and marked done if this was
     * their last batch);</li>
     * <li>{@code MOVING} partitions are reserved and locked, their entries are preloaded and,
     * on the last batch, the partition is owned and marked done.</li>
     * </ul>
     * Missed partitions are then reported to the future and, unless the future is done or the
     * topology has changed, the next demand message is sent on the same listener topic.
     * <p>
     * The partition is always unlocked and released in a {@code finally} block that completes
     * normally, so a failure while preloading entries reaches the {@code catch} clauses below
     * (or the caller) instead of being silently discarded.
     *
     * @param idx Rebalance listener index the message arrived on.
     * @param id Supplier node ID.
     * @param supply Supply message.
     */
    public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 supply) {
        AffinityTopologyVersion topVer = supply.topologyVersion();
        RebalanceFuture fut = this.rebalanceFut;
        ClusterNode node = this.cctx.node(id);

        if (node == null) {
            return;
        }
        // The message belongs to another (older or newer) rebalance future.
        if (!fut.isActual(supply.updateSequence())) {
            return;
        }
        if (this.topologyChanged(fut)) {
            return;
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("Received supply message: " + supply);
        }

        if (supply.classError() != null) {
            U.warn(this.log, "Rebalancing from node cancelled [node=" + id + "]. Class got undeployed during preloading: " + supply.classError());
            fut.cancel(id);
            return;
        }

        GridDhtPartitionTopology top = this.cctx.dht().topology();

        try {
            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                int p = e.getKey();

                if (!this.cctx.affinity().partitionLocalNode(p, topVer)) {
                    fut.partitionDone(id, p);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
                    }
                    continue;
                }

                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
                assert (part != null);

                boolean last = supply.last().contains(p);

                if (part.state() != GridDhtPartitionState.MOVING) {
                    if (last) {
                        fut.partitionDone(id, p);
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                    }
                    continue;
                }

                boolean reserved = part.reserve();
                assert (reserved) : "Failed to reserve partition [gridName=" + this.cctx.gridName() + ", cacheName=" + this.cctx.namex() + ", part=" + part + ']';

                part.lock();
                try {
                    for (GridCacheEntryInfo entry : e.getValue().infos()) {
                        if (!part.preloadingPermitted(entry.key(), entry.version())) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Preloading is not permitted for entry due to evictions [key=" + entry.key() + ", ver=" + entry.version() + ']');
                            }
                            continue;
                        }

                        if (!this.preloadEntry(node, p, entry, topVer)) {
                            // Partition became invalid: stop processing its entries.
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Got entries for invalid partition during preloading (will skip) [p=" + p + ", entry=" + entry + ']');
                            }
                            break;
                        }
                    }

                    // Take ownership once the last batch for the partition has been applied.
                    if (last) {
                        top.own(part);
                        fut.partitionDone(id, p);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Finished rebalancing partition: " + part);
                        }
                    }
                }
                finally {
                    // No control-flow statement here: a jump out of finally would swallow exceptions.
                    part.unlock();
                    part.release();
                }
            }

            for (Integer miss : supply.missed()) {
                if (this.cctx.affinity().partitionLocalNode(miss, topVer)) {
                    fut.partitionMissed(id, miss);
                }
            }
            for (Integer miss : supply.missed()) {
                fut.partitionDone(id, miss);
            }

            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(supply.updateSequence(), supply.topologyVersion(), this.cctx.cacheId());
            d.timeout(this.cctx.config().getRebalanceTimeout());
            d.topic(this.rebalanceTopics.get(idx));

            // Ask for the next batch only while this round is still relevant.
            if (!this.topologyChanged(fut) && !fut.isDone()) {
                this.cctx.io().sendOrderedMessage(node, this.rebalanceTopics.get(idx), d, this.cctx.ioPolicy(), this.cctx.config().getRebalanceTimeout());
            }
        }
        catch (IgniteCheckedException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Node left during rebalancing [node=" + node.id() + ", msg=" + e.getMessage() + ']');
            }
        }
        catch (IgniteSpiException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + ", msg=" + e.getMessage() + ']');
            }
        }
    }

    /**
     * Stores a single rebalanced entry in the local cache.
     * <p>
     * The entry is ignored (but {@code true} is still returned) when the IGFS data cache space
     * limit is exceeded, when the preload predicate rejects it, when it is already present in
     * the cache, or when it is removed concurrently.
     *
     * @param pick Node the entry was received from.
     * @param p Partition of the entry.
     * @param entry Entry to preload.
     * @param topVer Topology version of the rebalance round.
     * @return {@code false} if the partition became invalid, {@code true} otherwise.
     * @throws IgniteInterruptedCheckedException Rethrown unchanged if the operation was interrupted.
     * @throws IgniteCheckedException If the entry could not be cached; wraps the original failure.
     */
    private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo entry, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        GridCacheEntryEx cached = null;

        try {
            cached = cctx.dht().entryEx(entry.key());

            if (log.isDebugEnabled()) {
                log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
            }

            if (cctx.dht().isIgfsDataCache() && cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
                LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum value, will ignore rebalance entries)");
                // Do not leave an empty placeholder entry behind.
                if (cached.markObsoleteIfEmpty(null)) {
                    cached.context().cache().removeEntry(cached);
                }
                return true;
            }

            boolean accepted = preloadPred == null || preloadPred.apply(entry);

            if (!accepted) {
                if (log.isDebugEnabled()) {
                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
                }
                return true;
            }

            boolean stored = cached.initialValue(entry.value(), entry.version(), entry.ttl(), entry.expireTime(), true, topVer, cctx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE, false);

            if (stored) {
                cctx.evicts().touch(cached, topVer);
                if (cctx.events().isRecordable(84) && !cached.isInternal()) {
                    cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), (IgniteUuid)null, null, 84, entry.value(), true, null, false, null, null, null, true);
                }
            } else {
                if (cctx.isSwapOrOffheapEnabled()) {
                    cctx.evicts().touch(cached, topVer);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + ", part=" + p + ']');
                }
            }

            return true;
        }
        catch (GridCacheEntryRemovedException ignored) {
            if (log.isDebugEnabled()) {
                log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']');
            }
            return true;
        }
        catch (GridDhtInvalidPartitionException ignored) {
            if (log.isDebugEnabled()) {
                log.debug("Partition became invalid during rebalancing (will ignore): " + p);
            }
            return false;
        }
        catch (IgniteInterruptedCheckedException e) {
            // Interruption is passed through without wrapping.
            throw e;
        }
        catch (IgniteCheckedException e) {
            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
        }
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return S.toString(GridDhtPartitionDemander.class, this);
    }

    @Deprecated
    private class DemandWorker {
        /** Worker ID. */
        private int id;
        /** Queue of partition assignments. */
        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque();
        /** Queue of incoming supply messages; a message with a {@code null} sender signals cancellation. */
        private final LinkedBlockingDeque<SupplyMessage> msgQ = new LinkedBlockingDeque();
        /** Counter used to build a distinct topic for each demand attempt. */
        private long cntr;
        /**
         * Logger shared with the enclosing demander. Read directly from the outer instance; the
         * decompiler-emitted synthetic accessor call does not exist at source level.
         */
        private IgniteLogger log = GridDhtPartitionDemander.this.log;
        /** Rebalance future this worker serves. */
        private volatile RebalanceFuture fut;

        /**
         * @param id Worker ID, must be non-negative.
         * @param fut Rebalance future this worker serves.
         */
        private DemandWorker(int id, RebalanceFuture fut) {
            assert id >= 0;

            this.fut = fut;
            this.id = id;
        }

        /**
         * Enqueues a supply message for processing by this worker.
         *
         * @param msg Supply message.
         */
        private void addMessage(SupplyMessage msg) {
            msgQ.offer(msg);
        }

        /**
         * Waits up to the given time for an element of the queue.
         *
         * @param deque Queue to poll.
         * @param time Maximum wait time in milliseconds.
         * @return Polled element, or {@code null} if the wait timed out.
         * @throws InterruptedException If the waiting thread was interrupted.
         */
        @Nullable
        private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
            T polled = deque.poll(time, TimeUnit.MILLISECONDS);

            return polled;
        }

        /**
         * Builds the cache topic for the given index from the cache name, the local node ID and
         * this worker's ID.
         *
         * @param idx Topic index.
         * @return Topic object.
         */
        public Object topic(long idx) {
            GridCacheContext<?, ?> ctx = GridDhtPartitionDemander.this.cctx;

            return GridTopic.TOPIC_CACHE.topic(ctx.namexx(), ctx.nodeId(), id, idx);
        }

        /**
         * Cancels the worker: drops all queued supply messages and enqueues a marker message
         * with a {@code null} sender, which the demand loop treats as a signal to return.
         */
        public void cancel() {
            msgQ.clear();

            SupplyMessage stopMarker = new SupplyMessage(null, null);
            msgQ.offer(stopMarker);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void demandFromNode(ClusterNode node, AffinityTopologyVersion topVer, GridDhtPartitionDemandMessage d, GridDhtPartitionsExchangeFuture exchFut) throws InterruptedException, IgniteCheckedException {
            GridDhtPartitionTopology top = GridDhtPartitionDemander.this.cctx.dht().topology();
            ++this.cntr;
            d.topic(this.topic(this.cntr));
            d.workerId(this.id);
            if (this.fut.isDone() || GridDhtPartitionDemander.this.topologyChanged(this.fut)) {
                return;
            }
            GridDhtPartitionDemander.this.cctx.io().addOrderedHandler(d.topic(), (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new CI2<UUID, GridDhtPartitionSupplyMessage>(){

                @Override
                public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                    DemandWorker.this.addMessage(new SupplyMessage(nodeId, msg));
                }
            });
            try {
                boolean retry;
                block7: do {
                    retry = false;
                    d = new GridDhtPartitionDemandMessage(d, (Collection)((T2)this.fut.remaining.get(node.id())).get2());
                    long timeout = GridDhtPartitionDemander.this.cctx.config().getRebalanceTimeout();
                    d.timeout(timeout);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
                    }
                    GridDhtPartitionDemander.this.cctx.io().send(node, (GridCacheMessage)d, GridDhtPartitionDemander.this.cctx.ioPolicy());
                    while (!this.fut.isDone() && !GridDhtPartitionDemander.this.topologyChanged(this.fut)) {
                        GridDhtPartitionSupplyMessage supply;
                        SupplyMessage s = this.poll(this.msgQ, timeout);
                        if (s == null) {
                            if (!this.msgQ.isEmpty()) continue;
                            U.warn(this.log, "Timed out waiting for partitions to load, will retry in " + timeout + " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" + " configuration properties).");
                            GridDhtPartitionDemander.this.cctx.io().removeOrderedHandler(d.topic());
                            d = new GridDhtPartitionDemandMessage(d, (Collection)((T2)this.fut.remaining.get(node.id())).get2());
                            d.topic(this.topic(++this.cntr));
                            GridDhtPartitionDemander.this.cctx.io().addOrderedHandler(d.topic(), (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new CI2<UUID, GridDhtPartitionSupplyMessage>(){

                                @Override
                                public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                                    DemandWorker.this.addMessage(new SupplyMessage(nodeId, msg));
                                }
                            });
                            retry = true;
                            continue block7;
                        }
                        if (s.senderId() == null) {
                            return;
                        }
                        if (!s.senderId().equals(node.id())) {
                            U.warn(this.log, "Received supply message from unexpected node [expectedId=" + node.id() + ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
                            continue;
                        }
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Received supply message: " + s);
                        }
                        if ((supply = s.supply()).classError() != null) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Class got undeployed during preloading: " + supply.classError());
                            }
                            retry = true;
                            continue block7;
                        }
                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                            int p = e.getKey();
                            if (GridDhtPartitionDemander.this.cctx.affinity().partitionLocalNode(p, topVer)) {
                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
                                assert (part != null);
                                if (part.state() == GridDhtPartitionState.MOVING) {
                                    boolean reserved = part.reserve();
                                    assert (reserved) : "Failed to reserve partition [gridName=" + GridDhtPartitionDemander.access$000(GridDhtPartitionDemander.this).gridName() + ", cacheName=" + GridDhtPartitionDemander.access$000(GridDhtPartitionDemander.this).namex() + ", part=" + part + ']';
                                    part.lock();
                                    try {
                                        GridLeanSet<Integer> invalidParts = new GridLeanSet<Integer>();
                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
                                            if (invalidParts.contains(p)) continue;
                                            if (!part.preloadingPermitted(entry.key(), entry.version())) {
                                                if (!this.log.isDebugEnabled()) continue;
                                                this.log.debug("Preloading is not permitted for entry due to evictions [key=" + entry.key() + ", ver=" + entry.version() + ']');
                                                continue;
                                            }
                                            if (GridDhtPartitionDemander.this.preloadEntry(node, p, entry, topVer)) continue;
                                            invalidParts.add(p);
                                            if (!this.log.isDebugEnabled()) continue;
                                            this.log.debug("Got entries for invalid partition during preloading (will skip) [p=" + p + ", entry=" + entry + ']');
                                        }
                                        boolean last = supply.last().contains(p);
                                        if (!last) continue;
                                        this.fut.partitionDone(node.id(), p);
                                        top.own(part);
                                        if (this.log.isDebugEnabled()) {
                                            this.log.debug("Finished rebalancing partition: " + part);
                                        }
                                        if (!GridDhtPartitionDemander.this.cctx.events().isRecordable(82)) continue;
                                        GridDhtPartitionDemander.this.preloadEvent(p, 82, exchFut.discoveryEvent());
                                        continue;
                                    }
                                    finally {
                                        part.unlock();
                                        part.release();
                                        continue;
                                    }
                                }
                                this.fut.partitionDone(node.id(), p);
                                if (!this.log.isDebugEnabled()) continue;
                                this.log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                                continue;
                            }
                            this.fut.partitionDone(node.id(), p);
                            if (!this.log.isDebugEnabled()) continue;
                            this.log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
                        }
                        for (Integer miss : s.supply().missed()) {
                            if (!GridDhtPartitionDemander.this.cctx.affinity().partitionLocalNode(miss, topVer)) continue;
                            this.fut.partitionMissed(node.id(), miss);
                        }
                        for (Integer miss : s.supply().missed()) {
                            this.fut.partitionDone(node.id(), miss);
                        }
                        if (this.fut.remaining.get(node.id()) == null) continue block7;
                        if (!s.supply().ack()) continue;
                        retry = true;
                        continue block7;
                    }
                } while (retry && !this.fut.isDone() && !GridDhtPartitionDemander.this.topologyChanged(this.fut));
            }
            finally {
                GridDhtPartitionDemander.this.cctx.io().removeOrderedHandler(d.topic());
            }
        }

        /**
         * Runs one demand session against the given supplier node while holding the demander's
         * demand read lock. The exchange future and topology version are taken from the
         * current rebalance future ({@code fut}).
         *
         * @param node Supplier node to demand partitions from.
         * @param d Demand message to send.
         * @throws IgniteCheckedException If the demand session fails, or wrapping the
         *      {@link InterruptedException} if the thread was interrupted while demanding.
         */
        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
            GridDhtPartitionDemander.this.demandLock.readLock().lock();
            try {
                GridDhtPartitionsExchangeFuture exchFut = this.fut.exchFut;
                AffinityTopologyVersion topVer = this.fut.topVer;
                this.demandFromNode(node, topVer, d, exchFut);
            }
            catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; restore it so that code further up
                // the stack can still observe that this thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new IgniteCheckedException(e);
            }
            finally {
                GridDhtPartitionDemander.this.demandLock.readLock().unlock();
            }
        }

        /**
         * Builds the string form of this worker via {@code S.toString}, appending the assignment
         * queue, the message queue and the superclass string under the names
         * {@code assignQ}, {@code msgQ} and {@code super}.
         *
         * @return String representation of this worker.
         */
        public String toString() {
            Object assignments = this.assignQ;
            Object messages = this.msgQ;
            String parent = super.toString();
            return S.toString(DemandWorker.class, this, "assignQ", assignments, "msgQ", messages, "super", parent);
        }
    }

    /**
     * Pairs a partition supply message with the ID of the node it came from.
     * <p>
     * An instance created through the no-arg constructor carries neither a sender ID nor a
     * supply message (both are {@code null}).
     */
    @Deprecated
    private static class SupplyMessage {
        /** ID of the sending node, or {@code null} for an instance built without arguments. */
        private UUID sndId;

        /** Supply message payload, or {@code null} for an instance built without arguments. */
        private GridDhtPartitionSupplyMessage supply;

        /**
         * Creates an empty instance with no sender and no payload.
         */
        private SupplyMessage() {
            this(null, null);
        }

        /**
         * @param sndId ID of the node that sent the message.
         * @param supply Supply message received from that node.
         */
        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
            this.sndId = sndId;
            this.supply = supply;
        }

        /**
         * @return Sender node ID (may be {@code null}).
         */
        UUID senderId() {
            return sndId;
        }

        /**
         * @return Wrapped supply message (may be {@code null}).
         */
        GridDhtPartitionSupplyMessage supply() {
            return supply;
        }

        /**
         * @return String representation produced by {@code S.toString}.
         */
        public String toString() {
            return S.toString(SupplyMessage.class, this);
        }
    }

    /**
     * Future that tracks one round of rebalancing.
     * <p>
     * For every supplier node it keeps the partitions that are still awaited ({@code remaining})
     * and the partitions that were reported as missed ({@code missed}). When nothing remains the
     * future is completed: with {@code true} on normal completion, with {@code false} when it was
     * cancelled or when missed partitions have to be reassigned.
     * <p>
     * All mutations of {@code remaining} and {@code missed} inside this class happen while
     * synchronized on the future instance.
     */
    public static class RebalanceFuture
    extends GridFutureAdapter<Boolean> {
        private static final long serialVersionUID = 1L;

        /** Flag shared with the creator: set once the "rebalance started" event has been recorded. */
        private final AtomicBoolean startedEvtSent;

        /** Flag shared with the creator: set once the "rebalance finished" event has been recorded. */
        private final AtomicBoolean stoppedEvtSent;

        /** Cache context; {@code null} for a future created with the no-arg constructor. */
        private final GridCacheContext<?, ?> cctx;

        /** Logger; {@code null} for a future created with the no-arg constructor. */
        private final IgniteLogger log;

        /** Supplier node ID -> (registration time in ms, partitions still awaited from that node). */
        private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<UUID, T2<Long, Collection<Integer>>>();

        /** Supplier node ID -> partitions that node reported as missed. */
        private final Map<UUID, Collection<Integer>> missed = new HashMap<UUID, Collection<Integer>>();

        /** Exchange future the assignments were produced for; {@code null} for the no-arg constructor. */
        @GridToStringExclude
        private final GridDhtPartitionsExchangeFuture exchFut;

        /** Topology version of the assignments; {@code null} for the no-arg constructor. */
        private final AffinityTopologyVersion topVer;

        /** Update sequence this future was created with ({@code -1} for the no-arg constructor). */
        private final long updateSeq;

        /**
         * @param assigns Assignments; supplies the exchange future and the topology version.
         * @param cctx Cache context.
         * @param log Logger.
         * @param startedEvtSent Flag telling whether the start event was already sent.
         * @param stoppedEvtSent Flag telling whether the finish event was already sent.
         * @param updateSeq Update sequence identifying this future (see {@link #isActual(long)}).
         */
        RebalanceFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log, AtomicBoolean startedEvtSent, AtomicBoolean stoppedEvtSent, long updateSeq) {
            assert (assigns != null);
            this.exchFut = assigns.exchangeFuture();
            this.topVer = assigns.topologyVersion();
            this.cctx = cctx;
            this.log = log;
            this.startedEvtSent = startedEvtSent;
            this.stoppedEvtSent = stoppedEvtSent;
            this.updateSeq = updateSeq;
        }

        /**
         * Creates a placeholder future with no context: every reference field is {@code null}
         * and the update sequence is {@code -1}. Such a future reports {@link #isInitial()}.
         */
        public RebalanceFuture() {
            this.exchFut = null;
            this.topVer = null;
            this.cctx = null;
            this.log = null;
            this.startedEvtSent = null;
            this.stoppedEvtSent = null;
            this.updateSeq = -1L;
        }

        /**
         * @return Topology version this future was created for (may be {@code null}).
         */
        public AffinityTopologyVersion topologyVersion() {
            return this.topVer;
        }

        /**
         * @param updateSeq Update sequence to compare with.
         * @return {@code true} if the given sequence equals the one this future was created with.
         */
        private boolean isActual(long updateSeq) {
            return this.updateSeq == updateSeq;
        }

        /**
         * @return {@code true} if this future has no topology version, as is the case for a
         *      future created with the no-arg constructor.
         */
        private boolean isInitial() {
            return this.topVer == null;
        }

        /**
         * Registers the partitions awaited from a supplier node, stamping the entry with the
         * current time. An existing entry for the same node is replaced.
         *
         * @param nodeId Supplier node ID.
         * @param parts Partitions expected from that node; must not be {@code null}.
         */
        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
            synchronized (this) {
                assert (parts != null) : "Partitions are null [cache=" + this.cctx.name() + ", fromNode=" + nodeId + "]";
                this.remaining.put(nodeId, new T2<Long, Collection<Integer>>(U.currentTimeMillis(), parts));
            }
        }

        /**
         * Cancels rebalancing from all nodes. Unless the kernal is stopping, a cleanup request is
         * sent to every node that still has awaited partitions; then all awaited partitions are
         * dropped and the future is finished as cancelled.
         *
         * @return Always {@code true}, including when the future was already done.
         */
        @Override
        public boolean cancel() {
            synchronized (this) {
                if (this.isDone()) {
                    return true;
                }
                U.log(this.log, "Cancelled rebalancing from all nodes [topology=" + this.topologyVersion() + ']');
                if (!this.cctx.kernalContext().isStopping()) {
                    for (UUID nodeId : this.remaining.keySet()) {
                        this.cleanupRemoteContexts(nodeId);
                    }
                }
                this.remaining.clear();
                this.checkIsDone(true);
            }
            return true;
        }

        /**
         * Cancels rebalancing from a single supplier node: sends it a cleanup request, drops its
         * awaited partitions and completes this future with {@code false}. Does nothing if the
         * future is already done.
         *
         * @param nodeId Supplier node ID.
         */
        private void cancel(UUID nodeId) {
            synchronized (this) {
                if (this.isDone()) {
                    return;
                }
                // The node may have no entry (for instance, all of its partitions were already
                // received); in that case there is no start time to report, and the lookup must
                // not fail with a NullPointerException before the cancellation is carried out.
                T2<Long, Collection<Integer>> t = this.remaining.get(nodeId);
                String elapsed = t == null ? "" : ", time=" + (U.currentTimeMillis() - t.get1()) + " ms";
                U.log(this.log, "Cancelled rebalancing [cache=" + this.cctx.name() + ", fromNode=" + nodeId + ", topology=" + this.topologyVersion() + elapsed + "]");
                this.cleanupRemoteContexts(nodeId);
                this.remaining.remove(nodeId);
                // Finish the future as not completed first; checkIsDone() then performs the
                // end-of-rebalance bookkeeping if no other node is still awaited.
                this.onDone(false);
                this.checkIsDone();
            }
        }

        /**
         * Records that a supplier node reported the given partition as missed. Ignored once the
         * future is done.
         *
         * @param nodeId Supplier node ID.
         * @param p Partition number.
         */
        private void partitionMissed(UUID nodeId, int p) {
            synchronized (this) {
                if (this.isDone()) {
                    return;
                }
                Collection<Integer> parts = this.missed.get(nodeId);
                if (parts == null) {
                    parts = new HashSet<Integer>();
                    this.missed.put(nodeId, parts);
                }
                parts.add(p);
            }
        }

        /**
         * Sends a demand message with update sequence {@code -1} to every rebalance topic of the
         * given node. Skipped when the node is no longer known to discovery or its version is
         * older than {@code GridDhtPreloader.REBALANCING_VER_2_SINCE}. A send failure is not
         * propagated; it is only reported at debug level.
         *
         * @param nodeId Supplier node ID.
         */
        private void cleanupRemoteContexts(UUID nodeId) {
            ClusterNode node = this.cctx.discovery().node(nodeId);
            if (node == null) {
                return;
            }
            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) < 0) {
                return;
            }
            long timeout = this.cctx.config().getRebalanceTimeout();
            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(-1L, this.topologyVersion(), this.cctx.cacheId());
            d.timeout(timeout);
            try {
                int topics = this.cctx.gridConfig().getRebalanceThreadPoolSize();
                for (int idx = 0; idx < topics; ++idx) {
                    d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
                    this.cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), d, this.cctx.ioPolicy(), timeout);
                }
            }
            catch (IgniteCheckedException ignored) {
                // Best effort: the request is only a cleanup hint for the remote node.
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send failover context cleanup request to node");
                }
            }
        }

        /**
         * Marks a partition from the given supplier node as finished. Records event type 82 for
         * the partition when that type is recordable, removes the partition from the node's
         * awaited set, drops the node once its set is empty, and finally checks whether the whole
         * future can be completed. Ignored once the future is done.
         *
         * @param nodeId Supplier node ID.
         * @param p Partition number.
         */
        private void partitionDone(UUID nodeId, int p) {
            synchronized (this) {
                if (this.isDone()) {
                    return;
                }
                // NOTE(review): 82 is presumably EventType.EVT_CACHE_REBALANCE_PART_LOADED - confirm.
                if (this.cctx.events().isRecordable(82)) {
                    this.preloadEvent(p, 82, this.exchFut.discoveryEvent());
                }
                T2<Long, Collection<Integer>> t = this.remaining.get(nodeId);
                assert (t != null) : "Remaining not found [cache=" + this.cctx.name() + ", fromNode=" + nodeId + ", part=" + p + "]";
                Collection<Integer> parts = t.get2();
                boolean rmvd = parts.remove(Integer.valueOf(p));
                assert (rmvd) : "Partition already done [cache=" + this.cctx.name() + ", fromNode=" + nodeId + ", part=" + p + ", left=" + parts + "]";
                if (parts.isEmpty()) {
                    U.log(this.log, "Completed " + (this.remaining.size() == 1 ? "(final) " : "") + "rebalancing [fromNode=" + nodeId + ", topology=" + this.topologyVersion() + ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]");
                    this.remaining.remove(nodeId);
                }
                this.checkIsDone();
            }
        }

        /**
         * Records a preload event for a partition, taking node, type and timestamp from the
         * given discovery event.
         *
         * @param part Partition number.
         * @param type Event type.
         * @param discoEvt Discovery event that caused the rebalancing; must not be {@code null}.
         */
        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
            assert (discoEvt != null);
            this.cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
        }

        /**
         * Records a preload event that is not tied to a partition (partition {@code -1}).
         *
         * @param type Event type.
         * @param discoEvt Discovery event that caused the rebalancing.
         */
        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
            this.preloadEvent(-1, type, discoEvt);
        }

        /**
         * Same as {@link #checkIsDone(boolean)} with {@code cancelled == false}.
         */
        private void checkIsDone() {
            this.checkIsDone(false);
        }

        /**
         * Completes the future if no partitions are awaited any more. In that case it records the
         * finish event, schedules a partitions resend and then either
         * <ul>
         * <li>completes with {@code false} and forces a dummy exchange when any partitions were
         * missed, or</li>
         * <li>completes with {@code !cancelled}, finishing the preloader's sync future first when
         * not cancelled and that future is not yet done.</li>
         * </ul>
         * Every caller inside this class invokes it while synchronized on this future.
         *
         * @param cancelled Whether completion is the result of a cancellation.
         */
        private void checkIsDone(boolean cancelled) {
            if (!this.remaining.isEmpty()) {
                return;
            }
            this.sendRebalanceFinishedEvent();
            if (this.log.isDebugEnabled()) {
                this.log.debug("Completed rebalance future: " + this);
            }
            this.cctx.shared().exchange().scheduleResendPartitions();
            HashSet<Integer> m = new HashSet<Integer>();
            for (Collection<Integer> parts : this.missed.values()) {
                if (parts != null) {
                    m.addAll(parts);
                }
            }
            if (!m.isEmpty()) {
                U.log(this.log, "Reassigning partitions that were missed: " + m);
                this.onDone(false);
                this.cctx.shared().exchange().forceDummyExchange(true, this.exchFut);
                return;
            }
            if (!cancelled) {
                IgniteInternalFuture<?> syncFut = this.cctx.preloader().syncFuture();
                if (!syncFut.isDone()) {
                    ((GridFutureAdapter<?>)syncFut).onDone();
                }
            }
            this.onDone(!cancelled);
        }

        /**
         * Records event type 80 (rebalance started) when that type is recordable, and marks it as
         * sent. For a replicated cache the event is recorded only if it has not been sent before.
         */
        private void sendRebalanceStartedEvent() {
            if (!this.cctx.events().isRecordable(80)) {
                return;
            }
            if (this.cctx.isReplicated() && this.startedEvtSent.get()) {
                return;
            }
            this.preloadEvent(80, this.exchFut.discoveryEvent());
            this.startedEvtSent.set(true);
        }

        /**
         * Records event type 81 (rebalance finished) when that type is recordable, and marks it as
         * sent. For a replicated cache the event is recorded only if it has not been sent before.
         */
        private void sendRebalanceFinishedEvent() {
            if (!this.cctx.events().isRecordable(81)) {
                return;
            }
            if (this.cctx.isReplicated() && this.stoppedEvtSent.get()) {
                return;
            }
            this.preloadEvent(81, this.exchFut.discoveryEvent());
            this.stoppedEvtSent.set(true);
        }

        /**
         * @return String representation produced by {@code S.toString}.
         */
        @Override
        public String toString() {
            return S.toString(RebalanceFuture.class, this);
        }
    }
}

