/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedManagerAdapter<K, V> {
    /** NOTE(review): presumably the cap on retained exchange futures; its use is outside this view - confirm. */
    private static final int EXCHANGE_HISTORY_SIZE = 1000;
    /** Currently scheduled resend timeout object, or {@code null}; swapped by CAS in {@code scheduleResendPartitions()} and cleared in {@code onKernalStop0()}. */
    private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference();
    /** Value read from the {@code IGNITE_PRELOAD_RESEND_TIMEOUT} system property, with 1500 passed as the fallback (presumably milliseconds - confirm). */
    private final long partResendTimeout = IgniteSystemProperties.getLong("IGNITE_PRELOAD_RESEND_TIMEOUT", 1500L);
    /** Busy lock: {@code enterBusy()} try-acquires the read lock, {@code leaveBusy()} releases it, {@code stop0()} takes the write lock. */
    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
    /** Starts at -1. NOTE(review): presumably the time of the last partitions refresh; readers/writers are outside this view. */
    private final AtomicLong lastRefresh = new AtomicLong(-1L);
    /** Exchange worker; created in {@code start0()} and run on the "exchange-worker" thread started by {@code onKernalStart0()}. */
    @GridToStringInclude
    private ExchangeWorker exchWorker;
    /** Client partition topologies keyed by cache id (populated by {@code clientTopology(int, ...)}). */
    @GridToStringExclude
    private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops = new ConcurrentHashMap8<Integer, GridClientPartitionTopology>();
    /** Exchange future backing {@code topologyVersion()} and {@code lastTopologyFuture()}; may be {@code null}. */
    private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
    /** Futures handed out by {@code affinityReadyFuture()}, keyed by the topology version being awaited. */
    private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<AffinityTopologyVersion, AffinityReadyFuture>();
    /** Pair of node product versions per topology version; {@code minimumNodeVersion()} returns the first element of the pair. */
    private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers = new ConcurrentSkipListMap();
    /** Latest ready topology version; starts at {@code AffinityTopologyVersion.NONE}. */
    private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<AffinityTopologyVersion>(AffinityTopologyVersion.NONE);
    /** Created by {@code onKernalStart0(true)} and completed by the listener on the initial exchange future. */
    private GridFutureAdapter<?> reconnectExchangeFut;
    /** Set of exchange futures; reset to {@code null} by {@code stop0()}. */
    private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
    /** Error assigned in {@code onKernalStop0()} and used to complete outstanding futures. */
    private volatile IgniteCheckedException stopErr;
    /** NOTE(review): counter whose use is outside this view. */
    private int longRunningOpsDumpCnt;
    /** Formatter with pattern {@code HH:mm:ss.SSS}. NOTE(review): SimpleDateFormat is not thread-safe - confirm it is used from a single thread. */
    private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    /**
     * Discovery event listener that turns discovery events into partition exchange futures.
     * It is registered in {@code start0()} for event types 10, 11, 12 and 18. From the code below:
     * type 10 is a node join, types 11/12 are node departures (presumably left/failed - confirm
     * against the event type constants), and type 18 is a custom discovery event.
     */
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onEvent(Event evt) {
            // Ignore events when the busy read lock cannot be acquired.
            if (!GridCachePartitionExchangeManager.this.enterBusy()) {
                return;
            }
            try {
                DiscoveryEvent e = (DiscoveryEvent)evt;
                ClusterNode loc = GridCachePartitionExchangeManager.this.cctx.localNode();
                assert (e.type() == 10 || e.type() == 11 || e.type() == 12 || e.type() == 18);
                ClusterNode n = e.eventNode();
                // Both stay null when the event does not require an exchange.
                GridDhtPartitionExchangeId exchId = null;
                GridDhtPartitionsExchangeFuture exchFut = null;
                if (e.type() != 18) {
                    // Join/leave events handled here are expected to concern a remote node.
                    assert (!loc.id().equals(n.id()));
                    if (e.type() == 11 || e.type() == 12) {
                        assert (GridCachePartitionExchangeManager.this.cctx.discovery().node(n.id()) == null);
                        GridDhtPartitionsExchangeFuture initFut = null;
                        // No topology version is ready yet: notify the initial exchange future of the departure.
                        if (((AffinityTopologyVersion)GridCachePartitionExchangeManager.this.readyTopVer.get()).equals(AffinityTopologyVersion.NONE)) {
                            initFut = GridCachePartitionExchangeManager.this.exchangeFuture(GridCachePartitionExchangeManager.this.initialExchangeId(), null, null, null);
                            initFut.onNodeLeft(n);
                        }
                        // Notify every tracked exchange future, skipping initFut which was already notified above.
                        for (GridDhtPartitionsExchangeFuture f : GridCachePartitionExchangeManager.this.exchFuts.values()) {
                            if (f == initFut) continue;
                            f.onNodeLeft(n);
                        }
                    }
                    assert (e.type() != 10 || n.order() > loc.order()) : "Node joined with smaller-than-local order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
                    exchId = GridCachePartitionExchangeManager.this.exchangeId(n.id(), GridCachePartitionExchangeManager.this.affinityTopologyVersion(e), e.type());
                    exchFut = GridCachePartitionExchangeManager.this.exchangeFuture(exchId, e, null, null);
                } else {
                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
                        // Requests that need an exchange are collected; the rest only get their start future completed.
                        ArrayList<DynamicCacheChangeRequest> valid = new ArrayList<DynamicCacheChangeRequest>(batch.requests().size());
                        for (final DynamicCacheChangeRequest req : batch.requests()) {
                            if (req.exchangeNeeded()) {
                                valid.add(req);
                                continue;
                            }
                            IgniteInternalFuture<?> fut = null;
                            if (req.cacheFutureTopologyVersion() != null) {
                                fut = GridCachePartitionExchangeManager.this.affinityReadyFuture(req.cacheFutureTopologyVersion());
                            }
                            // affinityReadyFuture() returns null when the version is already ready.
                            if (fut == null || fut.isDone()) {
                                GridCachePartitionExchangeManager.this.cctx.cache().completeStartFuture(req);
                                continue;
                            }
                            // Otherwise complete the start future once affinity for that version is ready.
                            fut.listen(new CI1<IgniteInternalFuture<?>>(){

                                @Override
                                public void apply(IgniteInternalFuture<?> fut) {
                                    GridCachePartitionExchangeManager.this.cctx.cache().completeStartFuture(req);
                                }
                            });
                        }
                        if (!F.isEmpty(valid)) {
                            exchId = GridCachePartitionExchangeManager.this.exchangeId(n.id(), GridCachePartitionExchangeManager.this.affinityTopologyVersion(e), e.type());
                            exchFut = GridCachePartitionExchangeManager.this.exchangeFuture(exchId, e, valid, null);
                        }
                    } else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
                        if (msg.exchangeId() == null) {
                            // Message is not tied to an existing exchange: start a new one only if it asks for it.
                            if (msg.exchangeNeeded()) {
                                exchId = GridCachePartitionExchangeManager.this.exchangeId(n.id(), GridCachePartitionExchangeManager.this.affinityTopologyVersion(e), e.type());
                                exchFut = GridCachePartitionExchangeManager.this.exchangeFuture(exchId, e, null, msg);
                            }
                        } else {
                            // Message belongs to an existing exchange: hand it to that exchange's future.
                            GridCachePartitionExchangeManager.this.exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
                        }
                    }
                }
                if (exchId != null) {
                    if (GridCachePartitionExchangeManager.this.log.isDebugEnabled()) {
                        GridCachePartitionExchangeManager.this.log.debug("Discovery event (will start exchange): " + exchId);
                    }
                    // exchFut is assigned on every path that assigns exchId.
                    exchFut.onEvent(exchId, e);
                    GridCachePartitionExchangeManager.this.addFuture(exchFut);
                } else if (GridCachePartitionExchangeManager.this.log.isDebugEnabled()) {
                    GridCachePartitionExchangeManager.this.log.debug("Do not start exchange for discovery event: " + evt);
                }
            }
            finally {
                GridCachePartitionExchangeManager.this.leaveBusy();
            }
        }
    };

    /**
     * Starts the manager: creates the exchange worker, subscribes the discovery listener to event
     * types 10, 11, 12 and 18, and registers one handler per partition-map message class.
     *
     * @throws IgniteCheckedException If thrown by the superclass start.
     */
    @Override
    protected void start0() throws IgniteCheckedException {
        super.start0();

        exchWorker = new ExchangeWorker();

        cctx.gridEvents().addLocalEventListener(discoLsnr, 10, 11, 12, 18);

        // Single (per-node) partition map updates.
        cctx.io().addHandler(
            0,
            GridDhtPartitionsSingleMessage.class,
            (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new MessageHandler<GridDhtPartitionsSingleMessage>(){
                @Override
                public void onMessage(ClusterNode sender, GridDhtPartitionsSingleMessage singleMsg) {
                    GridCachePartitionExchangeManager.this.processSinglePartitionUpdate(sender, singleMsg);
                }
            });

        // Full partition map updates.
        cctx.io().addHandler(
            0,
            GridDhtPartitionsFullMessage.class,
            (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new MessageHandler<GridDhtPartitionsFullMessage>(){
                @Override
                public void onMessage(ClusterNode sender, GridDhtPartitionsFullMessage fullMsg) {
                    GridCachePartitionExchangeManager.this.processFullPartitionUpdate(sender, fullMsg);
                }
            });

        // Requests for a single partition map.
        cctx.io().addHandler(
            0,
            GridDhtPartitionsSingleRequest.class,
            (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new MessageHandler<GridDhtPartitionsSingleRequest>(){
                @Override
                public void onMessage(ClusterNode sender, GridDhtPartitionsSingleRequest singleReq) {
                    GridCachePartitionExchangeManager.this.processSinglePartitionRequest(sender, singleReq);
                }
            });
    }

    /**
     * @return The reconnect exchange future field as currently set; it is assigned by
     *      {@code onKernalStart0(true)}.
     */
    public IgniteInternalFuture<?> reconnectExchangeFuture() {
        return reconnectExchangeFut;
    }

    /**
     * Builds the exchange id for the local node's own join, using the local join discovery event.
     *
     * @return Exchange id with the local node id, the join topology version and event type 10.
     */
    private GridDhtPartitionExchangeId initialExchangeId() {
        DiscoveryEvent joinEvt = cctx.discovery().localJoinEvent();

        assert joinEvt != null;

        AffinityTopologyVersion joinTopVer = affinityTopologyVersion(joinEvt);

        assert joinEvt.topologyVersion() == joinTopVer.topologyVersion();

        return exchangeId(cctx.localNode().id(), joinTopVer, 10);
    }

    /**
     * Kernal start callback: enqueues the initial exchange future built from the local join event,
     * registers rebalance message handlers on non-client nodes, starts the exchange worker thread,
     * and then either waits for the initial exchange (normal start) or attaches a completion
     * listener to it (reconnect).
     *
     * @param reconnect {@code true} when called for a reconnect; the method then does not block on
     *      the initial exchange and completes {@code reconnectExchangeFut} from a listener instead.
     * @throws IgniteCheckedException If thrown by the superclass or by waiting on the initial
     *      exchange future (timeouts are retried, not propagated).
     */
    @Override
    protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
        super.onKernalStart0(reconnect);
        ClusterNode loc = this.cctx.localNode();
        long startTime = loc.metrics().getStartTime();
        assert (startTime > 0L);
        DiscoveryEvent discoEvt = this.cctx.discovery().localJoinEvent();
        GridDhtPartitionExchangeId exchId = this.initialExchangeId();
        GridDhtPartitionsExchangeFuture fut = this.exchangeFuture(exchId, discoEvt, null, null);
        if (reconnect) {
            this.reconnectExchangeFut = new GridFutureAdapter();
        }
        // Put the initial exchange at the head of the worker's queue.
        this.exchWorker.futQ.addFirst(fut);
        if (!this.cctx.kernalContext().clientNode()) {
            // One ordered handler per rebalance thread index, each on its own rebalance topic.
            for (int cnt = 0; cnt < this.cctx.gridConfig().getRebalanceThreadPoolSize(); ++cnt) {
                final int idx = cnt;
                this.cctx.io().addOrderedHandler(GridCachePartitionExchangeManager.rebalanceTopic(cnt), (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new CI2<UUID, GridCacheMessage>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void apply(UUID id, GridCacheMessage m) {
                        if (!GridCachePartitionExchangeManager.this.enterBusy()) {
                            return;
                        }
                        try {
                            GridCacheContext cacheCtx = GridCachePartitionExchangeManager.this.cctx.cacheContext(m.cacheId);
                            // Messages for a cache id with no local context are dropped without logging.
                            if (cacheCtx != null) {
                                if (m instanceof GridDhtPartitionSupplyMessageV2) {
                                    cacheCtx.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessageV2)m);
                                } else if (m instanceof GridDhtPartitionDemandMessage) {
                                    cacheCtx.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage)m);
                                } else {
                                    U.error(GridCachePartitionExchangeManager.this.log, "Unsupported message type: " + m.getClass().getName());
                                }
                            }
                        }
                        finally {
                            GridCachePartitionExchangeManager.this.leaveBusy();
                        }
                    }
                });
            }
        }
        new IgniteThread(this.cctx.gridName(), "exchange-worker", this.exchWorker).start();
        if (reconnect) {
            // Reconnect: do not block; notify all preloaders and complete reconnectExchangeFut when the exchange finishes.
            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>(){

                @Override
                public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                    try {
                        fut.get();
                        for (GridCacheContext cacheCtx : GridCachePartitionExchangeManager.this.cctx.cacheContexts()) {
                            cacheCtx.preloader().onInitialExchangeComplete(null);
                        }
                        GridCachePartitionExchangeManager.this.reconnectExchangeFut.onDone();
                    }
                    catch (IgniteCheckedException e) {
                        // Exchange failed: pass the error to every preloader and to the reconnect future.
                        for (GridCacheContext cacheCtx : GridCachePartitionExchangeManager.this.cctx.cacheContexts()) {
                            cacheCtx.preloader().onInitialExchangeComplete(e);
                        }
                        GridCachePartitionExchangeManager.this.reconnectExchangeFut.onDone(e);
                    }
                }
            });
        } else {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Beginning to wait on local exchange future: " + fut);
            }
            boolean first = true;
            // Wait until the initial exchange completes; each timeout only logs a warning and retries.
            while (true) {
                try {
                    fut.get(this.cctx.preloadExchangeTimeout());
                }
                catch (IgniteFutureTimeoutCheckedException ignored) {
                    if (first) {
                        // Detailed hint is printed on the first timeout only.
                        U.warn(this.log, "Failed to wait for initial partition map exchange. Possible reasons are: " + U.nl() + "  ^-- Transactions in deadlock." + U.nl() + "  ^-- Long running transactions (ignore if this is the case)." + U.nl() + "  ^-- Unreleased explicit locks.");
                        first = false;
                        continue;
                    }
                    U.warn(this.log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
                    continue;
                }
                break;
            }
            // Only caches without a start topology version are notified here.
            for (GridCacheContext cacheCtx : this.cctx.cacheContexts()) {
                if (cacheCtx.startTopologyVersion() != null) continue;
                cacheCtx.preloader().onInitialExchangeComplete(null);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Finished waiting for initial exchange: " + fut.exchangeId());
            }
        }
    }

    /**
     * Builds the messaging topic used for rebalance messages of the given thread index.
     *
     * @param idx Rebalance thread index.
     * @return Topic derived from {@code GridTopic.TOPIC_CACHE} with the name "Rebalance" and the index.
     */
    public static Object rebalanceTopic(int idx) {
        long topicIdx = idx;

        return GridTopic.TOPIC_CACHE.topic("Rebalance", topicIdx);
    }

    /**
     * Kernal stop callback: unregisters the discovery listener and message handlers, records the
     * stop error, completes all outstanding exchange and affinity-ready futures with it, removes
     * the rebalance handlers (non-client nodes only), stops and joins the exchange worker, and
     * removes any pending resend timeout object.
     *
     * @param cancel Cancel flag supplied by the caller; not used by this method.
     */
    @Override
    protected void onKernalStop0(boolean cancel) {
        this.cctx.gridEvents().removeLocalEventListener(this.discoLsnr, new int[0]);
        this.cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
        this.cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
        this.cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);

        // A disconnected client gets a "disconnected" error carrying the reconnect future; otherwise the node is stopping.
        IgniteCheckedException err = this.cctx.kernalContext().clientDisconnected()
            ? new IgniteClientDisconnectedCheckedException(this.cctx.kernalContext().cluster().clientReconnectFuture(), "Client node disconnected: " + this.cctx.gridName())
            : new IgniteInterruptedCheckedException("Node is stopping: " + this.cctx.gridName());

        this.stopErr = err;

        // Iterate the snapshot that was null-checked: stop0() resets the field to null, so
        // re-reading the field after the check could dereference null.
        ExchangeFutureSet exchFuts0 = this.exchFuts;
        if (exchFuts0 != null) {
            for (GridDhtPartitionsExchangeFuture exchFut : exchFuts0.values()) {
                exchFut.onDone(err);
            }
        }
        for (AffinityReadyFuture readyFut : this.readyFuts.values()) {
            readyFut.onDone(err);
        }
        if (!this.cctx.kernalContext().clientNode()) {
            for (int cnt = 0; cnt < this.cctx.gridConfig().getRebalanceThreadPoolSize(); ++cnt) {
                this.cctx.io().removeOrderedHandler(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
            }
        }
        U.cancel(this.exchWorker);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Before joining on exchange worker: " + this.exchWorker);
        }
        U.join(this.exchWorker, this.log);

        // Drop the scheduled resend, if any, so it is not left registered with the timeout processor.
        ResendTimeoutObject resendTimeoutObj = this.pendingResend.getAndSet(null);
        if (resendTimeoutObj != null) {
            this.cctx.time().removeTimeoutObject(resendTimeoutObj);
        }
    }

    /**
     * Final stop step: after the superclass stop, acquires the busy write lock and clears the
     * exchange future set.
     *
     * @param cancel Cancel flag, forwarded to the superclass.
     */
    @Override
    protected void stop0(boolean cancel) {
        super.stop0(cancel);

        // The write lock is not released in this method; while it is held, read-lock tryLock()
        // calls from other threads (see enterBusy()) fail.
        busyLock.writeLock().lock();

        exchFuts = null;
    }

    /**
     * Returns the client partition topology registered for a cache, creating and registering one
     * if none exists yet.
     *
     * @param cacheId Cache id.
     * @param exchFut Exchange future passed to a newly created topology.
     * @return Existing topology for {@code cacheId}, or the newly registered one; if another thread
     *      registers one concurrently, that thread's instance is returned.
     */
    public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionsExchangeFuture exchFut) {
        GridClientPartitionTopology existing = clientTops.get(cacheId);

        if (existing != null) {
            return existing;
        }

        // The affinity key stays null when no descriptor is known for the cache.
        Object affKey = null;

        DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);

        if (desc != null) {
            CacheConfiguration ccfg = desc.cacheConfiguration();
            AffinityFunction aff = ccfg.getAffinity();

            affKey = cctx.kernalContext().affinity().similaryAffinityKey(
                aff,
                ccfg.getNodeFilter(),
                ccfg.getBackups(),
                aff.partitions());
        }

        GridClientPartitionTopology created = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey);
        GridClientPartitionTopology raced = clientTops.putIfAbsent(cacheId, created);

        return raced == null ? created : raced;
    }

    /**
     * @return The values view of the client topology map (not a copy).
     */
    public Collection<GridClientPartitionTopology> clientTopologies() {
        return clientTops.values();
    }

    /**
     * Removes the client topology registered for a cache.
     *
     * @param cacheId Cache id.
     * @return The removed topology, or {@code null} if none was registered.
     */
    public GridClientPartitionTopology clearClientTopology(int cacheId) {
        GridClientPartitionTopology removed = clientTops.remove(cacheId);

        return removed;
    }

    /**
     * @return Topology version of the last initialized exchange future's exchange id, or
     *      {@code AffinityTopologyVersion.NONE} when no such future has been set.
     */
    public AffinityTopologyVersion topologyVersion() {
        // Read the volatile field once so the null check and the dereference see the same value.
        GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;

        if (lastFut == null) {
            return AffinityTopologyVersion.NONE;
        }

        return lastFut.exchangeId().topologyVersion();
    }

    /**
     * @return Current value of the ready topology version reference.
     */
    public AffinityTopologyVersion readyAffinityVersion() {
        AffinityTopologyVersion ready = readyTopVer.get();

        return ready;
    }

    /**
     * @return The last initialized exchange future, or {@code null} if none has been set.
     */
    public GridDhtTopologyFuture lastTopologyFuture() {
        return lastInitializedFut;
    }

    /**
     * Returns a future to wait on until the given topology version becomes ready.
     *
     * @param ver Topology version to wait for.
     * @return The last initialized exchange future when it is for exactly {@code ver};
     *      {@code null} when the ready version is already at or past {@code ver}; otherwise the
     *      future registered in {@code readyFuts} for {@code ver} (possibly already completed).
     */
    @Nullable
    public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
        GridDhtPartitionsExchangeFuture lastInitializedFut0 = this.lastInitializedFut;
        if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Return lastInitializedFut for topology ready future [ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
            }
            return lastInitializedFut0;
        }
        AffinityTopologyVersion topVer = this.readyTopVer.get();
        if (topVer.compareTo(ver) >= 0) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Return finished future for topology ready future [ver=" + ver + ", topVer=" + topVer + ']');
            }
            return null;
        }
        GridFutureAdapter fut = F.addIfAbsent(this.readyFuts, ver, new AffinityReadyFuture(ver));
        if (this.log.isDebugEnabled()) {
            this.log.debug("Created topology ready future [ver=" + ver + ", fut=" + fut + ']');
        }
        // Re-check after registration: the ready version may have advanced between the first
        // check and the map insertion.
        topVer = this.readyTopVer.get();
        if (topVer.compareTo(ver) >= 0) {
            if (this.log.isDebugEnabled()) {
                // Log the requested version under "ver"; previously topVer was printed twice.
                this.log.debug("Completing created topology ready future [ver=" + ver + ", topVer=" + topVer + ", fut=" + fut + ']');
            }
            fut.onDone(topVer);
        } else {
            // Read the volatile stop error once; a future registered after stop is failed here.
            IgniteCheckedException err = this.stopErr;
            if (err != null) {
                fut.onDone(err);
            }
        }
        return fut;
    }

    /**
     * @param topVer Topology version to look up.
     * @return First element of the version pair recorded for {@code topVer}, or the local node's
     *      version when nothing is recorded for it.
     */
    public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) {
        IgnitePair<IgniteProductVersion> verPair = nodeVers.get(topVer);

        if (verPair != null) {
            return (IgniteProductVersion)verPair.get1();
        }

        return cctx.localNode().version();
    }

    /**
     * Tries to acquire the busy read lock without blocking.
     *
     * @return {@code true} if the read lock was acquired (the caller must then call
     *      {@code leaveBusy()}), {@code false} otherwise.
     */
    private boolean enterBusy() {
        boolean entered = busyLock.readLock().tryLock();

        if (!entered && log.isDebugEnabled()) {
            log.debug("Failed to enter to busy state (exchange manager is stopping): " + cctx.localNodeId());
        }

        return entered;
    }

    /**
     * Releases the busy read lock taken by a successful {@code enterBusy()}.
     */
    private void leaveBusy() {
        busyLock.readLock().unlock();
    }

    /**
     * @return The values of the exchange future set. NOTE(review): {@code stop0()} sets the set to
     *      {@code null}, so a call after that point would throw - confirm callers cannot reach it.
     */
    public List<GridDhtPartitionsExchangeFuture> exchangeFutures() {
        return exchFuts.values();
    }

    /**
     * @return {@code true} if the exchange worker's future queue is not empty.
     */
    public boolean hasPendingExchange() {
        boolean queueEmpty = exchWorker.futQ.isEmpty();

        return !queueEmpty;
    }

    /**
     * Derives the affinity topology version for a discovery event.
     *
     * @param evt Discovery event.
     * @return For type 18 (custom) events, the version carried by the event itself; otherwise a
     *      version built from the event's topology version.
     */
    private AffinityTopologyVersion affinityTopologyVersion(DiscoveryEvent evt) {
        return evt.type() == 18
            ? ((DiscoveryCustomEvent)evt).affinityTopologyVersion()
            : new AffinityTopologyVersion(evt.topologyVersion());
    }

    /**
     * Queues a new exchange future on the exchange worker, built from the discovery event and
     * exchange id of an existing one.
     *
     * @param reassign Flag passed through to the new future's constructor.
     * @param exchFut Exchange future supplying the discovery event and exchange id.
     */
    public void forceDummyExchange(boolean reassign, GridDhtPartitionsExchangeFuture exchFut) {
        GridDhtPartitionsExchangeFuture dummyFut = new GridDhtPartitionsExchangeFuture(
            cctx,
            reassign,
            exchFut.discoveryEvent(),
            exchFut.exchangeId());

        exchWorker.addFuture(dummyFut);
    }

    /**
     * Queues an exchange future on the exchange worker that carries a fresh result future.
     *
     * @param exchFut Exchange future supplying the discovery event and exchange id.
     * @return The result future handed to the queued exchange future.
     */
    public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
        GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<Boolean>();

        GridDhtPartitionsExchangeFuture forcedFut = new GridDhtPartitionsExchangeFuture(
            cctx,
            exchFut.discoveryEvent(),
            exchFut.exchangeId(),
            resFut);

        exchWorker.addFuture(forcedFut);

        return resFut;
    }

    /**
     * Schedules a partitions resend unless one is already pending and not yet started.
     * The new timeout object is registered only if this call wins the compare-and-set.
     */
    public void scheduleResendPartitions() {
        ResendTimeoutObject cur = pendingResend.get();

        // A pending resend that has not started yet already covers this request.
        if (cur != null && !cur.started()) {
            return;
        }

        ResendTimeoutObject next = new ResendTimeoutObject();

        if (pendingResend.compareAndSet(cur, next)) {
            cctx.time().addTimeoutObject(next);
        }
    }

    /**
     * Refreshes partition maps: when the local node is the oldest alive cache server node it sends
     * all partitions to the remote nodes, otherwise it sends its local partitions to the oldest
     * node. Does nothing when there is no such server node.
     */
    private void refreshPartitions() {
        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);

        if (oldest == null) {
            if (log.isDebugEnabled()) {
                log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']');
            }

            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
        }

        boolean locIsOldest = oldest.id().equals(cctx.localNodeId());

        if (!locIsOldest) {
            if (log.isDebugEnabled()) {
                log.debug("Refreshing local partitions from non-oldest node: " + cctx.localNodeId());
            }

            sendLocalPartitions(oldest, null);

            return;
        }

        // Remote nodes are resolved for the last initialized exchange's version, or NONE without one.
        GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;

        AffinityTopologyVersion rmtTopVer;

        if (lastFut == null) {
            rmtTopVer = AffinityTopologyVersion.NONE;
        } else {
            rmtTopVer = lastFut.topologyVersion();
        }

        Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);

        if (log.isDebugEnabled()) {
            log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
        }

        sendAllPartitions(rmts);
    }

    /**
     * Sends one full partitions message to each of the given nodes. Send failures are logged
     * (debug for topology errors, warning otherwise) and do not stop the loop.
     *
     * @param nodes Target nodes; none is expected to be the local node.
     * @return Always {@code true}.
     */
    private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
        GridDhtPartitionsFullMessage fullMsg = createPartitionsFullMessage(nodes, null, null, true);

        if (log.isDebugEnabled()) {
            log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + fullMsg + ']');
        }

        for (ClusterNode rmt : nodes) {
            try {
                assert !rmt.equals(cctx.localNode());

                cctx.io().sendNoRetry(rmt, fullMsg, (byte)2);
            }
            catch (ClusterTopologyCheckedException ignore) {
                // The target left the grid; nothing to do beyond optional debug output.
                if (log.isDebugEnabled()) {
                    log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + rmt.id() + ", msg=" + fullMsg + ']');
                }
            }
            catch (IgniteCheckedException e) {
                U.warn(log, "Failed to send partitions full message [node=" + rmt + ", err=" + e + ']');
            }
        }

        return true;
    }

    /**
     * Builds a full partitions message containing the partition maps of the non-local caches that
     * are ready, plus those of all client topologies.
     *
     * @param nodes Target nodes, may be {@code null}. If any node's version is below
     *      {@code GridDhtPartitionMap2.SINCE}, maps are re-wrapped via the four-argument
     *      {@code GridDhtPartitionFullMap} constructor and compression is disabled; compression is
     *      also disabled if any node fails {@code canUsePartitionMapCompression}.
     * @param exchId Exchange id, or {@code null}. When set, update counters are added and cache
     *      readiness is judged by the cache's start topology version instead of {@code started()}.
     * @param lastVer Last version, passed to the message constructor.
     * @param compress Requested compression flag; may be overridden as described above.
     * @return The populated message.
     */
    public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes, @Nullable GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, boolean compress) {
        AffinityTopologyVersion msgTopVer = exchId == null ? AffinityTopologyVersion.NONE : exchId.topologyVersion();

        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, lastVer, msgTopVer);

        boolean useOldApi = false;

        if (nodes != null) {
            for (ClusterNode node : nodes) {
                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                    useOldApi = true;
                    compress = false;

                    break;
                }

                if (!canUsePartitionMapCompression(node)) {
                    compress = false;
                }
            }
        }

        m.compress(compress);

        // Tracks, per affinity key, the first cache whose map was added (see addFullPartitionsMap).
        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<Object, T2<Integer, GridDhtPartitionFullMap>>();

        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
            if (cacheCtx.isLocal()) {
                continue;
            }

            boolean ready;

            if (exchId == null) {
                ready = cacheCtx.started();
            } else {
                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();

                ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
            }

            if (!ready) {
                continue;
            }

            GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();

            if (affCache == null) {
                // A context without an affinity cache is expected to be unregistered already.
                assert cctx.cacheContext(cacheCtx.cacheId()) == null : cacheCtx.name();

                continue;
            }

            GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);

            if (useOldApi) {
                locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
            }

            addFullPartitionsMap(m, dupData, compress, cacheCtx.cacheId(), locMap, affCache.similarAffinityKey());

            if (exchId != null) {
                m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
            }
        }

        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
            GridDhtPartitionFullMap map = top.partitionMap(true);

            addFullPartitionsMap(m, dupData, compress, top.cacheId(), map, top.similarAffinityKey());

            if (exchId != null) {
                m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
            }
        }

        return m;
    }

    /**
     * Adds a cache's full partition map to the message. When compression is on and an earlier cache with
     * the same affinity key had an equal partition state, only empty per-node copies are added together
     * with the ID of that earlier cache.
     *
     * @param m Message to add the map to.
     * @param dupData Affinity key to (cache ID, full map) of the cache first seen with that key.
     * @param compress Whether duplicate detection is performed at all.
     * @param cacheId Cache ID.
     * @param map Full partition map of the cache.
     * @param affKey Affinity similarity key; {@code null} disables duplicate detection for this cache.
     */
    private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, boolean compress, Integer cacheId, GridDhtPartitionFullMap map, Object affKey) {
        Integer dupDataCache = null;
        if (compress && affKey != null && !m.containsCache(cacheId)) {
            T2<Integer, GridDhtPartitionFullMap> known = dupData.get(affKey);
            boolean sameState = known != null && ((GridDhtPartitionFullMap)known.get2()).partitionStateEquals(map);
            if (!sameState) {
                // First cache with this key (or its state differs): remember it as the reference.
                dupData.put(affKey, new T2<Integer, GridDhtPartitionFullMap>(cacheId, map));
            } else {
                // Same state as the remembered cache: send only empty per-node copies and point to it.
                GridDhtPartitionFullMap stripped = new GridDhtPartitionFullMap(map.nodeId(), map.nodeOrder(), map.updateSequence());
                for (Map.Entry e : map.entrySet()) {
                    stripped.put(e.getKey(), ((GridDhtPartitionMap2)e.getValue()).emptyCopy());
                }
                map = stripped;
                dupDataCache = (Integer)known.get1();
            }
        }
        m.addFullPartitionsMap(cacheId, map, dupDataCache);
    }

    /**
     * Creates a single partitions message for the given node and sends it without retries.
     * Send failures are logged and not propagated to the caller.
     *
     * @param node Destination node.
     * @param id Exchange ID to put into the message, may be {@code null}.
     */
    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
        GridDhtPartitionsSingleMessage singleMsg =
            this.createPartitionsSingleMessage(node, id, this.cctx.kernalContext().clientNode(), false);

        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + singleMsg + ']');
        }

        try {
            this.cctx.io().sendNoRetry(node, singleMsg, (byte)2);
        }
        catch (ClusterTopologyCheckedException ignore) {
            // Topology errors are only reported at debug level.
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + node.id() + ", msg=" + singleMsg + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e);
        }
    }

    /**
     * Builds a single partitions message with the local partition maps of all non-local caches and of
     * the client topologies that are not already present in the message.
     *
     * @param targetNode Node the message is intended for; its version drives compression and map format.
     * @param exchangeId Exchange ID, may be {@code null}.
     * @param clientOnlyExchange Flag passed through to the message constructor.
     * @param sndCounters Whether partition update counters are added to the message.
     * @return Created message.
     */
    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode, @Nullable GridDhtPartitionExchangeId exchangeId, boolean clientOnlyExchange, boolean sndCounters) {
        boolean compress = this.canUsePartitionMapCompression(targetNode);
        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, this.cctx.versions().last(), compress);
        Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>>();

        for (GridCacheContext cacheCtx : this.cctx.cacheContexts()) {
            if (!cacheCtx.isLocal()) {
                GridDhtPartitionMap2 cacheMap = cacheCtx.topology().localPartitionMap();
                // Nodes older than GridDhtPartitionMap2.SINCE receive the map re-wrapped as GridDhtPartitionMap.
                if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                    cacheMap = new GridDhtPartitionMap(cacheMap.nodeId(), cacheMap.updateSequence(), cacheMap.map());
                }
                this.addPartitionMap(m, dupData, compress, cacheCtx.cacheId(), cacheMap, cacheCtx.affinity().affinityCache().similarAffinityKey());
                if (sndCounters) {
                    m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
                }
            }
        }

        for (GridClientPartitionTopology top : this.clientTops.values()) {
            // Skip client topologies whose cache ID has already been added above.
            boolean alreadyAdded = m.partitions() != null && m.partitions().containsKey(top.cacheId());
            if (alreadyAdded) {
                continue;
            }
            GridDhtPartitionMap2 clientMap = top.localPartitionMap();
            this.addPartitionMap(m, dupData, compress, top.cacheId(), clientMap, top.similarAffinityKey());
            if (sndCounters) {
                m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true));
            }
        }
        return m;
    }

    /**
     * Adds a local partition map to the single message. With compression enabled, a map whose state equals
     * the one remembered for the same affinity key is replaced by an empty copy plus the ID of the
     * remembered cache.
     *
     * @param m Message to add the map to.
     * @param dupData Affinity key to (cache ID, partition states) of the cache remembered for that key.
     * @param compress Whether duplicate detection is performed.
     * @param cacheId Cache ID.
     * @param map Local partition map.
     * @param affKey Affinity similarity key.
     */
    private void addPartitionMap(GridDhtPartitionsSingleMessage m, Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData, boolean compress, Integer cacheId, GridDhtPartitionMap2 map, Object affKey) {
        Integer dupDataCache = null;
        if (compress) {
            T2<Integer, Map<Integer, GridDhtPartitionState>> known = dupData.get(affKey);
            boolean duplicate = known != null && ((Map)known.get2()).equals(map.map());
            if (!duplicate) {
                dupData.put(affKey, new T2<Integer, Map<Integer, GridDhtPartitionState>>(cacheId, map.map()));
            } else {
                dupDataCache = (Integer)known.get1();
                map = map.emptyCopy();
            }
        }
        m.addLocalPartitionMap(cacheId, map, dupDataCache);
    }

    /**
     * Creates an exchange ID from the given components.
     *
     * @param nodeId Node ID.
     * @param topVer Topology version.
     * @param evt Event type.
     * @return New exchange ID.
     */
    private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, int evt) {
        // Note the constructor takes the event type before the topology version.
        GridDhtPartitionExchangeId id = new GridDhtPartitionExchangeId(nodeId, evt, topVer);
        return id;
    }

    /**
     * Registers an exchange future for the given exchange ID in {@code exchFuts} and returns the future
     * that the set reports for that ID, after passing it the supplied requests, affinity change message
     * and discovery event.
     *
     * @param exchId Exchange ID.
     * @param discoEvt Discovery event to notify the future about, may be {@code null}.
     * @param reqs Cache change requests, may be {@code null}.
     * @param affChangeMsg Affinity change message, may be {@code null}.
     * @return Exchange future for the ID.
     */
    private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) {
        GridDhtPartitionsExchangeFuture created = new GridDhtPartitionsExchangeFuture(this.cctx, this.busyLock, exchId, reqs, affChangeMsg);
        GridDhtPartitionsExchangeFuture registered = this.exchFuts.addx(created);

        // Prefer whatever the set handed back over the freshly created instance.
        GridDhtPartitionsExchangeFuture res = created;
        if (registered != null) {
            res = registered;
            if (reqs != null) {
                res.cacheChangeRequests(reqs);
            }
            if (affChangeMsg != null) {
                res.affinityChangeMessage(affChangeMsg);
            }
        }

        if (discoEvt != null) {
            res.onEvent(exchId, discoEvt);
        }

        // If a stop error has been recorded, the future is completed with it right away.
        if (this.stopErr != null) {
            res.onDone(this.stopErr);
        }
        return res;
    }

    /*
     * WARNING - void declaration
     */
    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
        ExchangeFutureSet exchFuts0;
        AffinityTopologyVersion topVer = exchFut.topologyVersion();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
        }
        IgniteProductVersion minVer = this.cctx.localNode().version();
        IgniteProductVersion maxVer = this.cctx.localNode().version();
        if (err == null && !F.isEmpty(exchFut.discoveryEvent().topologyNodes())) {
            for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) {
                IgniteProductVersion igniteProductVersion = node.version();
                if (igniteProductVersion.compareTo(minVer) < 0) {
                    minVer = igniteProductVersion;
                }
                if (igniteProductVersion.compareTo(maxVer) <= 0) continue;
                maxVer = igniteProductVersion;
            }
        }
        this.nodeVers.put(topVer, new IgnitePair<IgniteProductVersion>(minVer, maxVer));
        AffinityTopologyVersion histVer = new AffinityTopologyVersion(topVer.topologyVersion() - 10L, 0);
        for (AffinityTopologyVersion affinityTopologyVersion : this.nodeVers.headMap((Object)histVer).keySet()) {
            this.nodeVers.remove(affinityTopologyVersion);
        }
        if (err == null) {
            AffinityTopologyVersion readyVer;
            while ((readyVer = this.readyTopVer.get()).compareTo(topVer) < 0 && !this.readyTopVer.compareAndSet(readyVer, topVer)) {
            }
            for (Map.Entry entry : this.readyFuts.entrySet()) {
                if (((AffinityTopologyVersion)entry.getKey()).compareTo(topVer) > 0) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Completing created topology ready future [ver=" + topVer + ", fut=" + entry.getValue() + ']');
                }
                ((AffinityReadyFuture)entry.getValue()).onDone(topVer);
            }
        } else {
            for (Map.Entry entry : this.readyFuts.entrySet()) {
                if (((AffinityTopologyVersion)entry.getKey()).compareTo(topVer) > 0) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Completing created topology ready future with error [ver=" + topVer + ", fut=" + entry.getValue() + ']');
                }
                ((AffinityReadyFuture)entry.getValue()).onDone(err);
            }
        }
        if ((exchFuts0 = this.exchFuts) != null) {
            boolean bl = false;
            for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
                void var8_17;
                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0 || ++var8_17 <= 10) continue;
                fut.cleanUp();
            }
        }
    }

    /**
     * Hands the future over to the exchange worker if the future accepts being added.
     *
     * @param fut Exchange future.
     * @return {@code true} if {@code fut.onAdded()} returned {@code true} and the future was passed to the worker.
     */
    private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
        if (!fut.onAdded()) {
            return false;
        }
        this.exchWorker.addFuture(fut);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a full partitions message. A message carrying an exchange ID is forwarded to the matching
     * exchange future; otherwise its partition maps are applied to the corresponding topologies.
     * Does nothing if the busy state cannot be entered.
     *
     * @param node Sender node.
     * @param msg Full partitions message.
     */
    private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMessage msg) {
        if (!this.enterBusy()) {
            return;
        }
        try {
            if (msg.exchangeId() != null) {
                this.exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received full partition update [node=" + node.id() + ", msg=" + msg + ']');
                }

                boolean updated = false;
                for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
                    Integer cacheId = entry.getKey();
                    GridCacheContext cacheCtx = this.cctx.cacheContext(cacheId);

                    GridDhtPartitionTopology top;
                    if (cacheCtx == null) {
                        // No cache context on this node: fall back to the client topology, if any.
                        top = (GridDhtPartitionTopology)this.clientTops.get(cacheId);
                    } else if (!cacheCtx.started()) {
                        continue;
                    } else {
                        top = cacheCtx.isLocal() ? null : cacheCtx.topology();
                    }

                    if (top != null) {
                        updated |= top.update(null, entry.getValue(), null);
                    }
                }

                if (!this.cctx.kernalContext().clientNode() && updated) {
                    this.refreshPartitions();
                }
            }
        }
        finally {
            this.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a single partitions message. Without an exchange ID the maps are applied to the matching
     * topologies; with one, the message is passed to the exchange future, and for messages flagged as
     * client this is done from a listener on that future.
     * Does nothing if the busy state cannot be entered.
     *
     * @param node Sender node.
     * @param msg Single partitions message.
     */
    private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
        if (!this.enterBusy()) {
            return;
        }
        try {
            if (msg.exchangeId() != null) {
                if (!msg.client()) {
                    this.exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
                } else {
                    final GridDhtPartitionsExchangeFuture exchFut = this.exchangeFuture(msg.exchangeId(), null, null, null);
                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>(){

                        @Override
                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                            exchFut.onReceive(node, msg);
                        }
                    });
                }
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received local partition update [nodeId=" + node.id() + ", parts=" + msg + ']');
                }

                boolean updated = false;
                for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) {
                    Integer cacheId = entry.getKey();
                    GridCacheContext cacheCtx = this.cctx.cacheContext(cacheId);

                    // Ignore maps whose topology version is older than the cache start version.
                    boolean staleForCache = cacheCtx != null
                        && cacheCtx.startTopologyVersion() != null
                        && entry.getValue() != null
                        && entry.getValue().topologyVersion() != null
                        && cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0;
                    if (staleForCache) {
                        continue;
                    }

                    GridDhtPartitionTopology top = null;
                    if (cacheCtx == null) {
                        top = (GridDhtPartitionTopology)this.clientTops.get(cacheId);
                    } else if (!cacheCtx.isLocal()) {
                        top = cacheCtx.topology();
                    }

                    if (top != null) {
                        updated |= top.update(null, entry.getValue(), null, true);
                        this.cctx.affinity().checkRebalanceState(top, cacheId);
                    }
                }

                if (updated) {
                    this.scheduleResendPartitions();
                }
            }
        }
        finally {
            this.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Answers a single partitions request by sending the local partitions back to the requester.
     * Does nothing if the busy state cannot be entered.
     *
     * @param node Requesting node.
     * @param msg Request message; its exchange ID is passed on to the reply.
     */
    private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
        boolean entered = this.enterBusy();
        if (!entered) {
            return;
        }
        try {
            this.sendLocalPartitions(node, msg.exchangeId());
        }
        finally {
            this.leaveBusy();
        }
    }

    /**
     * Dumps debug information without an exchange topology version.
     *
     * @throws Exception If the dump fails.
     */
    public void dumpDebugInfo() throws Exception {
        AffinityTopologyVersion noExchVer = null;
        this.dumpDebugInfo(noExchVer);
    }

    /**
     * Logs, at warning level, the ready affinity version, the last initialized exchange future, queued
     * and recent exchange futures, pending affinity ready futures and pending objects, then asks the
     * cache preloaders, the affinity manager and grid IO to dump their own state.
     *
     * @param exchTopVer Exchange topology version passed to the pending objects dump, may be {@code null}.
     * @throws Exception If the dump fails.
     */
    public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) throws Exception {
        U.warn(this.log, "Ready affinity version: " + this.readyTopVer.get());
        U.warn(this.log, "Last exchange future: " + this.lastInitializedFut);

        U.warn(this.log, "Pending exchange futures:");
        for (GridDhtPartitionsExchangeFuture queued : this.exchWorker.futQ) {
            U.warn(this.log, ">>> " + queued);
        }

        if (!this.readyFuts.isEmpty()) {
            U.warn(this.log, "Pending affinity ready futures:");
            for (GridFutureAdapter<?> readyFut : this.readyFuts.values()) {
                U.warn(this.log, ">>> " + readyFut);
            }
        }

        ExchangeFutureSet futs = this.exchFuts;
        if (futs != null) {
            U.warn(this.log, "Last 10 exchange futures (total: " + futs.size() + "):");
            int printed = 0;
            for (GridDhtPartitionsExchangeFuture fut : futs.values()) {
                U.warn(this.log, ">>> " + fut);
                // Stop once ten futures have been printed.
                if (++printed == 10) {
                    break;
                }
            }
        }

        this.dumpPendingObjects(exchTopVer);

        for (GridCacheContext cacheCtx : this.cctx.cacheContexts()) {
            cacheCtx.preloader().dumpDebugInfo();
        }
        this.cctx.affinity().dumpDebugInfo();
        this.cctx.gridIO().dumpStats();
    }

    /**
     * Looks for transactions and cache futures running longer than the given timeout and logs them.
     * Skipped entirely while the last initialized exchange future is not done. Once
     * {@code DUMP_PENDING_OBJECTS_THRESHOLD} consecutive dumps have happened nothing more is logged;
     * the counter is reset as soon as no long running operation is found.
     * Any exception raised while dumping is logged and not rethrown.
     *
     * @param timeout Age in milliseconds above which an operation counts as long running.
     */
    public void dumpLongRunningOperations(long timeout) {
        try {
            GridDhtPartitionsExchangeFuture lastFut = this.lastInitializedFut;
            if (lastFut != null && !lastFut.isDone()) {
                return;
            }

            long curTime = U.currentTimeMillis();
            boolean found = false;

            IgniteTxManager tm = this.cctx.tm();
            if (tm != null) {
                for (IgniteInternalTx tx : tm.activeTransactions()) {
                    if (curTime - tx.startTime() > timeout) {
                        found = true;
                        if (this.longRunningOpsDumpCnt >= GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                            break;
                        }
                        U.warn(this.log, "Found long running transaction [startTime=" + this.formatTime(tx.startTime()) + ", curTime=" + this.formatTime(curTime) + ", tx=" + tx + ']');
                    }
                }
            }

            GridCacheMvccManager mvcc = this.cctx.mvcc();
            if (mvcc != null) {
                for (GridCacheFuture<?> activeFut : mvcc.activeFutures()) {
                    if (curTime - activeFut.startTime() > timeout) {
                        found = true;
                        if (this.longRunningOpsDumpCnt >= GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                            break;
                        }
                        U.warn(this.log, "Found long running cache future [startTime=" + this.formatTime(activeFut.startTime()) + ", curTime=" + this.formatTime(curTime) + ", fut=" + activeFut + ']');
                    }
                }
                for (GridCacheFuture<?> atomicFut : mvcc.atomicFutures()) {
                    if (curTime - atomicFut.startTime() > timeout) {
                        found = true;
                        if (this.longRunningOpsDumpCnt >= GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                            break;
                        }
                        U.warn(this.log, "Found long running cache future [startTime=" + this.formatTime(atomicFut.startTime()) + ", curTime=" + this.formatTime(curTime) + ", fut=" + atomicFut + ']');
                    }
                }
            }

            if (!found) {
                this.longRunningOpsDumpCnt = 0;
            } else if (this.longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                ++this.longRunningOpsDumpCnt;
                // Thread dumps are opt-in through a system property.
                if (IgniteSystemProperties.getBoolean("IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT", false)) {
                    U.warn(this.log, "Found long running cache operations, dump threads.");
                    U.dumpThreads(this.log);
                }
                U.warn(this.log, "Found long running cache operations, dump IO statistics.");
                this.cctx.gridIO().dumpStats();
            }
        }
        catch (Exception e) {
            U.error(this.log, "Failed to dump debug information: " + e, e);
        }
    }

    /**
     * Formats a timestamp with this manager's {@code dateFormat}.
     *
     * @param time Time in milliseconds, as accepted by {@link Date#Date(long)}.
     * @return Formatted time.
     */
    private String formatTime(long time) {
        Date when = new Date(time);
        return this.dateFormat.format(when);
    }

    /**
     * Logs, at warning level, pending transactions, explicit locks, cache futures, atomic cache futures,
     * data streamer futures and deadlock detection futures, then asks the preloader and affinity manager
     * of every non-local cache to dump their state.
     *
     * @param exchTopVer Exchange topology version, may be {@code null}. When given, each transaction line
     *      also reports whether the exchange has to wait for that transaction.
     */
    private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) {
        IgniteTxManager tm = this.cctx.tm();
        if (tm != null) {
            U.warn(this.log, "Pending transactions:");
            for (IgniteInternalTx tx : tm.activeTransactions()) {
                if (exchTopVer == null) {
                    U.warn(this.log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
                } else {
                    U.warn(this.log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) + ", tx=" + tx + ']');
                }
            }
        }

        GridCacheMvccManager mvcc = this.cctx.mvcc();
        if (mvcc != null) {
            U.warn(this.log, "Pending explicit locks:");
            for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) {
                U.warn(this.log, ">>> " + lockSpan);
            }

            U.warn(this.log, "Pending cache futures:");
            for (GridCacheFuture<?> activeFut : mvcc.activeFutures()) {
                U.warn(this.log, ">>> " + activeFut);
            }

            U.warn(this.log, "Pending atomic cache futures:");
            for (GridCacheFuture<?> atomicFut : mvcc.atomicFutures()) {
                U.warn(this.log, ">>> " + atomicFut);
            }

            U.warn(this.log, "Pending data streamer futures:");
            for (IgniteInternalFuture<?> streamerFut : mvcc.dataStreamerFutures()) {
                U.warn(this.log, ">>> " + streamerFut);
            }

            // Deadlock detection futures are only dumped when both managers are available.
            if (tm != null) {
                U.warn(this.log, "Pending transaction deadlock detection futures:");
                for (IgniteInternalFuture<?> deadlockFut : tm.deadlockDetectionFutures()) {
                    U.warn(this.log, ">>> " + deadlockFut);
                }
            }
        }

        for (GridCacheContext cacheCtx : this.cctx.cacheContexts()) {
            if (cacheCtx.isLocal()) {
                continue;
            }

            // For a near cache, dump the state of the underlying DHT cache context instead.
            GridCacheContext ctx0 = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;

            GridCachePreloader preloader = ctx0.preloader();
            if (preloader != null) {
                preloader.dumpDebugInfo();
            }

            GridCacheAffinityManager affMgr = ctx0.affinity();
            if (affMgr != null) {
                affMgr.dumpDebugInfo();
            }
        }
    }

    /**
     * Polls the queue for up to the given time. If the worker is already cancelled, the current thread's
     * interrupt flag is set before polling.
     *
     * @param deque Queue to poll.
     * @param time Maximum time to wait, in milliseconds.
     * @param w Worker whose cancellation state is checked; must not be {@code null}.
     * @return Polled element, or {@code null} if none became available in time.
     * @throws InterruptedException If interrupted while waiting.
     */
    @Nullable
    private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
        assert (w != null);

        boolean cancelled = w.isCancelled();
        if (cancelled) {
            Thread.currentThread().interrupt();
        }

        T item = deque.poll(time, TimeUnit.MILLISECONDS);
        return item;
    }

    /**
     * Checks by product version whether partition map compression can be used with the node.
     *
     * @param node Node to check.
     * @return {@code true} if the node version is at least {@code PART_MAP_COMPRESS_SINCE} (timestamp
     *      ignored) and it is not a minor-version-7 build with maintenance below 4.
     */
    private boolean canUsePartitionMapCompression(ClusterNode node) {
        IgniteProductVersion ver = node.version();
        if (ver.compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) {
            return false;
        }
        // NOTE(review): the minor == 7 / maintenance < 4 exclusion presumably targets early 1.7.x releases — confirm.
        return ver.minor() != 7 || ver.maintenance() >= 4;
    }

    /**
     * Future completed with an affinity topology version. On completion it removes itself from the
     * enclosing manager's {@code readyFuts} map.
     */
    private class AffinityReadyFuture
    extends GridFutureAdapter<AffinityTopologyVersion> {
        private static final long serialVersionUID = 0L;
        /** Topology version this future is registered under. */
        @GridToStringInclude
        private AffinityTopologyVersion topVer;

        /**
         * @param topVer Topology version this future is registered under.
         */
        private AffinityReadyFuture(AffinityTopologyVersion topVer) {
            this.topVer = topVer;
        }

        /**
         * Completes the future and, if this call actually completed it, unregisters it from {@code readyFuts}.
         *
         * @param res Result; may be {@code null} only when {@code err} is set.
         * @param err Error, may be {@code null}.
         * @return Whether this call completed the future.
         */
        @Override
        public boolean onDone(AffinityTopologyVersion res, @Nullable Throwable err) {
            assert (res != null || err != null);
            if (!super.onDone(res, err)) {
                return false;
            }
            // Two-argument remove: the mapping is dropped only if it still points at this future.
            GridCachePartitionExchangeManager.this.readyFuts.remove(this.topVer, this);
            return true;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            String parent = super.toString();
            return S.toString(AffinityReadyFuture.class, this, parent);
        }
    }

    /**
     * Base message handler that resolves the sender node by ID and delegates to
     * {@link #onMessage(ClusterNode, Object)}. Messages whose sender cannot be resolved are dropped.
     *
     * @param <M> Message type.
     */
    private abstract class MessageHandler<M>
    implements IgniteBiInClosure<UUID, M> {
        private static final long serialVersionUID = 0L;

        private MessageHandler() {
        }

        /**
         * @param nodeId Sender node ID.
         * @param msg Received message.
         */
        @Override
        public void apply(UUID nodeId, M msg) {
            ClusterNode sender = GridCachePartitionExchangeManager.this.cctx.node(nodeId);
            boolean debug = GridCachePartitionExchangeManager.this.log.isDebugEnabled();

            if (sender != null) {
                if (debug) {
                    GridCachePartitionExchangeManager.this.log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
                }
                this.onMessage(sender, msg);
                return;
            }

            // Sender is unknown to the cache context: only report it at debug level.
            if (debug) {
                GridCachePartitionExchangeManager.this.log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
            }
        }

        /**
         * @param node Resolved sender node.
         * @param msg Received message.
         */
        protected abstract void onMessage(ClusterNode node, M msg);
    }

    /**
     * Set of exchange futures ordered by exchange topology version, newest first, and capped at 1000
     * entries. The overridden accessors are synchronized on the set instance.
     */
    private static class ExchangeFutureSet
    extends GridListSet<GridDhtPartitionsExchangeFuture> {
        private static final long serialVersionUID = 0L;

        /**
         * Creates the set with a comparator that orders futures by descending exchange topology version.
         */
        private ExchangeFutureSet() {
            super(new Comparator<GridDhtPartitionsExchangeFuture>(){

                @Override
                public int compare(GridDhtPartitionsExchangeFuture lhs, GridDhtPartitionsExchangeFuture rhs) {
                    AffinityTopologyVersion lhsVer = lhs.exchangeId().topologyVersion();
                    AffinityTopologyVersion rhsVer = rhs.exchangeId().topologyVersion();
                    assert (lhsVer.topologyVersion() > 0L);
                    assert (rhsVer.topologyVersion() > 0L);
                    // Reversed operands: higher versions sort first.
                    return rhsVer.compareTo(lhsVer);
                }
            }, false);
        }

        /**
         * Adds the future, then trims the set down to at most 1000 entries from the tail.
         *
         * @param fut Future to add.
         * @return The value returned by the superclass {@code addx}, or {@code fut} itself if that was {@code null}.
         */
        @Override
        public synchronized GridDhtPartitionsExchangeFuture addx(GridDhtPartitionsExchangeFuture fut) {
            GridDhtPartitionsExchangeFuture existing = super.addx(fut);
            while (this.size() > 1000) {
                this.removeLast();
            }
            if (existing != null) {
                return existing;
            }
            return fut;
        }

        /** {@inheritDoc} */
        @Override
        @Nullable
        public synchronized GridDhtPartitionsExchangeFuture removex(GridDhtPartitionsExchangeFuture val) {
            GridDhtPartitionsExchangeFuture removed = super.removex(val);
            return removed;
        }

        /** {@inheritDoc} */
        @Override
        public synchronized List<GridDhtPartitionsExchangeFuture> values() {
            List<GridDhtPartitionsExchangeFuture> vals = super.values();
            return vals;
        }

        /** {@inheritDoc} */
        @Override
        public synchronized String toString() {
            String parent = super.toString();
            return S.toString(ExchangeFutureSet.class, this, parent);
        }
    }

    /**
     * Timeout object that, when it fires, refreshes partitions at most once from a locally run closure.
     */
    private class ResendTimeoutObject
    implements GridTimeoutObject {
        /** Timeout ID. */
        private final IgniteUuid timeoutId = IgniteUuid.randomUuid();
        /** Creation timestamp; the end time is derived from it. */
        private final long createTime = U.currentTimeMillis();
        /** Set once the refresh has been triggered. */
        private AtomicBoolean started = new AtomicBoolean();

        private ResendTimeoutObject() {
        }

        /** {@inheritDoc} */
        @Override
        public IgniteUuid timeoutId() {
            return this.timeoutId;
        }

        /**
         * @return Creation time plus the manager's {@code partResendTimeout}.
         */
        @Override
        public long endTime() {
            long resendTimeout = GridCachePartitionExchangeManager.this.partResendTimeout;
            return this.createTime + resendTimeout;
        }

        /**
         * Schedules the refresh through the kernal closure processor. The refresh runs under the busy
         * read lock; if the lock cannot be taken the closure returns without doing anything.
         */
        @Override
        public void onTimeout() {
            Runnable refreshTask = new Runnable(){

                @Override
                public void run() {
                    if (!GridCachePartitionExchangeManager.this.busyLock.readLock().tryLock()) {
                        return;
                    }
                    try {
                        // Only the first successful flip of the flag triggers the refresh.
                        boolean first = ResendTimeoutObject.this.started.compareAndSet(false, true);
                        if (first) {
                            GridCachePartitionExchangeManager.this.refreshPartitions();
                        }
                    }
                    finally {
                        GridCachePartitionExchangeManager.this.busyLock.readLock().unlock();
                        GridCachePartitionExchangeManager.this.cctx.time().removeTimeoutObject(ResendTimeoutObject.this);
                        GridCachePartitionExchangeManager.this.pendingResend.compareAndSet(ResendTimeoutObject.this, null);
                    }
                }
            };
            GridCachePartitionExchangeManager.this.cctx.kernalContext().closure().runLocalSafe(refreshTask);
        }

        /**
         * @return Whether the refresh has already been triggered.
         */
        public boolean started() {
            return this.started.get();
        }
    }

    private class ExchangeWorker
    extends GridWorker {
        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ;
        private volatile boolean busy;

        private ExchangeWorker() {
            super(GridCachePartitionExchangeManager.this.cctx.gridName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
            this.futQ = new LinkedBlockingDeque();
        }

        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
            assert (exchFut != null);
            if (!exchFut.dummy() || this.futQ.isEmpty() && !this.busy) {
                this.futQ.offer(exchFut);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Added exchange future to exchange worker: " + exchFut);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            long timeout = GridCachePartitionExchangeManager.this.cctx.gridConfig().getNetworkTimeout();
            int cnt = 0;
            while (!this.isCancelled()) {
                GridDhtPartitionsExchangeFuture exchFut = null;
                ++cnt;
                try {
                    boolean forcePreload;
                    HashMap<Integer, GridDhtPreloaderAssignments> assignsMap;
                    block38: {
                        boolean preloadFinished = true;
                        for (GridCacheContext cacheCtx : GridCachePartitionExchangeManager.this.cctx.cacheContexts()) {
                            if (!(preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone())) break;
                        }
                        if (!GridCachePartitionExchangeManager.this.cctx.kernalContext().clientNode() && this.futQ.isEmpty() && preloadFinished) {
                            timeout = GridCachePartitionExchangeManager.this.cctx.gridConfig().getNetworkTimeout();
                        }
                        if (this.log.isDebugEnabled()) {
                            HashSet<GridDhtPartitionsExchangeFuture> unfinished = new HashSet<GridDhtPartitionsExchangeFuture>();
                            for (GridDhtPartitionsExchangeFuture fut : GridCachePartitionExchangeManager.this.exchFuts.values()) {
                                if (fut.isDone()) continue;
                                unfinished.add(fut);
                            }
                            this.log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
                        }
                        if ((exchFut = (GridDhtPartitionsExchangeFuture)GridCachePartitionExchangeManager.this.poll(this.futQ, timeout, this)) == null) continue;
                        this.busy = true;
                        assignsMap = null;
                        boolean dummyReassign = exchFut.dummyReassign();
                        forcePreload = exchFut.forcePreload();
                        try {
                            if (this.isCancelled()) break;
                            if (!exchFut.dummy() && !exchFut.forcePreload()) {
                                GridCachePartitionExchangeManager.this.lastInitializedFut = exchFut;
                                exchFut.init();
                                int dumpedObjects = 0;
                                while (true) {
                                    try {
                                        exchFut.get(2L * GridCachePartitionExchangeManager.this.cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
                                    }
                                    catch (IgniteFutureTimeoutCheckedException ignored) {
                                        U.warn(this.log, "Failed to wait for partition map exchange [topVer=" + exchFut.topologyVersion() + ", node=" + GridCachePartitionExchangeManager.this.cctx.localNodeId() + "]. " + "Dumping pending objects that might be the cause: ");
                                        if (dumpedObjects >= GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) continue;
                                        try {
                                            GridCachePartitionExchangeManager.this.dumpDebugInfo(exchFut.topologyVersion());
                                        }
                                        catch (Exception e) {
                                            U.error(this.log, "Failed to dump debug information: " + e, e);
                                        }
                                        if (IgniteSystemProperties.getBoolean("IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT", false)) {
                                            U.dumpThreads(this.log);
                                        }
                                        ++dumpedObjects;
                                        continue;
                                    }
                                    break;
                                }
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + this + ']');
                                }
                                if (exchFut.exchangeId().nodeId().equals(GridCachePartitionExchangeManager.this.cctx.localNodeId())) {
                                    GridCachePartitionExchangeManager.this.lastRefresh.compareAndSet(-1L, U.currentTimeMillis());
                                }
                                boolean changed = false;
                                for (GridCacheContext cacheCtx : GridCachePartitionExchangeManager.this.cctx.cacheContexts()) {
                                    if (cacheCtx.isLocal()) continue;
                                    changed |= cacheCtx.topology().afterExchange(exchFut);
                                }
                                if (!GridCachePartitionExchangeManager.this.cctx.kernalContext().clientNode() && changed && this.futQ.isEmpty()) {
                                    GridCachePartitionExchangeManager.this.refreshPartitions();
                                }
                            } else {
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("Got dummy exchange (will reassign)");
                                }
                                if (!dummyReassign) {
                                    timeout = 0L;
                                    continue;
                                }
                            }
                            if (exchFut.skipPreload()) break block38;
                            assignsMap = new HashMap<Integer, GridDhtPreloaderAssignments>();
                            for (GridCacheContext cacheCtx : GridCachePartitionExchangeManager.this.cctx.cacheContexts()) {
                                long delay = cacheCtx.config().getRebalanceDelay();
                                GridDhtPreloaderAssignments assigns = null;
                                if (delay == 0L || forcePreload) {
                                    assigns = cacheCtx.preloader().assign(exchFut);
                                }
                                assignsMap.put(cacheCtx.cacheId(), assigns);
                            }
                        }
                        finally {
                            this.busy = false;
                            continue;
                        }
                    }
                    if (assignsMap == null) continue;
                    int size = assignsMap.size();
                    TreeMap orderMap = new TreeMap();
                    for (Map.Entry e : assignsMap.entrySet()) {
                        int cacheId = (Integer)e.getKey();
                        GridCacheContext cacheCtx = GridCachePartitionExchangeManager.this.cctx.cacheContext(cacheId);
                        int order = cacheCtx.config().getRebalanceOrder();
                        if (orderMap.get(order) == null) {
                            orderMap.put(order, new ArrayList(size));
                        }
                        ((List)orderMap.get(order)).add(cacheId);
                    }
                    Runnable r = null;
                    LinkedList<String> rebList = new LinkedList<String>();
                    boolean assignsCancelled = false;
                    for (Integer order : orderMap.descendingKeySet()) {
                        for (Integer cacheId : (List)orderMap.get(order)) {
                            Runnable cur;
                            GridCacheContext cacheCtx = GridCachePartitionExchangeManager.this.cctx.cacheContext(cacheId);
                            GridDhtPreloaderAssignments assigns = (GridDhtPreloaderAssignments)assignsMap.get(cacheId);
                            if (assigns != null) {
                                assignsCancelled |= assigns.cancelled();
                            }
                            if ((cur = cacheCtx.preloader().addAssignments(assigns, forcePreload, cnt, r, exchFut.forcedRebalanceFuture())) == null) continue;
                            rebList.add(U.maskName(cacheCtx.name()));
                            r = cur;
                        }
                    }
                    if (assignsCancelled) {
                        U.log(this.log, "Skipping rebalancing (obsolete exchange ID) [top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                        continue;
                    }
                    if (r != null) {
                        Collections.reverse(rebList);
                        U.log(this.log, "Rebalancing scheduled [order=" + rebList + "]");
                        if (this.futQ.isEmpty()) {
                            U.log(this.log, "Rebalancing started [top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                            r.run();
                            continue;
                        }
                        U.log(this.log, "Skipping rebalancing (obsolete exchange ID) [top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                        continue;
                    }
                    U.log(this.log, "Skipping rebalancing (nothing scheduled) [top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw e;
                }
                catch (IgniteClientDisconnectedCheckedException ignored) {
                    return;
                }
                catch (IgniteCheckedException e) {
                    U.error(this.log, "Failed to wait for completion of partition map exchange (preloading will not start): " + exchFut, e);
                }
            }
        }
    }
}

