/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;

public class GridDhtPreloader
extends GridCachePreloaderAdapter {
    /**
     * Product version {@code 1.5.0}. Judging by the name, the release since which the second
     * rebalancing protocol is available; it is not referenced elsewhere in this file — confirm at use sites.
     */
    public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
    /** Default preload resend timeout (presumably milliseconds; not used in this file — confirm at use sites). */
    public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500L;
    /** Partition topology of the cache; assigned in the constructor and reset to {@code null} by {@link #onKernalStop()}. */
    private GridDhtPartitionTopology top;
    /** Registered force-keys futures, keyed by future ID. */
    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = GridConcurrentFactory.newMap();
    /** Partition supplier; created in {@link #start()}, so it is {@code null} before that call. */
    private GridDhtPartitionSupplier supplier;
    /** Partition demander; created in {@link #start()}, so it is {@code null} before that call. */
    private GridDhtPartitionDemander demander;
    /** Start future; completed by {@link #onInitialExchangeComplete(Throwable)} and replaced by {@link #onReconnected()}. */
    private GridFutureAdapter<Object> startFut;
    /** Busy lock: handlers hold the read lock while running, {@link #onKernalStop()} takes the write lock. */
    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
    /** Demand lock: shared with the demander; read-locked while a supply message is handled, write-locked by {@link #unwindUndeploys()}. */
    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
    /** Queue of partitions waiting for asynchronous eviction. */
    private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8();
    /** Eviction ownership flag: {@code 1} while an eviction task owns the queue, {@code 0} otherwise. */
    private final AtomicInteger partsEvictOwning = new AtomicInteger();
    /** Set at the beginning of {@link #onKernalStop()}; once set, {@code addFuture} fails newly added futures. */
    private volatile boolean stopping;
    /**
     * Local discovery listener that forwards discovery events to every registered force-keys future.
     * It is registered in {@link #start()} for event types 10, 11 and 12 (the assertion message below
     * shows that 10 is the node-joined event; 11 and 12 are presumably node-left and node-failed —
     * confirm against the event type constants).
     */
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener(){

        /**
         * Notifies all pending force-keys futures about the discovery event.
         * Does nothing when the busy lock cannot be entered (the preloader is stopping).
         *
         * @param evt Discovery event.
         */
        @Override
        public void onEvent(Event evt) {
            if (!GridDhtPreloader.this.enterBusy()) {
                return;
            }
            try {
                // The cast is performed inside the guarded region: if it ever failed outside of it,
                // the busy read lock would never be released and onKernalStop() would block forever
                // on the write lock.
                DiscoveryEvent e = (DiscoveryEvent)evt;
                ClusterNode loc = GridDhtPreloader.this.cctx.localNode();
                assert (e.type() == 10 || e.type() == 11 || e.type() == 12);
                ClusterNode n = e.eventNode();
                // Events about the local node are not expected here.
                assert (!loc.id().equals(n.id()));
                for (GridDhtForceKeysFuture<?, ?> f : GridDhtPreloader.this.forceKeyFuts.values()) {
                    f.onDiscoveryEvent(e);
                }
                assert (e.type() != 10 || n.order() > loc.order()) : "Node joined with smaller-than-local order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
            }
            finally {
                GridDhtPreloader.this.leaveBusy();
            }
        }
    };

    /**
     * Creates the preloader for the given cache.
     *
     * @param cctx Cache context; its DHT topology is captured here.
     */
    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
        super(cctx);
        top = cctx.dht().topology();
        startFut = new GridFutureAdapter<Object>();
    }

    /**
     * Starts the preloader: registers the force-keys request/response handlers (and, on non-client
     * nodes, the affinity assignment request handler), notifies the shared affinity manager that the
     * cache was created, creates and starts the supplier and the demander, and finally subscribes
     * the discovery listener to event types 10, 11 and 12.
     */
    @Override
    public void start() {
        if (log.isDebugEnabled()) {
            log.debug("Starting DHT rebalancer...");
        }

        MessageHandler<GridDhtForceKeysRequest> forceKeysReqHnd = new MessageHandler<GridDhtForceKeysRequest>(){

            @Override
            public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
                GridDhtPreloader.this.processForceKeysRequest(node, msg);
            }
        };
        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)forceKeysReqHnd);

        MessageHandler<GridDhtForceKeysResponse> forceKeysResHnd = new MessageHandler<GridDhtForceKeysResponse>(){

            @Override
            public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
                GridDhtPreloader.this.processForceKeyResponse(node, msg);
            }
        };
        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)forceKeysResHnd);

        // Affinity assignment requests are served by non-client nodes only.
        if (!cctx.kernalContext().clientNode()) {
            MessageHandler<GridDhtAffinityAssignmentRequest> affReqHnd = new MessageHandler<GridDhtAffinityAssignmentRequest>(){

                @Override
                protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
                    GridDhtPreloader.this.processAffinityAssignmentRequest(node, msg);
                }
            };
            cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)affReqHnd);
        }

        cctx.shared().affinity().onCacheCreated(cctx);

        supplier = new GridDhtPartitionSupplier(cctx);
        demander = new GridDhtPartitionDemander(cctx, demandLock);
        supplier.start();
        demander.start();

        cctx.events().addListener(discoLsnr, 10, 11, 12);
    }

    /**
     * Installs the preload predicate on this preloader and propagates it to the supplier and the demander.
     * Must be called after {@link #start()}, because the supplier and the demander are created there.
     *
     * @param preloadPred Preload predicate.
     */
    @Override
    public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        super.preloadPredicate(preloadPred);

        assert (supplier != null && demander != null) : "preloadPredicate may be called only after start()";

        supplier.preloadPredicate(preloadPred);
        demander.preloadPredicate(preloadPred);
    }

    /**
     * Kernal stop callback: marks the preloader as stopping, unsubscribes the discovery listener,
     * stops the supplier and the demander (when they were created), fails every registered
     * force-keys future with a node-stopping error and drops the topology reference.
     */
    @Override
    public void onKernalStop() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("DHT rebalancer onKernalStop callback.");
        }
        // Raised first so that addFuture() starts failing newly registered futures.
        this.stopping = true;
        this.cctx.events().removeListener(this.discoLsnr);
        // The write lock is taken and not released in this method. enterBusy() uses tryLock() on the
        // read lock, so from here on it returns false for other threads. This looks deliberate —
        // NOTE(review): confirm before adding an unlock.
        this.busyLock.writeLock().lock();
        // Supplier and demander are created in start(); they are null-checked here, so this
        // callback tolerates a preloader that was never started.
        if (this.supplier != null) {
            this.supplier.stop();
        }
        if (this.demander != null) {
            this.demander.stop();
        }
        IgniteCheckedException err = this.stopError();
        for (GridDhtForceKeysFuture fut : this.forceKeyFuts.values()) {
            fut.onDone(err);
        }
        this.top = null;
    }

    /**
     * @return New exception used to fail operations when the cache or the node is stopping.
     */
    private IgniteCheckedException stopError() {
        IgniteCheckedException stopErr = new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");

        return stopErr;
    }

    /**
     * Completes the start future once the initial exchange is over.
     *
     * @param err Exchange error, or {@code null} when the exchange completed successfully.
     */
    @Override
    public void onInitialExchangeComplete(@Nullable Throwable err) {
        if (err != null) {
            startFut.onDone(err);

            return;
        }

        startFut.onDone();
    }

    /**
     * Propagates a topology change to the supplier (by topology version) and to the demander
     * (by exchange future).
     *
     * @param lastFut Exchange future of the latest topology change.
     */
    @Override
    public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
        AffinityTopologyVersion topVer = lastFut.topologyVersion();

        supplier.onTopologyChanged(topVer);
        demander.onTopologyChanged(lastFut);
    }

    /**
     * Builds rebalancing assignments for the given exchange.
     * <p>
     * For every partition that belongs to the local node and is in {@code MOVING} state, one of the
     * picked remote owners is chosen at random and the partition is added to the demand message
     * for that node. A {@code MOVING} partition without any remote owner is owned locally right away.
     * When the exchange worker reports a pending exchange, the result is marked as cancelled and
     * returned immediately.
     *
     * @param exchFut Exchange future the assignments are created for.
     * @return Assignments; empty when rebalancing is disabled for the cache.
     */
    @Override
    public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
        GridDhtPartitionTopology cacheTop = cctx.dht().topology();

        if (!cctx.rebalanceEnabled()) {
            return new GridDhtPreloaderAssignments(exchFut, cacheTop.topologyVersion());
        }

        int parts = cctx.affinity().partitions();

        assert (exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion().equals(cacheTop.topologyVersion())) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", cache=" + cctx.name() + ", topVer=" + cacheTop.topologyVersion() + ']';

        GridDhtPreloaderAssignments res = new GridDhtPreloaderAssignments(exchFut, cacheTop.topologyVersion());
        AffinityTopologyVersion ver = res.topologyVersion();

        for (int partId = 0; partId < parts; partId++) {
            // A newer exchange is already queued: these assignments would be stale, so give up early.
            if (cctx.shared().exchange().hasPendingExchange()) {
                if (log.isDebugEnabled()) {
                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " + exchFut.exchangeId());
                }

                res.cancelled(true);

                return res;
            }

            if (!cctx.affinity().partitionLocalNode(partId, ver)) {
                continue;
            }

            GridDhtLocalPartition locPart = cacheTop.localPartition(partId, ver, true);

            assert (locPart != null);
            assert (locPart.id() == partId);

            if (locPart.state() != GridDhtPartitionState.MOVING) {
                if (log.isDebugEnabled()) {
                    log.debug("Skipping partition assignment (state is not MOVING): " + locPart);
                }
            }
            else {
                Collection<ClusterNode> candidates = pickedOwners(partId, ver);

                if (!candidates.isEmpty()) {
                    ClusterNode src = F.rand(candidates);
                    GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)res.get(src);

                    // One demand message per source node, created lazily.
                    if (demandMsg == null) {
                        demandMsg = new GridDhtPartitionDemandMessage(cacheTop.updateSequence(), exchFut.exchangeId().topologyVersion(), cctx.cacheId());
                        res.put(src, demandMsg);
                    }

                    demandMsg.addPartition(partId);
                }
                else {
                    // Nobody to load from: own the partition immediately.
                    cacheTop.own(locPart);

                    // Event type 86 is recorded for such partitions (which constant it maps to is
                    // not visible in this file — confirm against the event type definitions).
                    if (cctx.events().isRecordable(86)) {
                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
                        cctx.events().addPreloadEvent(partId, 86, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
                    }

                    if (log.isDebugEnabled()) {
                        log.debug("Owning partition as there are no other owners: " + locPart);
                    }
                }
            }
        }

        return res;
    }

    /**
     * Reconnect callback: replaces the start future with a fresh, not yet completed one.
     */
    @Override
    public void onReconnected() {
        startFut = new GridFutureAdapter<Object>();
    }

    /**
     * Picks the remote owners a partition may be demanded from.
     * <p>
     * All remote owners are returned when there are no more of them than affinity nodes for the
     * partition. Otherwise the owners are sorted with {@code CU.nodeComparator(false)} and only
     * the first {@code affCnt} of them are returned.
     *
     * @param p Partition.
     * @param topVer Topology version.
     * @return Picked owners.
     */
    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
        int limit = cctx.affinity().nodesByPartition(p, topVer).size();
        Collection<ClusterNode> owners = remoteOwners(p, topVer);

        if (owners.size() > limit) {
            List<ClusterNode> ordered = new ArrayList<ClusterNode>(owners);

            Collections.sort(ordered, CU.nodeComparator(false));

            return ordered.subList(0, limit);
        }

        return owners;
    }

    /**
     * @param p Partition.
     * @param topVer Topology version.
     * @return View over the partition owners with the local node filtered out.
     */
    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
        return F.view(
            cctx.dht().topology().owners(p, topVer),
            F.remoteNodes(cctx.nodeId()));
    }

    /**
     * Passes a supply message to the demander. The call is made under the busy lock and under
     * the read side of the demand lock; the message is dropped when the preloader is stopping.
     *
     * @param idx Index.
     * @param id Node ID.
     * @param s Supply message.
     */
    @Override
    public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
        if (enterBusy()) {
            try {
                demandLock.readLock().lock();

                try {
                    demander.handleSupplyMessage(idx, id, s);
                }
                finally {
                    demandLock.readLock().unlock();
                }
            }
            finally {
                leaveBusy();
            }
        }
    }

    /**
     * Passes a demand message to the supplier. The call is made under the busy lock; the message
     * is dropped when the preloader is stopping.
     *
     * @param idx Index.
     * @param id Node ID.
     * @param d Demand message.
     */
    @Override
    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
        if (enterBusy()) {
            try {
                supplier.handleDemandMessage(idx, id, d);
            }
            finally {
                leaveBusy();
            }
        }
    }

    /**
     * Delegates to the demander.
     *
     * @param assignments Assignments to add.
     * @param forceRebalance Force rebalance flag.
     * @param cnt Counter.
     * @param next Runnable passed through to the demander.
     * @param forcedRebFut Future passed through to the demander, may be {@code null}.
     * @return Whatever the demander returns.
     */
    @Override
    public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forceRebalance, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
        Runnable rebalanceTask = demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut);

        return rebalanceTask;
    }

    /**
     * @return Current start future (it is replaced on reconnect).
     */
    @Override
    public IgniteInternalFuture<Object> startFuture() {
        return startFut;
    }

    /**
     * @return Start future on a client node, the demander's sync future otherwise.
     */
    @Override
    public IgniteInternalFuture<?> syncFuture() {
        if (cctx.kernalContext().clientNode()) {
            return startFut;
        }

        return demander.syncFuture();
    }

    /**
     * @return Already finished future holding {@code true} on a client node, the demander's
     *      rebalance future otherwise.
     */
    @Override
    public IgniteInternalFuture<Boolean> rebalanceFuture() {
        if (cctx.kernalContext().clientNode()) {
            return new GridFinishedFuture<Boolean>(true);
        }

        return demander.rebalanceFuture();
    }

    /**
     * Tries to acquire the read side of the busy lock without blocking.
     *
     * @return {@code true} when the lock was acquired; the caller must then call {@link #leaveBusy()}.
     */
    private boolean enterBusy() {
        boolean entered = busyLock.readLock().tryLock();

        if (!entered && log.isDebugEnabled()) {
            log.debug("Failed to enter busy state on node (exchanger is stopping): " + cctx.nodeId());
        }

        return entered;
    }

    /**
     * Releases the read side of the busy lock acquired by a successful {@link #enterBusy()}.
     */
    private void leaveBusy() {
        busyLock.readLock().unlock();
    }

    /**
     * Handles a force-keys request once the future returned by {@code mvcc().finishKeys(...)} for
     * the requested keys is done: immediately when it is already done, from a listener otherwise.
     *
     * @param node Requesting node.
     * @param msg Force-keys request.
     */
    private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
        IgniteInternalFuture<?> keysFut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());

        if (!keysFut.isDone()) {
            keysFut.listen(new CI1<IgniteInternalFuture<?>>(){

                @Override
                public void apply(IgniteInternalFuture<?> t) {
                    GridDhtPreloader.this.processForceKeysRequest0(node, msg);
                }
            });

            return;
        }

        processForceKeysRequest0(node, msg);
    }

    /**
     * Builds and sends the response to a force-keys request.
     * <p>
     * For every requested key the entry is looked up locally. Keys whose partition is neither
     * present locally nor owned by the local node are reported as missed. When swap or off-heap
     * storage is enabled the entry is obtained with {@code entryEx} and unswapped (retrying while
     * the entry is concurrently removed); otherwise it is only peeked. Infos of found, non-new
     * entries are added to the response.
     * <p>
     * The whole operation runs under the busy lock and is skipped when the preloader is stopping.
     *
     * @param node Requesting node.
     * @param msg Force-keys request.
     */
    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
        if (!this.enterBusy()) {
            return;
        }
        try {
            ClusterNode loc = this.cctx.localNode();
            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(this.cctx.cacheId(), msg.futureId(), msg.miniId(), this.cctx.deploymentEnabled());
            for (KeyCacheObject k : msg.keys()) {
                int p = this.cctx.affinity().partition(k);
                GridDhtLocalPartition locPart = this.top.localPartition(p, AffinityTopologyVersion.NONE, false);
                // The local node is no longer an owner of the partition.
                if (locPart == null && !this.top.owners(p).contains(loc)) {
                    res.addMissed(k);
                    continue;
                }
                GridCacheEntryEx entry = null;
                if (this.cctx.isSwapOrOffheapEnabled()) {
                    // Retry until the entry is obtained and unswapped, or the partition turns out to be invalid.
                    while (true) {
                        try {
                            entry = this.cctx.dht().entryEx(k);
                            entry.unswap();
                            break;
                        }
                        catch (GridCacheEntryRemovedException ignore) {
                            // The entry was removed concurrently: fetch a fresh one on the next iteration.
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Got removed entry: " + k);
                            }
                        }
                        catch (GridDhtInvalidPartitionException ignore) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Local node is no longer an owner: " + p);
                            }
                            res.addMissed(k);
                            break;
                        }
                    }
                }
                else {
                    entry = this.cctx.dht().peekEx(k);
                }
                if (entry != null) {
                    GridCacheEntryInfo info = entry.info();
                    if (info != null && !info.isNew()) {
                        res.addInfo(info);
                    }
                    if (this.cctx.isSwapOrOffheapEnabled()) {
                        this.cctx.evicts().touch(entry, msg.topologyVersion());
                    }
                }
                else if (this.log.isDebugEnabled()) {
                    this.log.debug("Key is not present in DHT cache: " + k);
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
            }
            this.cctx.io().send(node, (GridCacheMessage)res, this.cctx.ioPolicy());
        }
        catch (ClusterTopologyCheckedException ignore) {
            // The requester is gone, so there is nobody to answer.
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received force key request from failed node (will ignore) [nodeId=" + node.id() + ", req=" + msg + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
        }
        finally {
            this.leaveBusy();
        }
    }

    /**
     * Routes a force-keys response to the future registered under the response's future ID.
     * Responses for unknown futures are only logged. Runs under the busy lock and is skipped
     * when the preloader is stopping.
     *
     * @param node Node that sent the response.
     * @param msg Force-keys response.
     */
    private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
        if (enterBusy()) {
            try {
                GridDhtForceKeysFuture target = (GridDhtForceKeysFuture)forceKeyFuts.get(msg.futureId());

                if (target == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + ", res=" + msg + ']');
                    }
                }
                else {
                    target.onResult(node.id(), msg);
                }
            }
            finally {
                leaveBusy();
            }
        }
    }

    /**
     * Answers an affinity assignment request. The response is built and sent from a listener
     * attached to the affinity-ready future of the requested topology version.
     * <p>
     * The ideal assignment is included only when the requester's version is at least
     * {@code CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE} and the cache uses a centralized
     * affinity function. Send failures are logged and not rethrown.
     *
     * @param node Requesting node.
     * @param req Affinity assignment request.
     */
    private void processAffinityAssignmentRequest(final ClusterNode node, GridDhtAffinityAssignmentRequest req) {
        final AffinityTopologyVersion topVer = req.topologyVersion();

        if (log.isDebugEnabled()) {
            log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']');
        }

        IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>> replyWhenReady = new CI1<IgniteInternalFuture<AffinityTopologyVersion>>(){

            @Override
            public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                if (GridDhtPreloader.this.log.isDebugEnabled()) {
                    GridDhtPreloader.this.log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']');
                }

                AffinityAssignment aff = GridDhtPreloader.this.cctx.affinity().assignment(topVer);
                boolean lateAffSupported = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(GridDhtPreloader.this.cctx.cacheId(), topVer, aff.assignment(), lateAffSupported);

                if (lateAffSupported && GridDhtPreloader.this.cctx.affinity().affinityCache().centralizedAffinityFunction()) {
                    assert (aff.idealAssignment() != null);

                    res.idealAffinityAssignment(aff.idealAssignment());
                }

                try {
                    // Policy byte 4 is kept from the original call (which pool it maps to is not visible here).
                    GridDhtPreloader.this.cctx.io().send(node, (GridCacheMessage)res, (byte)4);
                }
                catch (IgniteCheckedException e) {
                    U.error(GridDhtPreloader.this.log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e);
                }
            }
        };

        cctx.affinity().affinityReadyFuture(topVer).listen(replyWhenReady);
    }

    /**
     * Partition eviction callback: notifies the topology, records event type 83 for the partition
     * when that event is recordable, and schedules a partitions resend when {@code updateSeq} is set.
     * Runs under the busy lock and is skipped when the preloader is stopping.
     *
     * @param part Evicted partition.
     * @param updateSeq Flag passed to the topology; also triggers the partitions resend.
     */
    public void onPartitionEvicted(GridDhtLocalPartition part, boolean updateSeq) {
        if (enterBusy()) {
            try {
                top.onEvicted(part, updateSeq);

                if (cctx.events().isRecordable(83)) {
                    cctx.events().addUnloadEvent(part.id());
                }

                if (updateSeq) {
                    cctx.shared().exchange().scheduleResendPartitions();
                }
            }
            finally {
                leaveBusy();
            }
        }
    }

    /**
     * @return {@code true} when rebalancing is disabled for the cache, when the rebalance future
     *      is not done yet, or when its result is not {@code Boolean.TRUE}.
     */
    @Override
    public boolean needForceKeys() {
        if (!cctx.rebalanceEnabled()) {
            return true;
        }

        IgniteInternalFuture<Boolean> fut = rebalanceFuture();

        if (!fut.isDone()) {
            return true;
        }

        return !Boolean.TRUE.equals(fut.result());
    }

    /**
     * Requests the keys of an atomic update request from other nodes when force keys are needed.
     *
     * @param req Atomic update request whose keys are requested.
     * @param topVer Topology version.
     * @return Force-keys future, or {@code null} when force keys are not needed.
     */
    @Override
    public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
        return needForceKeys() ? request0(req.keys(), topVer) : null;
    }

    /**
     * Requests the given keys from other nodes when force keys are needed.
     *
     * @param keys Keys to request.
     * @param topVer Topology version.
     * @return Force-keys future, or {@code null} when force keys are not needed.
     */
    public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
        return needForceKeys() ? request0(keys, topVer) : null;
    }

    /**
     * Creates a force-keys future and initializes it as soon as its prerequisites are met:
     * <ul>
     * <li>when {@code affinityReadyFuturex} returns a future, after both the start future and
     * that future are done;</li>
     * <li>otherwise immediately if the start future is done;</li>
     * <li>otherwise from a local closure scheduled when the start future completes.</li>
     * </ul>
     *
     * @param keys Keys to request.
     * @param topVer Topology version.
     * @return Created force-keys future.
     */
    private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
        final GridDhtForceKeysFuture fut = new GridDhtForceKeysFuture(cctx, topVer, keys, this);
        IgniteInternalFuture<AffinityTopologyVersion> affReadyFut = cctx.affinity().affinityReadyFuturex(topVer);

        if (affReadyFut != null) {
            // Wait for both the start future and affinity readiness.
            GridCompoundFuture both = new GridCompoundFuture();

            both.add(startFut);
            both.add(affReadyFut);
            both.markInitialized();

            both.listen(new CI1<IgniteInternalFuture<?>>(){

                @Override
                public void apply(IgniteInternalFuture<?> syncFut) {
                    fut.init();
                }
            });
        }
        else if (startFut.isDone()) {
            fut.init();
        }
        else {
            startFut.listen(new CI1<IgniteInternalFuture<?>>(){

                @Override
                public void apply(IgniteInternalFuture<?> syncFut) {
                    GridDhtPreloader.this.cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable(){

                        @Override
                        public void run() {
                            fut.init();
                        }
                    });
                }
            });
        }

        return fut;
    }

    /**
     * Delegates to the demander.
     *
     * @return Future returned by the demander's {@code forceRebalance()}.
     */
    @Override
    public IgniteInternalFuture<Boolean> forceRebalance() {
        IgniteInternalFuture<Boolean> rebalanceFut = demander.forceRebalance();

        return rebalanceFut;
    }

    /**
     * Unwinds pending undeploys for the cache while holding the write side of the demand lock,
     * i.e. exclusively with respect to supply message handling, which holds the read side.
     */
    @Override
    public void unwindUndeploys() {
        demandLock.writeLock().lock();

        try {
            cctx.deploy().unwind(cctx);
        }
        finally {
            demandLock.writeLock().unlock();
        }
    }

    /**
     * Registers a force-keys future. The future is put into the map first and the stopping flag
     * is checked afterwards; when the preloader is stopping the future is failed with a stop error.
     *
     * @param fut Future to register.
     * @return {@code false} when the preloader is stopping and the future was failed,
     *      {@code true} otherwise.
     */
    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
        forceKeyFuts.put(fut.futureId(), fut);

        if (!stopping) {
            return true;
        }

        fut.onDone(stopError());

        return false;
    }

    /**
     * Unregisters a force-keys future; the mapping is removed only when the given future is the
     * one currently registered under its ID.
     *
     * @param fut Future to unregister.
     */
    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
        forceKeyFuts.remove(fut.futureId(), fut);
    }

    /**
     * Queues a partition for asynchronous eviction.
     * <p>
     * The partition is added to the eviction queue. If no eviction task currently owns the queue
     * ({@code partsEvictOwning} is {@code 0}), ownership is taken with a compare-and-set and a
     * task is submitted through {@code callLocalSafe}. The task polls partitions and calls
     * {@code tryEvict()} on each until the queue is empty, then releases ownership. After a
     * release it re-checks the queue and tries to take ownership again, so a partition added in
     * between is not left behind.
     * <p>
     * Ownership is released in exactly one place, the {@code finally} block, which runs once per
     * iteration on every exit path, including the early return taken when eviction fails while
     * the node is stopping.
     *
     * @param part Partition to evict.
     */
    @Override
    public void evictPartitionAsync(GridDhtLocalPartition part) {
        this.partsToEvict.add(part);
        if (this.partsEvictOwning.get() == 0 && this.partsEvictOwning.compareAndSet(0, 1)) {
            this.cctx.closures().callLocalSafe(new GPC<Boolean>(){

                /**
                 * @return {@code false} when another task took over the queue, {@code true} otherwise.
                 */
                @Override
                public Boolean call() {
                    // True while this task holds the ownership flag.
                    boolean locked = true;
                    while (locked || !GridDhtPreloader.this.partsToEvict.isEmptyx()) {
                        // Ownership was released on the previous iteration: take it back or yield to the new owner.
                        if (!locked && !GridDhtPreloader.this.partsEvictOwning.compareAndSet(0, 1)) {
                            return false;
                        }
                        try {
                            GridDhtLocalPartition evictPart = GridDhtPreloader.this.partsToEvict.poll();
                            if (evictPart != null) {
                                try {
                                    evictPart.tryEvict();
                                }
                                catch (Throwable ex) {
                                    if (GridDhtPreloader.this.cctx.kernalContext().isStopping()) {
                                        LT.warn(GridDhtPreloader.this.log, ex, "Partition eviction failed (current node is stopping).", false, true);
                                        GridDhtPreloader.this.partsToEvict.clear();
                                        // Ownership is released by the finally block below.
                                        return true;
                                    }
                                    LT.error(GridDhtPreloader.this.log, ex, "Partition eviction failed, this can cause grid hang.");
                                }
                            }
                        }
                        finally {
                            if (!GridDhtPreloader.this.partsToEvict.isEmptyx()) {
                                locked = true;
                            }
                            else {
                                boolean res = GridDhtPreloader.this.partsEvictOwning.compareAndSet(1, 0);
                                assert (res);
                                locked = false;
                            }
                        }
                    }
                    return true;
                }
            }, true);
        }
    }

    /**
     * Logs every pending force-keys future as a warning (when there are any) and then asks the
     * supplier to dump its own debug information.
     */
    @Override
    public void dumpDebugInfo() {
        if (!forceKeyFuts.isEmpty()) {
            U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:");

            for (GridDhtForceKeysFuture<?, ?> pending : forceKeyFuts.values()) {
                U.warn(log, ">>> " + pending);
            }
        }

        supplier.dumpDebugInfo();
    }

    /**
     * Base class for the message handlers registered in {@code start()}. It resolves the sender
     * node by ID and passes the message on only when the node is still known to the cache
     * context; messages from unknown nodes are dropped with a debug log entry.
     *
     * @param <M> Message type.
     */
    private abstract class MessageHandler<M>
    implements IgniteBiInClosure<UUID, M> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Instantiated only through anonymous subclasses inside the enclosing class. */
        private MessageHandler() {
        }

        /**
         * @param nodeId Sender node ID.
         * @param msg Received message.
         */
        @Override
        public void apply(UUID nodeId, M msg) {
            ClusterNode sender = GridDhtPreloader.this.cctx.node(nodeId);

            if (sender != null) {
                if (GridDhtPreloader.this.log.isDebugEnabled()) {
                    GridDhtPreloader.this.log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
                }

                onMessage(sender, msg);

                return;
            }

            if (GridDhtPreloader.this.log.isDebugEnabled()) {
                GridDhtPreloader.this.log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
            }
        }

        /**
         * Processes a message from a node that is still known to the cache context.
         *
         * @param node Sender node.
         * @param msg Received message.
         */
        protected abstract void onMessage(ClusterNode node, M msg);
    }
}

