/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCallback;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap;

public class GridCacheMvccManager
extends GridCacheSharedManagerAdapter {
    private static final int MAX_REMOVED_LOCKS = 10240;
    private static final int MAX_NESTED_LSNR_CALLS = IgniteSystemProperties.getInteger("IGNITE_MAX_NESTED_LISTENER_CALLS", 5);
    private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal();
    private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
    private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks = new GridBoundedConcurrentLinkedHashSet(10240, 10240, 0.75f, 16, ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q);
    @GridToStringExclude
    private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> locked = GridConcurrentFactory.newMap();
    @GridToStringExclude
    private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> nearLocked = GridConcurrentFactory.newMap();
    @GridToStringExclude
    private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = GridConcurrentFactory.newMap();
    private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8();
    private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet();
    private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8();
    private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = GridConcurrentFactory.newMap();
    private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8();
    private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>(){

        @Override
        protected Integer initialValue() {
            return 0;
        }
    };
    private IgniteLogger exchLog;
    private volatile boolean stopping;
    @GridToStringExclude
    private final GridCacheMvccCallback cb = new GridCacheMvccCallback(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate owner) {
            int nested = (Integer)GridCacheMvccManager.this.nestedLsnrCalls.get();
            if (nested < MAX_NESTED_LSNR_CALLS) {
                GridCacheMvccManager.this.nestedLsnrCalls.set(nested + 1);
                try {
                    GridCacheMvccManager.this.notifyOwnerChanged(entry, owner);
                }
                finally {
                    GridCacheMvccManager.this.nestedLsnrCalls.set(nested);
                }
            } else {
                GridCacheMvccManager.this.cctx.kernalContext().closure().runLocalSafe((Runnable)new GridPlainRunnable(){

                    @Override
                    public void run() {
                        GridCacheMvccManager.this.notifyOwnerChanged(entry, owner);
                    }
                }, true);
            }
        }

        @Override
        public void onLocked(GridDistributedCacheEntry entry) {
            if (entry.isNear()) {
                GridCacheMvccManager.this.nearLocked.put(entry.txKey(), entry);
            } else {
                GridCacheMvccManager.this.locked.put(entry.txKey(), entry);
            }
        }

        @Override
        public void onFreed(GridDistributedCacheEntry entry) {
            if (entry.isNear()) {
                GridCacheMvccManager.this.nearLocked.remove(entry.txKey());
            } else {
                GridCacheMvccManager.this.locked.remove(entry.txKey());
            }
        }
    };
    @GridToStringExclude
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener(){

        @Override
        public void onEvent(Event evt) {
            assert (evt instanceof DiscoveryEvent);
            assert (evt.type() == 12 || evt.type() == 11);
            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
            if (GridCacheMvccManager.this.log.isDebugEnabled()) {
                GridCacheMvccManager.this.log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]");
            }
            for (GridCacheFuture<?> fut : GridCacheMvccManager.this.activeFutures()) {
                fut.onNodeLeft(discoEvt.eventNode().id());
            }
            for (GridCacheAtomicFuture cacheFut : GridCacheMvccManager.this.atomicFuts.values()) {
                GridCacheVersion futVer;
                cacheFut.onNodeLeft(discoEvt.eventNode().id());
                if (!cacheFut.isCancelled() && !cacheFut.isDone() || (futVer = cacheFut.version()) == null) continue;
                GridCacheMvccManager.this.atomicFuts.remove(futVer, cacheFut);
            }
        }
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
        Collection futCol;
        assert (entry != null);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received owner changed callback [" + entry.key() + ", owner=" + owner + ']');
        }
        if (owner != null && (owner.local() || owner.nearLocal()) && (futCol = (Collection)this.mvccFuts.get(owner.version())) != null) {
            ArrayList futColCp;
            Collection collection = futCol;
            synchronized (collection) {
                futColCp = new ArrayList(futCol.size());
                futColCp.addAll(futCol);
            }
            for (GridCacheMvccFuture fut : futColCp) {
                GridCacheMvccFuture mvccFut;
                if (fut.isDone() || !(mvccFut = fut).onOwnerChanged(entry, owner)) continue;
                return;
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" + owner + ", entry=" + entry + ']');
        }
        if (this.cctx.tm().onOwnerChanged(entry, owner)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Found transaction for changed owner: " + owner);
            }
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Failed to find transaction for changed owner: " + owner);
        }
        if (!this.finishFuts.isEmptyx()) {
            for (FinishLockFuture f : this.finishFuts) {
                f.recheck(entry);
            }
        }
    }

    @Override
    protected void start0() throws IgniteCheckedException {
        this.exchLog = this.cctx.logger(this.getClass().getName() + ".exchange");
        this.pendingExplicit = GridConcurrentFactory.newMap();
    }

    @Override
    protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
        if (!reconnect) {
            this.cctx.gridEvents().addLocalEventListener(this.discoLsnr, 12, 11);
        }
    }

    @Override
    public void onKernalStop0(boolean cancel) {
        this.cctx.gridEvents().removeLocalEventListener(this.discoLsnr, new int[0]);
    }

    public GridCacheMvccCallback callback() {
        return this.cb;
    }

    public Collection<GridCacheExplicitLockSpan> activeExplicitLocks() {
        return this.pendingExplicit.values();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<GridCacheFuture<?>> activeFutures() {
        ArrayList col = new ArrayList();
        Iterator i$ = this.mvccFuts.values().iterator();
        while (i$.hasNext()) {
            Collection futs;
            Collection collection = futs = (Collection)i$.next();
            synchronized (collection) {
                col.addAll(futs);
            }
        }
        col.addAll(this.futs.values());
        return col;
    }

    public void removeExplicitNodeLocks(UUID leftNodeId, AffinityTopologyVersion topVer) {
        for (GridDistributedCacheEntry entry : this.locked()) {
            try {
                entry.removeExplicitNodeLocks(leftNodeId);
                entry.context().evicts().touch(entry, topVer);
            }
            catch (GridCacheEntryRemovedException ignore) {
                if (!this.log.isDebugEnabled()) continue;
                this.log.debug("Attempted to remove node locks from removed entry in mvcc manager disco callback (will ignore): " + entry);
            }
        }
    }

    public void mapVersion(GridCacheVersion from, GridCacheVersion to) {
        assert (from != null);
        assert (to != null);
        GridCacheVersion old = this.near2dht.put(from, to);
        assert (old == null || old == to || old.equals(to));
        if (this.log.isDebugEnabled()) {
            this.log.debug("Added version mapping [from=" + from + ", to=" + to + ']');
        }
    }

    public GridCacheVersion mappedVersion(GridCacheVersion from) {
        assert (from != null);
        GridCacheVersion to = (GridCacheVersion)this.near2dht.get(from);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Retrieved mapped version [from=" + from + ", to=" + to + ']');
        }
        return to;
    }

    public void onStop() {
        this.stopping = true;
        this.cancelClientFutures(this.stopError());
    }

    @Override
    public void onDisconnected(IgniteFuture reconnectFut) {
        IgniteClientDisconnectedCheckedException err = this.disconnectedError(reconnectFut);
        this.cancelClientFutures(err);
    }

    private void cancelClientFutures(IgniteCheckedException err) {
        for (GridCacheFuture<?> fut : this.activeFutures()) {
            ((GridFutureAdapter)((Object)fut)).onDone(err);
        }
        for (GridCacheAtomicFuture future : this.atomicFuts.values()) {
            ((GridFutureAdapter)((Object)future)).onDone(err);
        }
    }

    private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
        if (reconnectFut == null) {
            reconnectFut = this.cctx.kernalContext().cluster().clientReconnectFuture();
        }
        return new IgniteClientDisconnectedCheckedException(reconnectFut, "Operation has been cancelled (client node disconnected).");
    }

    private IgniteCheckedException stopError() {
        return new NodeStoppingException("Operation has been cancelled (node is stopping).");
    }

    public GridCacheVersion unmapVersion(GridCacheVersion from) {
        assert (from != null);
        GridCacheVersion to = (GridCacheVersion)this.near2dht.remove(from);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Removed mapped version [from=" + from + ", to=" + to + ']');
        }
        return to;
    }

    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
        IgniteInternalFuture old = this.atomicFuts.put(futVer, fut);
        assert (old == null) : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
        return this.onFutureAdded(fut);
    }

    public Collection<GridCacheAtomicFuture<?>> atomicFutures() {
        return this.atomicFuts.values();
    }

    public Collection<DataStreamerFuture> dataStreamerFutures() {
        return this.dataStreamerFuts;
    }

    @Nullable
    public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) {
        return (IgniteInternalFuture)this.atomicFuts.get(futVer);
    }

    @Nullable
    public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) {
        return (IgniteInternalFuture)this.atomicFuts.remove(futVer);
    }

    public void addFuture(GridCacheFuture<?> fut, IgniteUuid futId) {
        GridCacheFuture<?> old = this.futs.put(futId, fut);
        assert (old == null) : old;
        this.onFutureAdded(fut);
    }

    public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
        DataStreamerFuture fut = new DataStreamerFuture(topVer);
        boolean add = this.dataStreamerFuts.add(fut);
        assert (add);
        return fut;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean addFuture(final GridCacheMvccFuture<?> fut) {
        GridCacheVersion from;
        block14: {
            boolean dup;
            if (fut.isDone()) {
                fut.markNotTrackable();
                return true;
            }
            if (!fut.trackable()) {
                return true;
            }
            while (true) {
                boolean empty;
                Collection old;
                if ((old = (Collection)this.mvccFuts.get(fut.version())) == null) {
                    HashSet col = new HashSet<GridCacheMvccFuture<?>>(U.capacity(1), 0.75f){
                        {
                            super(x0, x1);
                            this.add(fut);
                        }

                        @Override
                        public int hashCode() {
                            return System.identityHashCode(this);
                        }

                        @Override
                        public boolean equals(Object obj) {
                            return obj == this;
                        }
                    };
                    old = this.mvccFuts.putIfAbsent(fut.version(), col);
                }
                if (old == null) break block14;
                dup = false;
                Collection collection = old;
                synchronized (collection) {
                    empty = old.isEmpty();
                    if (!empty) {
                        dup = !old.add(fut);
                    }
                }
                if (!empty) break;
                if (!this.mvccFuts.remove(fut.version(), old) || !this.log.isDebugEnabled()) continue;
                this.log.debug("Removed future list from futures map for lock version: " + fut.version());
            }
            if (dup) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Found duplicate future in futures map (will not add): " + fut);
                }
                return false;
            }
        }
        if (fut instanceof GridCacheMappedVersion && (from = ((GridCacheMappedVersion)((Object)fut)).mappedVersion()) != null) {
            this.mapVersion(from, fut.version());
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Added future to future map: " + fut);
        }
        if (fut.isDone()) {
            this.removeMvccFuture(fut);
        } else {
            this.onFutureAdded(fut);
        }
        return true;
    }

    private boolean onFutureAdded(IgniteInternalFuture<?> fut) {
        if (this.stopping) {
            ((GridFutureAdapter)fut).onDone(this.stopError());
            return false;
        }
        if (this.cctx.kernalContext().clientDisconnected()) {
            ((GridFutureAdapter)fut).onDone(this.disconnectedError(null));
            return false;
        }
        return true;
    }

    public void removeFuture(IgniteUuid futId) {
        this.futs.remove(futId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean removeMvccFuture(GridCacheMvccFuture<?> fut) {
        boolean empty;
        boolean rmv;
        if (!fut.trackable()) {
            return true;
        }
        Collection cur = (Collection)this.mvccFuts.get(fut.version());
        if (cur == null) {
            return false;
        }
        Collection collection = cur;
        synchronized (collection) {
            rmv = cur.remove(fut);
            empty = cur.isEmpty();
        }
        if (rmv) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Removed future from future map: " + fut);
            }
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Attempted to remove a non-registered future (has it been already removed?): " + fut);
        }
        if (empty && this.mvccFuts.remove(fut.version(), cur) && this.log.isDebugEnabled()) {
            this.log.debug("Removed future list from futures map for lock version: " + fut.version());
        }
        return rmv;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public GridCacheMvccFuture<?> mvccFuture(GridCacheVersion ver, IgniteUuid futId) {
        Collection futs = (Collection)this.mvccFuts.get(ver);
        if (futs != null) {
            Collection collection = futs;
            synchronized (collection) {
                for (GridCacheMvccFuture fut : futs) {
                    if (!fut.futureId().equals(futId)) continue;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Found future in futures map: " + fut);
                    }
                    return fut;
                }
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']');
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public Collection<GridCacheMvccFuture<?>> mvccFutures(GridCacheVersion ver) {
        Collection futs = (Collection)this.mvccFuts.get(ver);
        if (futs != null) {
            Collection collection = futs;
            synchronized (collection) {
                return new ArrayList(futs);
            }
        }
        return null;
    }

    @Nullable
    public GridCacheFuture future(IgniteUuid futId) {
        return (GridCacheFuture)this.futs.get(futId);
    }

    public boolean isRemoved(GridCacheContext cacheCtx, GridCacheVersion ver) {
        return !cacheCtx.isNear() && !cacheCtx.isLocal() && ver != null && this.rmvLocks.contains(ver);
    }

    public boolean addRemoved(GridCacheContext cacheCtx, GridCacheVersion ver) {
        if (cacheCtx.isNear() || cacheCtx.isLocal()) {
            return true;
        }
        boolean ret = this.rmvLocks.add(ver);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Added removed lock version: " + ver);
        }
        return ret;
    }

    private Collection<GridDistributedCacheEntry> locked() {
        return F.concat(false, this.locked.values(), this.nearLocked.values());
    }

    public Collection<IgniteTxKey> lockedKeys() {
        return this.locked.keySet();
    }

    public Collection<IgniteTxKey> nearLockedKeys() {
        return this.nearLocked.keySet();
    }

    public Collection<GridCacheMvccCandidate> remoteCandidates() {
        ArrayList<GridCacheMvccCandidate> rmtCands = new ArrayList<GridCacheMvccCandidate>();
        for (GridDistributedCacheEntry entry : this.locked()) {
            rmtCands.addAll(entry.remoteMvccSnapshot(new GridCacheVersion[0]));
        }
        return rmtCands;
    }

    public Collection<GridCacheMvccCandidate> localCandidates() {
        ArrayList<GridCacheMvccCandidate> locCands = new ArrayList<GridCacheMvccCandidate>();
        for (GridDistributedCacheEntry entry : this.locked()) {
            try {
                locCands.addAll(entry.localCandidates(new GridCacheVersion[0]));
            }
            catch (GridCacheEntryRemovedException gridCacheEntryRemovedException) {}
        }
        return locCands;
    }

    public boolean addNext(GridCacheContext cacheCtx, GridCacheMvccCandidate cand) {
        assert (cand != null);
        assert (!cand.reentry()) : "Lock reentries should not be linked: " + cand;
        if (cacheCtx.isNear() || cand.singleImplicit()) {
            return true;
        }
        Deque<GridCacheMvccCandidate> queue = this.pending.get();
        if (queue == null) {
            queue = new ArrayDeque<GridCacheMvccCandidate>();
            this.pending.set(queue);
        }
        GridCacheMvccCandidate prev = null;
        if (!queue.isEmpty()) {
            prev = queue.getLast();
        }
        queue.add(cand);
        if (prev != null) {
            prev.next(cand);
            cand.previous(prev);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Linked new candidate: " + cand);
        }
        return true;
    }

    public void contextReset() {
        this.pending.set(null);
    }

    public void addExplicitLock(long threadId, GridCacheMvccCandidate cand, AffinityTopologyVersion topVer) {
        while (true) {
            GridCacheExplicitLockSpan span;
            if ((span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(cand.threadId())) == null) {
                span = new GridCacheExplicitLockSpan(topVer, cand);
                GridCacheExplicitLockSpan old = this.pendingExplicit.putIfAbsent(threadId, span);
                if (old == null) break;
                span = old;
            }
            if (span.addCandidate(topVer, cand)) break;
            this.pendingExplicit.remove(threadId, span);
        }
    }

    public void removeExplicitLock(GridCacheMvccCandidate cand) {
        GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(cand.threadId());
        if (span == null) {
            return;
        }
        if (span.removeCandidate(cand)) {
            this.pendingExplicit.remove(cand.threadId(), span);
        }
    }

    public boolean isLockedByThread(IgniteTxKey key, long threadId) {
        if (threadId < 0L) {
            for (GridCacheExplicitLockSpan span : this.pendingExplicit.values()) {
                GridCacheMvccCandidate cand = span.candidate(key, null);
                if (cand == null || !cand.owner()) continue;
                return true;
            }
        } else {
            GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(threadId);
            if (span != null) {
                GridCacheMvccCandidate cand = span.candidate(key, null);
                return cand != null && cand.owner();
            }
        }
        return false;
    }

    public void markExplicitOwner(IgniteTxKey key, long threadId) {
        assert (threadId > 0L);
        GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(threadId);
        if (span != null) {
            span.markOwned(key);
        }
    }

    public GridCacheMvccCandidate removeExplicitLock(long threadId, IgniteTxKey key, @Nullable GridCacheVersion ver) {
        assert (threadId > 0L);
        GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(threadId);
        if (span == null) {
            return null;
        }
        GridCacheMvccCandidate cand = span.removeCandidate(key, ver);
        if (cand != null && span.isEmpty()) {
            this.pendingExplicit.remove(cand.threadId(), span);
        }
        return cand;
    }

    @Nullable
    public GridCacheMvccCandidate explicitLock(long threadId, IgniteTxKey key) {
        if (threadId < 0L) {
            return this.explicitLock(key, null);
        }
        GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(threadId);
        return span == null ? null : span.candidate(key, null);
    }

    @Nullable
    public GridCacheMvccCandidate explicitLock(IgniteTxKey key, @Nullable GridCacheVersion ver) {
        for (GridCacheExplicitLockSpan span : this.pendingExplicit.values()) {
            GridCacheMvccCandidate cand = span.candidate(key, ver);
            if (cand == null) continue;
            return cand;
        }
        return null;
    }

    @Nullable
    public AffinityTopologyVersion lastExplicitLockTopologyVersion(long threadId) {
        GridCacheExplicitLockSpan span = (GridCacheExplicitLockSpan)this.pendingExplicit.get(threadId);
        return span != null ? span.topologyVersion() : null;
    }

    @Override
    public void printMemoryStats() {
        X.println(">>> ", new Object[0]);
        X.println(">>> Mvcc manager memory stats [grid=" + this.cctx.gridName() + ']', new Object[0]);
        X.println(">>>   rmvLocksSize: " + this.rmvLocks.sizex(), new Object[0]);
        X.println(">>>   lockedSize: " + this.locked.size(), new Object[0]);
        X.println(">>>   futsSize: " + (this.mvccFuts.size() + this.futs.size()), new Object[0]);
        X.println(">>>   near2dhtSize: " + this.near2dht.size(), new Object[0]);
        X.println(">>>   finishFutsSize: " + this.finishFuts.sizex(), new Object[0]);
    }

    public IgniteInternalFuture<?> finishLocks(AffinityTopologyVersion topVer) {
        assert (topVer.compareTo(AffinityTopologyVersion.ZERO) > 0);
        return this.finishLocks(null, topVer);
    }

    public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
        HashMap<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<IgniteTxKey, Collection<GridCacheMvccCandidate>>();
        if (!this.finishFuts.isEmptyx()) {
            for (FinishLockFuture fut : this.finishFuts) {
                if (!fut.topologyVersion().equals(topVer)) continue;
                cands.putAll(fut.pendingLocks());
            }
        }
        return cands;
    }

    public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) {
        GridCompoundFuture res = new GridCompoundFuture();
        for (GridCacheExplicitLockSpan span : this.pendingExplicit.values()) {
            AffinityTopologyVersion snapshot = span.topologyVersion();
            if (snapshot == null || snapshot.compareTo(topVer) >= 0) continue;
            res.add(span.releaseFuture());
        }
        res.markInitialized();
        return res;
    }

    public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
        FinishAtomicUpdateFuture res = new FinishAtomicUpdateFuture();
        for (GridCacheAtomicFuture fut : this.atomicFuts.values()) {
            IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
            if (complete == null) continue;
            res.add(complete);
        }
        res.markInitialized();
        return res;
    }

    public IgniteInternalFuture<?> finishDataStreamerUpdates() {
        GridCompoundFuture res = new GridCompoundFuture();
        for (IgniteInternalFuture igniteInternalFuture : this.dataStreamerFuts) {
            res.add(igniteInternalFuture);
        }
        res.markInitialized();
        return res;
    }

    public IgniteInternalFuture<?> finishKeys(Collection<KeyCacheObject> keys, final int cacheId, AffinityTopologyVersion topVer) {
        if (!(keys instanceof Set)) {
            keys = new HashSet<KeyCacheObject>(keys);
        }
        final Collection<KeyCacheObject> keys0 = keys;
        return this.finishLocks((IgnitePredicate<GridDistributedCacheEntry>)new P1<GridDistributedCacheEntry>(){

            @Override
            public boolean apply(GridDistributedCacheEntry e) {
                return e.context().cacheId() == cacheId && keys0.contains(e.key());
            }
        }, topVer);
    }

    private IgniteInternalFuture<?> finishLocks(@Nullable IgnitePredicate<GridDistributedCacheEntry> filter, AffinityTopologyVersion topVer) {
        assert (topVer.topologyVersion() != 0L);
        if (topVer.equals(AffinityTopologyVersion.NONE)) {
            return new GridFinishedFuture();
        }
        final FinishLockFuture finishFut = new FinishLockFuture(filter == null ? this.locked() : F.view(this.locked(), filter), topVer);
        this.finishFuts.add(finishFut);
        finishFut.listen(new CI1<IgniteInternalFuture<?>>(){

            @Override
            public void apply(IgniteInternalFuture<?> e) {
                GridCacheMvccManager.this.finishFuts.remove(finishFut);
            }
        });
        finishFut.recheck();
        return finishFut;
    }

    public void recheckPendingLocks() {
        if (this.exchLog.isDebugEnabled()) {
            this.exchLog.debug("Rechecking pending locks for completion.");
        }
        if (!this.finishFuts.isEmptyx()) {
            for (FinishLockFuture fut : this.finishFuts) {
                fut.recheck();
            }
        }
    }

    private class DataStreamerFuture
    extends GridFutureAdapter<Void> {
        private static final long serialVersionUID = 0L;
        @GridToStringInclude
        private final AffinityTopologyVersion topVer;

        DataStreamerFuture(AffinityTopologyVersion topVer) {
            this.topVer = topVer;
        }

        @Override
        public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
            if (super.onDone(res, err)) {
                GridCacheMvccManager.this.dataStreamerFuts.remove(this);
                return true;
            }
            return false;
        }

        @Override
        public String toString() {
            return S.toString(DataStreamerFuture.class, this, super.toString());
        }
    }

    private static class FinishAtomicUpdateFuture
    extends GridCompoundFuture<Object, Object> {
        private static final long serialVersionUID = 0L;

        private FinishAtomicUpdateFuture() {
        }

        @Override
        protected boolean ignoreFailure(Throwable err) {
            Class<?> cls = err.getClass();
            return ClusterTopologyCheckedException.class.isAssignableFrom(cls) || CachePartialUpdateCheckedException.class.isAssignableFrom(cls);
        }
    }

    private class FinishLockFuture
    extends GridFutureAdapter<Object> {
        private static final long serialVersionUID = 0L;
        @GridToStringInclude
        private final AffinityTopologyVersion topVer;
        @GridToStringInclude
        private final Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> pendingLocks = new ConcurrentHashMap8<IgniteTxKey, Collection<GridCacheMvccCandidate>>();

        FinishLockFuture(Iterable<GridDistributedCacheEntry> entries, AffinityTopologyVersion topVer) {
            assert (topVer.compareTo(AffinityTopologyVersion.ZERO) > 0);
            this.topVer = topVer;
            for (GridCacheEntryEx gridCacheEntryEx : entries) {
                try {
                    Collection<GridCacheMvccCandidate> locs = gridCacheEntryEx.localCandidates(new GridCacheVersion[0]);
                    if (F.isEmpty(locs)) continue;
                    ConcurrentLinkedQueue<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<GridCacheMvccCandidate>();
                    cands.addAll(F.view(locs, this.versionFilter()));
                    if (F.isEmpty(cands)) continue;
                    this.pendingLocks.put(gridCacheEntryEx.txKey(), cands);
                }
                catch (GridCacheEntryRemovedException ignored) {
                    if (!GridCacheMvccManager.this.exchLog.isDebugEnabled()) continue;
                    GridCacheMvccManager.this.exchLog.debug("Got removed entry when adding it to finish lock future (will ignore): " + gridCacheEntryEx);
                }
            }
            if (GridCacheMvccManager.this.exchLog.isDebugEnabled()) {
                GridCacheMvccManager.this.exchLog.debug("Pending lock set [topVer=" + topVer + ", locks=" + this.pendingLocks + ']');
            }
        }

        AffinityTopologyVersion topologyVersion() {
            return this.topVer;
        }

        Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> pendingLocks() {
            return this.pendingLocks;
        }

        private IgnitePredicate<GridCacheMvccCandidate> versionFilter() {
            assert (this.topVer.topologyVersion() > 0L);
            return new P1<GridCacheMvccCandidate>(){

                @Override
                public boolean apply(GridCacheMvccCandidate c) {
                    assert (c.nearLocal() || c.dhtLocal());
                    return c.topologyVersion().equals(AffinityTopologyVersion.ZERO) || c.topologyVersion().compareTo(FinishLockFuture.this.topVer) < 0;
                }
            };
        }

        void recheck() {
            Iterator<IgniteTxKey> it = this.pendingLocks.keySet().iterator();
            while (it.hasNext()) {
                IgniteTxKey key = it.next();
                GridCacheContext cacheCtx = GridCacheMvccManager.this.cctx.cacheContext(key.cacheId());
                GridCacheEntryEx entry = cacheCtx.cache().peekEx(key.key());
                if (entry == null) {
                    it.remove();
                    continue;
                }
                this.recheck(entry);
            }
            if (GridCacheMvccManager.this.log.isDebugEnabled()) {
                GridCacheMvccManager.this.log.debug("After rechecking finished future: " + this);
            }
            if (this.pendingLocks.isEmpty()) {
                if (GridCacheMvccManager.this.exchLog.isDebugEnabled()) {
                    GridCacheMvccManager.this.exchLog.debug("Finish lock future is done: " + this);
                }
                this.onDone();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void recheck(@Nullable GridCacheEntryEx entry) {
            Collection<GridCacheMvccCandidate> cands;
            if (entry == null) {
                return;
            }
            if (GridCacheMvccManager.this.exchLog.isDebugEnabled()) {
                GridCacheMvccManager.this.exchLog.debug("Rechecking entry for completion [entry=" + entry + ", finFut=" + this + ']');
            }
            if ((cands = this.pendingLocks.get(entry.txKey())) != null) {
                Collection<GridCacheMvccCandidate> collection = cands;
                synchronized (collection) {
                    Iterator<GridCacheMvccCandidate> it = cands.iterator();
                    while (it.hasNext()) {
                        GridCacheMvccCandidate cand = it.next();
                        if (!cand.removed()) continue;
                        it.remove();
                    }
                    if (cands.isEmpty()) {
                        this.pendingLocks.remove(entry.txKey());
                    }
                    if (this.pendingLocks.isEmpty()) {
                        this.onDone();
                        if (GridCacheMvccManager.this.exchLog.isDebugEnabled()) {
                            GridCacheMvccManager.this.exchLog.debug("Finish lock future is done: " + this);
                        }
                    }
                }
            }
        }

        @Override
        public String toString() {
            if (!this.pendingLocks.isEmpty()) {
                HashMap txs = new HashMap(1, 1.0f);
                for (Collection<GridCacheMvccCandidate> cands : this.pendingLocks.values()) {
                    for (GridCacheMvccCandidate c : cands) {
                        txs.put(c.version(), GridCacheMvccManager.this.cctx.tm().tx(c.version()));
                    }
                }
                return S.toString(FinishLockFuture.class, this, "txs=" + txs + ", super=" + super.toString());
            }
            return S.toString(FinishLockFuture.class, this, super.toString());
        }
    }
}

