/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht;

import java.util.AbstractSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.CachePeekModes;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedConcurrentMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtGetSingleFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtOffHeapCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public abstract class GridDhtCacheAdapter<K, V>
extends GridDistributedCacheAdapter<K, V> {
    /** Serialization version identifier. */
    private static final long serialVersionUID = 0L;
    /** Partition topology; created in {@link #init()} and cleared in {@link #stop()}. */
    private GridDhtPartitionTopologyImpl top;
    /**
     * Preloader. Every use inside the visible part of this class is null-checked or asserted,
     * and no assignment other than the reset in {@link #stop()} is visible here.
     * NOTE(review): presumably assigned by subclasses - confirm.
     */
    protected GridCachePreloader preldr;
    /** Per-thread (lock id, topology future) pair of the multi-update started by the current thread, if any. */
    private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>>();
    /** Multi-update futures that are currently registered, keyed by lock id. */
    private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<IgniteUuid, MultiUpdateFuture>();

    /**
     * No-argument constructor; performs no initialization of its own.
     * NOTE(review): presumably kept for deserialization support - confirm against the superclass.
     */
    protected GridDhtCacheAdapter() {
    }

    /**
     * Hands a near get response over to the get future registered under the response's future id.
     * If no such future is found the response is dropped (with a debug message when debug is on).
     *
     * @param nodeId Id of the node that sent the response.
     * @param res Near get response.
     */
    protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
        if (log.isDebugEnabled()) {
            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
        }

        CacheGetFuture getFut = (CacheGetFuture)((Object)ctx.mvcc().future(res.futureId()));

        if (getFut != null) {
            getFut.onResult(nodeId, res);
            return;
        }

        // Nobody is waiting for this response.
        if (log.isDebugEnabled()) {
            log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
        }
    }

    /**
     * Hands a near single-get response over to the single-get future that is waiting for it.
     * The future is looked up by an {@link IgniteUuid} built from {@code IgniteUuid.VM_ID} and
     * the response's future id; if none is found the response is dropped.
     *
     * @param nodeId Id of the node that sent the response.
     * @param res Near single-get response.
     */
    protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) {
        if (log.isDebugEnabled()) {
            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
        }

        IgniteUuid futId = new IgniteUuid(IgniteUuid.VM_ID, res.futureId());
        GridPartitionedSingleGetFuture singleFut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(futId);

        if (singleFut != null) {
            singleFut.onResult(nodeId, res);
            return;
        }

        // Nobody is waiting for this response.
        if (log.isDebugEnabled()) {
            log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
        }
    }

    /**
     * Creates the adapter backed by a new {@link GridCachePartitionedConcurrentMap} built from the given context.
     *
     * @param ctx Cache context.
     */
    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
        this(ctx, new GridCachePartitionedConcurrentMap(ctx));
    }

    /**
     * Creates the adapter with an explicitly supplied entry map; both arguments are passed to the superclass.
     *
     * @param ctx Cache context.
     * @param map Cache map to use.
     */
    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
        super(ctx, map);
    }

    /**
     * Runs the superclass initialization and then creates the partition topology
     * from the cache context and this cache's entry factory.
     */
    @Override
    protected void init() {
        super.init();

        top = new GridDhtPartitionTopologyImpl(ctx, entryFactory());
    }

    /**
     * Starts the cache and registers the handler that applies incoming
     * {@link GridCacheTtlUpdateRequest} messages for this cache id.
     *
     * @throws IgniteCheckedException If the superclass start fails.
     */
    @Override
    public void start() throws IgniteCheckedException {
        super.start();

        // The sender id is not needed to apply a TTL update, only the request itself.
        CI2<UUID, GridCacheTtlUpdateRequest> ttlUpdateHnd = new CI2<UUID, GridCacheTtlUpdateRequest>(){

            @Override
            public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) {
                GridDhtCacheAdapter.this.processTtlUpdateRequest(req);
            }
        };

        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)ttlUpdateHnd);
    }

    /**
     * Stops the cache: runs the superclass stop, stops the preloader if one is set,
     * and then drops the references to the preloader and the topology.
     */
    @Override
    public void stop() {
        super.stop();

        if (preldr != null) {
            preldr.stop();
        }

        // Clear both references so they cannot be used after stop.
        top = null;
        preldr = null;
    }

    /**
     * Propagates the reconnect notification, in order, to the superclass, the affinity
     * manager, the partition topology and, when present, the preloader.
     */
    @Override
    public void onReconnected() {
        super.onReconnected();

        ctx.affinity().onReconnected();
        top.onReconnected();

        if (preldr == null) {
            return;
        }

        preldr.onReconnected();
    }

    /**
     * Forwards the kernal-start callback to the superclass and then to the preloader, if one is set.
     *
     * @throws IgniteCheckedException If either callback fails.
     */
    @Override
    public void onKernalStart() throws IgniteCheckedException {
        super.onKernalStart();

        if (preldr == null) {
            return;
        }

        preldr.onKernalStart();
    }

    /**
     * Forwards the kernal-stop callback to the superclass and then to the preloader, if one is set.
     */
    @Override
    public void onKernalStop() {
        super.onKernalStop();

        if (preldr == null) {
            return;
        }

        preldr.onKernalStop();
    }

    /**
     * Prints the superclass memory statistics followed by those of the partition topology.
     * NOTE(review): the meaning of the {@code 1024} argument is defined by the topology
     * implementation and is not visible here - confirm before changing it.
     */
    @Override
    public void printMemoryStats() {
        super.printMemoryStats();

        top.printMemoryStats(1024);
    }

    /**
     * Returns a factory that creates DHT cache entries: a {@link GridDhtOffHeapCacheEntry} when
     * the context passed to the factory reports {@code useOffheapEntry()}, otherwise a plain
     * {@link GridDhtCacheEntry}.
     *
     * @return Entry factory.
     */
    @Override
    protected GridCacheMapEntryFactory entryFactory() {
        return new GridCacheMapEntryFactory(){

            @Override
            public GridCacheMapEntry create(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key, int hash, CacheObject val) {
                return ctx.useOffheapEntry()
                    ? new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val)
                    : new GridDhtCacheEntry(ctx, topVer, key, hash, val);
            }
        };
    }

    /**
     * @return Near cache adapter associated with this DHT cache. The TTL update handling in this
     *      class asserts the result is non-null when a request carries near keys.
     */
    public abstract GridNearCacheAdapter<K, V> near();

    /**
     * @return Partition topology of this cache ({@code null} before {@link #init()} and after {@link #stop()}).
     */
    public GridDhtPartitionTopology topology() {
        return top;
    }

    /**
     * @return Preloader of this cache, possibly {@code null}.
     */
    @Override
    public GridCachePreloader preloader() {
        return preldr;
    }

    /**
     * Returns the preloader narrowed to {@link GridDhtPreloader}. With assertions enabled the
     * type is checked first, which also rejects a {@code null} preloader.
     *
     * @return DHT preloader.
     */
    public GridDhtPreloader dhtPreloader() {
        assert preldr instanceof GridDhtPreloader;

        return (GridDhtPreloader)preldr;
    }

    /**
     * @return Topology future stored for the multi-update started by the current thread,
     *      or {@code null} if this thread holds no multi-update.
     */
    @Nullable
    public GridDhtTopologyFuture multiUpdateTopologyFuture() {
        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> held = multiTxHolder.get();

        if (held == null) {
            return null;
        }

        return held.get2();
    }

    /**
     * Starts a multi-update for the current thread.
     * <p>
     * Under the topology read lock a new lock id is generated, a {@code MultiUpdateFuture} for the
     * current topology version is registered under that id, and the (lock id, topology future)
     * pair is stored in the thread-local holder. After the read lock is released the method
     * waits for the topology future to complete.
     *
     * @return Topology version that was current when the multi-update was registered.
     * @throws IgniteCheckedException If this thread already holds a multi-update, or if waiting
     *      for the topology future fails.
     */
    public AffinityTopologyVersion beginMultiUpdate() throws IgniteCheckedException {
        if (multiTxHolder.get() != null) {
            throw new IgniteCheckedException("Nested multi-update locks are not supported");
        }

        final AffinityTopologyVersion lockedVer;
        final GridDhtTopologyFuture readyFut;

        top.readLock();

        try {
            IgniteUuid id = IgniteUuid.fromUuid(ctx.localNodeId());

            lockedVer = top.topologyVersion();

            // Register the future while still holding the read lock.
            MultiUpdateFuture registered = new MultiUpdateFuture(lockedVer);
            MultiUpdateFuture prev = multiTxFuts.putIfAbsent(id, registered);

            assert prev == null;

            readyFut = top.topologyVersionFuture();

            multiTxHolder.set(F.t(id, readyFut));
        }
        finally {
            top.readUnlock();
        }

        // Wait outside of the lock.
        readyFut.get();

        return lockedVer;
    }

    /**
     * Finishes the multi-update started by the current thread: under the topology read lock the
     * registered future is removed, the thread-local holder is cleared and the future is
     * completed with the lock id.
     *
     * @throws IgniteCheckedException If the current thread holds no multi-update.
     */
    public void endMultiUpdate() throws IgniteCheckedException {
        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> held = multiTxHolder.get();

        if (held == null) {
            throw new IgniteCheckedException("Multi-update was not started or released twice.");
        }

        top.readLock();

        try {
            IgniteUuid id = held.get1();
            MultiUpdateFuture registered = (MultiUpdateFuture)multiTxFuts.remove(id);

            multiTxHolder.set(null);

            registered.onDone(id);
        }
        finally {
            top.readUnlock();
        }
    }

    /**
     * Builds a compound future over every registered multi-update whose topology version is
     * not greater than {@code topVer}.
     *
     * @param topVer Upper bound (inclusive) for the topology versions of interest.
     * @return Initialized compound future, or {@code null} when no registered multi-update matches.
     */
    @Nullable
    public IgniteInternalFuture<?> multiUpdateFinishFuture(AffinityTopologyVersion topVer) {
        GridCompoundFuture compound = null;

        for (MultiUpdateFuture pending : multiTxFuts.values()) {
            if (pending.topologyVersion().compareTo(topVer) <= 0) {
                // Create the compound future lazily, on the first match only.
                if (compound == null) {
                    compound = new GridCompoundFuture();
                }

                compound.add(pending);
            }
        }

        if (compound == null) {
            return null;
        }

        compound.markInitialized();

        return compound;
    }

    /**
     * Same as {@code peekEx(key)} but with the result cast to {@link GridDhtCacheEntry}.
     *
     * @param key Key to peek.
     * @return Result of {@code peekEx(key)} as a DHT entry; may be {@code null}.
     */
    @Nullable
    public GridDhtCacheEntry peekExx(KeyCacheObject key) {
        return (GridDhtCacheEntry)peekEx(key);
    }

    /**
     * Delegates to the superclass; overridden only to declare {@link GridDhtInvalidPartitionException}.
     *
     * @param key Entry key.
     * @param touch Passed through to the superclass unchanged.
     * @return Entry returned by the superclass.
     * @throws GridDhtInvalidPartitionException Declared for callers; raised by the delegated call, if at all.
     */
    @Override
    public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) throws GridDhtInvalidPartitionException {
        return super.entryEx(key, touch);
    }

    /**
     * Delegates to the superclass; overridden only to declare {@link GridDhtInvalidPartitionException}.
     *
     * @param key Entry key.
     * @param topVer Topology version, passed through to the superclass unchanged.
     * @return Entry returned by the superclass.
     * @throws GridDhtInvalidPartitionException Declared for callers; raised by the delegated call, if at all.
     */
    @Override
    public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
        return super.entryEx(key, topVer);
    }

    /**
     * Same as {@code entryEx(key)} but with the result cast to {@link GridDhtCacheEntry}.
     *
     * @param key Entry key.
     * @return DHT entry for the key.
     * @throws GridDhtInvalidPartitionException Declared for callers; raised by the delegated call, if at all.
     */
    public GridDhtCacheEntry entryExx(KeyCacheObject key) throws GridDhtInvalidPartitionException {
        return (GridDhtCacheEntry)entryEx(key);
    }

    /**
     * Same as {@code entryEx(key, topVer)} but with the result cast to {@link GridDhtCacheEntry}.
     *
     * @param key Entry key.
     * @param topVer Topology version.
     * @return DHT entry for the key.
     * @throws GridDhtInvalidPartitionException Declared for callers; raised by the delegated call, if at all.
     */
    public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
        return (GridDhtCacheEntry)entryEx(key, topVer);
    }

    /**
     * Creates a new {@link GridDhtDetachedCacheEntry} for the key, using the key's hash code
     * and no initial value.
     *
     * @param key Entry key.
     * @return Newly created detached entry.
     */
    protected GridDistributedCacheEntry createEntry(KeyCacheObject key) {
        int keyHash = key.hashCode();

        return new GridDhtDetachedCacheEntry(ctx, key, keyHash, null, null, 0);
    }

    /**
     * Loads the given keys from the store into this cache.
     * <p>
     * When the store reports itself as local the call is delegated to the superclass. Otherwise
     * one load version is generated for the whole call, and every (key, value) pair reported by
     * the store is put into the cache through {@code loadEntry} without a filter.
     *
     * @param keys Keys to load.
     * @param plc Expiry policy to use; when {@code null} the cache-wide policy is used.
     * @param keepBinary Only passed on to the superclass in the local-store case.
     * @throws IgniteCheckedException If loading fails.
     */
    @Override
    public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc, boolean keepBinary) throws IgniteCheckedException {
        if (ctx.store().isLocal()) {
            super.localLoad(keys, plc, keepBinary);
            return;
        }

        final GridCacheVersion loadVer = ctx.shared().versions().nextForLoad(topology().topologyVersion());
        final boolean drEnabled = ctx.isDrEnabled();
        final AffinityTopologyVersion affVer = ctx.affinity().affinityTopologyVersion();

        final ExpiryPolicy effectivePlc;

        if (plc == null) {
            effectivePlc = ctx.expiry();
        } else {
            effectivePlc = plc;
        }

        Collection<KeyCacheObject> cacheKeys = ctx.cacheKeysView(keys);

        CI2<KeyCacheObject, Object> putLoaded = new CI2<KeyCacheObject, Object>(){

            @Override
            public void apply(KeyCacheObject key, Object val) {
                GridDhtCacheAdapter.this.loadEntry(key, val, loadVer, null, affVer, drEnabled, effectivePlc);
            }
        };

        ctx.store().loadAll(null, cacheKeys, (IgniteBiInClosure<KeyCacheObject, Object>)putLoaded);
    }

    /**
     * Loads the whole cache from the store, optionally filtered by a predicate.
     * <p>
     * When the store reports itself as local the call is delegated to the superclass. Otherwise
     * one load version is generated for the whole call, resources are injected into the
     * predicate (if given), and every entry reported by the store is put into the cache through
     * {@code loadEntry}. A {@link PlatformCacheEntryFilter} predicate is closed once the store
     * call has finished, whether it succeeded or not.
     *
     * @param p Optional filter; entries it rejects are not loaded.
     * @param args Arguments passed to the store unchanged.
     * @throws IgniteCheckedException If loading fails.
     */
    @Override
    public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException {
        if (this.ctx.store().isLocal()) {
            super.localLoadCache(p, args);
            return;
        }
        final GridCacheVersion ver0 = this.ctx.shared().versions().nextForLoad(this.topology().topologyVersion());
        final boolean replicate = this.ctx.isDrEnabled();
        final AffinityTopologyVersion topVer = this.ctx.affinity().affinityTopologyVersion();
        CacheOperationContext opCtx = this.ctx.operationContextPerCall();
        // A per-call expiry policy takes precedence over the cache-wide one.
        ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null;
        final ExpiryPolicy plc = plc0 != null ? plc0 : this.ctx.expiry();
        if (p != null) {
            this.ctx.kernalContext().resource().injectGeneric(p);
        }
        try {
            this.ctx.store().loadCache((GridInClosure3<KeyCacheObject, Object, GridCacheVersion>)new CI3<KeyCacheObject, Object, GridCacheVersion>(){

                @Override
                public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) {
                    // The store is not expected to supply a version; the shared load version is used instead.
                    assert (ver == null);
                    GridDhtCacheAdapter.this.loadEntry(key, val, ver0, p, topVer, replicate, plc);
                }
            }, args);
        }
        finally {
            if (p instanceof PlatformCacheEntryFilter) {
                ((PlatformCacheEntryFilter)p).onClose();
            }
        }
    }

    /**
     * Puts one value loaded from the store into the cache as the entry's initial value.
     * <p>
     * The entry is skipped when the filter rejects it, when the local partition cannot be
     * reserved, when the partition is invalid, or when {@code CU.ttlForLoad(plc)} returns
     * {@code -2} (NOTE(review): presumably the "zero TTL" marker - confirm against {@code CU}).
     * The partition stays reserved for the duration of the update and is always released.
     *
     * @param key Entry key.
     * @param val Loaded value.
     * @param ver Version to assign to the entry.
     * @param p Optional filter applied to the (key, value) pair.
     * @param topVer Topology version used for the initial value and for the eviction touch.
     * @param replicate Whether the load is recorded with {@code DR_LOAD} rather than {@code DR_NONE}.
     * @param plc Expiry policy from which the TTL is derived; may be {@code null}.
     * @throws IgniteException If setting the initial value fails with an {@link IgniteCheckedException}.
     */
    @SuppressWarnings("unchecked")
    private void loadEntry(KeyCacheObject key, Object val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, AffinityTopologyVersion topVer, boolean replicate, @Nullable ExpiryPolicy plc) {
        // The store hands values over untyped, hence the unchecked cast to V for the filter.
        if (p != null && !p.apply(key.value(this.ctx.cacheObjectContext(), false), (V)val)) {
            return;
        }
        try {
            GridDhtLocalPartition part = this.top.localPartition(this.ctx.affinity().partition(key), AffinityTopologyVersion.NONE, true);
            if (!part.reserve()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Will not load entry into cache (partition is invalid): " + part);
                }
                return;
            }
            GridCacheEntryEx entry = null;
            try {
                long ttl = CU.ttlForLoad(plc);
                if (ttl == -2L) {
                    return;
                }
                CacheObject cacheVal = this.ctx.toCacheObject(val);
                entry = this.entryEx(key, false);
                entry.initialValue(cacheVal, ver, ttl, -1L, false, topVer, replicate ? GridDrType.DR_LOAD : GridDrType.DR_NONE, false);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to put cache value: " + entry, e);
            }
            catch (GridCacheEntryRemovedException ignore) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Got removed entry during loadCache (will ignore): " + entry);
                }
            }
            finally {
                if (entry != null) {
                    entry.context().evicts().touch(entry, topVer);
                }
                // Release the reservation on every path, including the early return above.
                part.release();
            }
        }
        catch (GridDhtInvalidPartitionException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + ", err=" + e + ']');
            }
        }
    }

    /**
     * @return {@link #primarySizeLong()} narrowed to {@code int} by a plain cast (no clamping is applied).
     */
    @Override
    public int primarySize() {
        long size = primarySizeLong();

        return (int)size;
    }

    /**
     * Sums the public sizes of all current local partitions that are primary for the
     * current affinity topology version.
     *
     * @return Total public size of the local primary partitions.
     */
    @Override
    public long primarySizeLong() {
        AffinityTopologyVersion affVer = ctx.affinity().affinityTopologyVersion();

        long total = 0L;

        for (GridDhtLocalPartition locPart : topology().currentLocalPartitions()) {
            if (locPart.primary(affVer)) {
                total += (long)locPart.publicSize();
            }
        }

        return total;
    }

    /**
     * Delegates to the wider {@code getAllAsync} overload. The third argument passed on is
     * {@code true} unless the per-call operation context asks to skip the store.
     * {@code skipTx} is not used by this implementation.
     *
     * @return Future for the map of loaded values.
     */
    @Override
    public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, @Nullable UUID subjId, String taskName, boolean deserializeBinary, boolean skipVals, boolean canRemap, boolean needVer) {
        CacheOperationContext opCtx = ctx.operationContextPerCall();

        boolean storeAllowed = opCtx == null || !opCtx.skipStore();

        return getAllAsync(keys, null, storeAllowed, false, subjId, taskName, deserializeBinary, forcePrimary, null, skipVals, canRemap, needVer);
    }

    /**
     * Delegates to {@code getAllAsync0} with the caller-supplied arguments and fixed flags
     * ({@code false}, {@code false}, {@code true}, {@code true}) in the remaining positions.
     * NOTE(review): the meaning of the fixed flags is defined by {@code getAllAsync0}, which is
     * not visible here - confirm before changing them.
     *
     * @return Future for the map of get results keyed by cache key.
     */
    IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> getDhtAllAsync(Collection<KeyCacheObject> keys, @Nullable ReaderArguments readerArgs, boolean readThrough, @Nullable UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, boolean canRemap) {
        return getAllAsync0(keys, readerArgs, readThrough, false, subjId, taskName, false, expiry, skipVals, true, canRemap, true);
    }

    /**
     * Creates a {@link GridDhtGetFuture} from the given arguments, initializes it and returns it.
     *
     * @return Initialized DHT get future.
     */
    public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, Map<KeyCacheObject, Boolean> keys, boolean readThrough, AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals) {
        GridDhtGetFuture getFut = new GridDhtGetFuture(ctx, msgId, reader, keys, readThrough, topVer, subjId, taskNameHash, expiry, skipVals);

        getFut.init();

        return getFut;
    }

    /**
     * Creates a {@link GridDhtGetSingleFuture} from the given arguments, initializes it and returns it.
     *
     * @return Initialized DHT single-get future.
     */
    private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(UUID nodeId, long msgId, KeyCacheObject key, boolean addRdr, boolean readThrough, AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals) {
        GridDhtGetSingleFuture singleFut = new GridDhtGetSingleFuture(ctx, msgId, nodeId, key, addRdr, readThrough, topVer, subjId, taskNameHash, expiry, skipVals);

        singleFut.init();

        return singleFut;
    }

    /**
     * Serves a single-key get request from another node.
     * <p>
     * A DHT single-get future is started and, on completion, a {@link GridNearSingleGetResponse}
     * is sent back to the requester:
     * <ul>
     * <li>with invalid partitions, the response carries the ready affinity version and the invalid-partitions flag;</li>
     * <li>otherwise it carries the entry info (key cleared), a versioned value, or the bare value,
     * depending on what the request asked for;</li>
     * <li>on failure it carries the error; if the node is stopping nothing is sent.</li>
     * </ul>
     * After the send attempt, TTL updates collected by the expiry policy are dispatched.
     *
     * @param nodeId Id of the requesting node.
     * @param req Single-get request.
     */
    protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
        assert ctx.affinityNode();

        final GridCacheAdapter.CacheExpiryPolicy expiryPlc = GridCacheAdapter.CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());

        IgniteInternalFuture<GridCacheEntryInfo> getFut = getDhtSingleAsync(nodeId, req.messageId(), req.key(), req.addReader(), req.readThrough(), req.topologyVersion(), req.subjectId(), req.taskNameHash(), expiryPlc, req.skipValues());

        getFut.listen((IgniteInClosure<IgniteInternalFuture<GridCacheEntryInfo>>)new CI1<IgniteInternalFuture<GridCacheEntryInfo>>(){

            @Override
            public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
                GridNearSingleGetResponse resp;
                GridDhtFuture dhtFut = (GridDhtFuture)f;

                try {
                    GridCacheEntryInfo info = (GridCacheEntryInfo)dhtFut.get();

                    if (!F.isEmpty(dhtFut.invalidPartitions())) {
                        // Tell the requester which topology version to retry on.
                        AffinityTopologyVersion topVer = GridDhtCacheAdapter.this.ctx.shared().exchange().readyAffinityVersion();

                        assert topVer.compareTo(req.topologyVersion()) >= 0 : "Wrong ready topology version for invalid partitions response [topVer=" + topVer + ", req=" + req + ']';

                        resp = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), topVer, null, true, req.addDeploymentInfo());
                    } else {
                        Message payload = null;

                        if (info != null) {
                            if (req.needEntryInfo()) {
                                // The key is cleared on the info before it is sent back.
                                info.key(null);
                                payload = info;
                            } else if (req.needVersion()) {
                                payload = new CacheVersionedValue(info.value(), info.version());
                            } else {
                                payload = info.value();
                            }
                        }

                        resp = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), req.topologyVersion(), payload, false, req.addDeploymentInfo());

                        if (info != null && req.skipValues()) {
                            resp.setContainsValue();
                        }
                    }
                }
                catch (NodeStoppingException ignored) {
                    // Node is stopping: no response is sent.
                    return;
                }
                catch (IgniteCheckedException e) {
                    U.error(GridDhtCacheAdapter.this.log, "Failed processing get request: " + req, e);

                    resp = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), req.topologyVersion(), null, false, req.addDeploymentInfo());
                    resp.error(e);
                }

                try {
                    GridDhtCacheAdapter.this.ctx.io().send(nodeId, (GridCacheMessage)resp, GridDhtCacheAdapter.this.ctx.ioPolicy());
                }
                catch (IgniteCheckedException e) {
                    U.error(GridDhtCacheAdapter.this.log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + resp + ']', e);
                }

                GridDhtCacheAdapter.this.sendTtlUpdateRequest(expiryPlc);
            }
        });
    }

    /**
     * Serves a multi-key get request from another node.
     * <p>
     * A DHT get future is started and, on completion, a {@link GridNearGetResponse} with the
     * entries (or the error) is sent back. The response always carries the future's invalid
     * partitions, paired with the ready affinity version when there are any and with the
     * request's topology version otherwise. If the node is stopping nothing is sent. After the
     * send attempt, TTL updates collected by the expiry policy are dispatched.
     *
     * @param nodeId Id of the requesting node.
     * @param req Get request; must not be a reload request.
     */
    protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
        assert ctx.affinityNode();
        assert !req.reload() : req;

        final GridCacheAdapter.CacheExpiryPolicy expiryPlc = GridCacheAdapter.CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());

        GridDhtFuture<Collection<GridCacheEntryInfo>> getFut = getDhtAsync(nodeId, req.messageId(), req.keys(), req.readThrough(), req.topologyVersion(), req.subjectId(), req.taskNameHash(), expiryPlc, req.skipValues());

        getFut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>(){

            @Override
            public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
                GridNearGetResponse resp = new GridNearGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), req.miniId(), req.version(), req.deployInfo() != null);
                GridDhtFuture dhtFut = (GridDhtFuture)f;

                try {
                    resp.entries((Collection)dhtFut.get());
                }
                catch (NodeStoppingException ignored) {
                    // Node is stopping: no response is sent.
                    return;
                }
                catch (IgniteCheckedException e) {
                    U.error(GridDhtCacheAdapter.this.log, "Failed processing get request: " + req, e);
                    resp.error(e);
                }

                if (F.isEmpty(dhtFut.invalidPartitions())) {
                    resp.invalidPartitions(dhtFut.invalidPartitions(), req.topologyVersion());
                } else {
                    // Tell the requester which topology version to retry on.
                    resp.invalidPartitions(dhtFut.invalidPartitions(), GridDhtCacheAdapter.this.ctx.shared().exchange().readyAffinityVersion());
                }

                try {
                    GridDhtCacheAdapter.this.ctx.io().send(nodeId, (GridCacheMessage)resp, GridDhtCacheAdapter.this.ctx.ioPolicy());
                }
                catch (IgniteCheckedException e) {
                    U.error(GridDhtCacheAdapter.this.log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + resp + ']', e);
                }

                GridDhtCacheAdapter.this.sendTtlUpdateRequest(expiryPlc);
            }
        });
    }

    /**
     * Sends the TTL updates collected by the given expiry policy to the other nodes.
     * <p>
     * Nothing happens when the policy is {@code null} or has no entries. Otherwise a task is
     * submitted through {@code ctx.closures().runLocalSafe(...)} that builds one
     * {@link GridCacheTtlUpdateRequest} per target node:
     * <ul>
     * <li>each collected entry is added, for every non-local node returned by
     * {@code nodesByKey} at the ready affinity version, as a regular entry;</li>
     * <li>each reader recorded by the policy that still resolves to a node gets its
     * (key, version) pairs added as near entries.</li>
     * </ul>
     * Send failures are logged and never propagated; a failure caused by the target node
     * leaving is logged at debug level only.
     *
     * @param expiryPlc Expiry policy holding the entries and readers to update; may be {@code null}.
     */
    public void sendTtlUpdateRequest(final @Nullable IgniteCacheExpiryPolicy expiryPlc) {
        if (expiryPlc != null && expiryPlc.entries() != null) {
            this.ctx.closures().runLocalSafe(new Runnable(){

                @Override
                public void run() {
                    Map<KeyCacheObject, GridCacheVersion> entries = expiryPlc.entries();
                    assert (entries != null && !entries.isEmpty());
                    Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<ClusterNode, GridCacheTtlUpdateRequest>();
                    AffinityTopologyVersion topVer = GridDhtCacheAdapter.this.ctx.shared().exchange().readyAffinityVersion();
                    for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) {
                        List<ClusterNode> nodes = GridDhtCacheAdapter.this.ctx.affinity().nodesByKey(e.getKey(), topVer);
                        for (int i = 0; i < nodes.size(); ++i) {
                            ClusterNode node = nodes.get(i);
                            // Only remote nodes get a request.
                            if (node.isLocal()) {
                                continue;
                            }
                            GridCacheTtlUpdateRequest req = reqMap.get(node);
                            if (req == null) {
                                req = new GridCacheTtlUpdateRequest(GridDhtCacheAdapter.this.ctx.cacheId(), topVer, expiryPlc.forAccess());
                                reqMap.put(node, req);
                            }
                            req.addEntry(e.getKey(), e.getValue());
                        }
                    }
                    Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers();
                    if (rdrs != null) {
                        assert (!rdrs.isEmpty());
                        for (Map.Entry<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdr : rdrs.entrySet()) {
                            ClusterNode node = GridDhtCacheAdapter.this.ctx.node(rdr.getKey());
                            // Readers that no longer resolve to a node are skipped.
                            if (node == null) {
                                continue;
                            }
                            GridCacheTtlUpdateRequest req = reqMap.get(node);
                            if (req == null) {
                                req = new GridCacheTtlUpdateRequest(GridDhtCacheAdapter.this.ctx.cacheId(), topVer, expiryPlc.forAccess());
                                reqMap.put(node, req);
                            }
                            for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : rdr.getValue()) {
                                req.addNearEntry(t.get1(), t.get2());
                            }
                        }
                    }
                    for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> target : reqMap.entrySet()) {
                        try {
                            GridDhtCacheAdapter.this.ctx.io().send(target.getKey(), (GridCacheMessage)target.getValue(), GridDhtCacheAdapter.this.ctx.ioPolicy());
                        }
                        catch (IgniteCheckedException e) {
                            if (e instanceof ClusterTopologyCheckedException) {
                                if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                                    GridDhtCacheAdapter.this.log.debug("Failed to send TTL update request, node left: " + target.getKey());
                                }
                                continue;
                            }
                            U.error(GridDhtCacheAdapter.this.log, "Failed to send TTL update request.", e);
                        }
                    }
                }
            });
        }
    }

    /**
     * Applies a TTL update request: regular keys (if any) are updated in this cache, near keys
     * (if any) in the near cache, which is asserted to exist in that case.
     *
     * @param req TTL update request.
     */
    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest req) {
        if (req.keys() != null) {
            updateTtl(this, req.keys(), req.versions(), req.ttl());
        }

        if (req.nearKeys() == null) {
            return;
        }

        GridNearCacheAdapter<K, V> nearCache = near();

        assert nearCache != null;

        updateTtl(nearCache, req.nearKeys(), req.nearVersions(), req.ttl());
    }

    /**
     * Updates the TTL of the given keys in the given cache.
     * <p>
     * For every key the entry is obtained via {@code entryEx} followed by {@code unswap(false)}
     * when swap or off-heap is enabled for the cache, and via {@code peekEx} otherwise; a
     * {@code null} entry is skipped. If the entry turns out to be removed the lookup is
     * retried; an invalid partition ends processing of that key. Any entry that was obtained
     * is passed to the eviction manager's {@code touch} afterwards. A failure on one key is
     * logged and does not stop processing of the remaining keys.
     *
     * @param cache Cache to update.
     * @param keys Keys to update; must not be empty.
     * @param vers Versions, one per key and in the same order.
     * @param ttl TTL to set.
     */
    private void updateTtl(GridCacheAdapter<K, V> cache, List<KeyCacheObject> keys, List<GridCacheVersion> vers, long ttl) {
        assert (!F.isEmpty(keys));
        assert (keys.size() == vers.size());
        int size = keys.size();
        boolean swap = cache.context().isSwapOrOffheapEnabled();
        for (int i = 0; i < size; ++i) {
            try {
                GridCacheEntryEx entry = null;
                try {
                    while (true) {
                        try {
                            if (swap) {
                                entry = cache.entryEx(keys.get(i));
                                entry.unswap(false);
                            } else {
                                entry = cache.peekEx(keys.get(i));
                            }
                            if (entry != null) {
                                entry.updateTtl(vers.get(i), ttl);
                            }
                            break;
                        }
                        catch (GridCacheEntryRemovedException ignore) {
                            // Entry was removed concurrently: look it up again.
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Got removed entry: " + entry);
                            }
                        }
                        catch (GridDhtInvalidPartitionException e) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Got GridDhtInvalidPartitionException: " + e);
                            }
                            break;
                        }
                    }
                }
                finally {
                    if (entry != null) {
                        cache.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
                    }
                }
            }
            catch (IgniteCheckedException e) {
                this.log.error("Failed to unswap entry.", e);
            }
        }
    }

    /**
     * Not expected to be invoked on this adapter: trips an assertion when assertions
     * are enabled and does nothing otherwise.
     *
     * @param keys Keys (unused).
     */
    @Override
    public void unlockAll(Collection<? extends K> keys) {
        assert false;
    }

    /**
     * Creates a set view over the entries of a single partition.
     *
     * @param part Partition ID.
     * @return New {@link PartitionEntrySet} for the partition.
     */
    @Override
    public Set<Cache.Entry<K, V>> entrySet(int part) {
        Set<Cache.Entry<K, V>> partEntries = new PartitionEntrySet(part);
        return partEntries;
    }

    /**
     * @return String representation built by {@code S.toString} from this adapter
     *      and the parent class representation.
     */
    @Override
    public String toString() {
        String parent = super.toString();
        return S.toString(GridDhtCacheAdapter.class, this, parent);
    }

    /**
     * Delegates to the parent implementation on affinity nodes; on any other node
     * there is nothing to split, so an empty list is returned.
     *
     * @param srv Passed through to the parent implementation.
     * @param near Passed through to the parent implementation.
     * @param readers Passed through to the parent implementation.
     * @return Clear tasks from the parent, or an empty list when this node is not an affinity node.
     */
    @Override
    public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) {
        if (!this.ctx.affinityNode()) {
            return Collections.emptyList();
        }
        return super.splitClearLocally(srv, near, readers);
    }

    /**
     * Forwards a deferred delete to the local partition that holds the entry.
     * Nothing happens when that partition is not available locally.
     *
     * @param entry Entry being deleted; must be a DHT entry.
     * @param ver Version passed on to the partition.
     */
    @Override
    public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) {
        assert entry.isDht();
        // 'false' is forwarded as the last lookup argument, as in the original call.
        GridDhtLocalPartition locPart =
            this.topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, false);
        if (locPart == null) {
            return;
        }
        locPart.onDeferredDelete(entry.key(), ver);
    }

    /**
     * Compares two topology versions from the point of view of this cache's affinity.
     * <p>
     * Returns {@code false} when the versions are equal. Returns {@code true} when the sets of
     * cache affinity nodes differ between the versions, when the affinity manager's current
     * topology version is older than {@code curVer}, when the partition assignments differ, or
     * when the assignments cannot be obtained ({@link IllegalStateException}).
     *
     * @param expVer Expected topology version.
     * @param curVer Current topology version.
     * @return {@code True} if the versions are not equivalent for this cache.
     */
    protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) {
        if (expVer.equals(curVer)) {
            return false;
        }
        Collection<ClusterNode> expNodes = this.ctx.discovery().cacheAffinityNodes(this.ctx.name(), expVer);
        Collection<ClusterNode> curNodes = this.ctx.discovery().cacheAffinityNodes(this.ctx.name(), curVer);
        if (!expNodes.equals(curNodes)) {
            return true;
        }
        if (this.ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) {
            return true;
        }
        try {
            List<List<ClusterNode>> expAssignment = this.ctx.affinity().assignments(expVer);
            List<List<ClusterNode>> curAssignment = this.ctx.affinity().assignments(curVer);
            return !expAssignment.equals(curAssignment);
        }
        catch (IllegalStateException ignored) {
            // Assignments unavailable for one of the versions: report the versions as different.
            return true;
        }
    }

    /**
     * Same as {@link #localEntriesIterator(boolean, boolean, boolean, AffinityTopologyVersion)},
     * using the affinity manager's current topology version.
     *
     * @param primary Whether to include primary entries.
     * @param backup Whether to include backup entries.
     * @param keepBinary Keep-binary flag forwarded to the four-argument overload.
     * @return Iterator over the selected local entries.
     */
    public Iterator<Cache.Entry<K, V>> localEntriesIterator(boolean primary, boolean backup, boolean keepBinary) {
        AffinityTopologyVersion curTopVer = this.ctx.affinity().affinityTopologyVersion();
        return this.localEntriesIterator(primary, backup, keepBinary, curTopVer);
    }

    /**
     * Returns an iterator over local entries, filtered by primary/backup ownership.
     * <p>
     * When both flags are set, all entries of this cache are iterated. Otherwise the current
     * local partitions are walked and only those whose {@code primary(topVer)} result equals
     * the {@code primary} flag contribute entries.
     *
     * @param primary Whether to include entries of primary partitions.
     * @param backup Whether to include entries of non-primary partitions.
     * @param keepBinary Keep-binary flag; its negation is passed to {@code iterator(...)}.
     * @param topVer Topology version used for the primary check.
     * @return Iterator over the selected local entries.
     */
    public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, boolean backup, boolean keepBinary, final AffinityTopologyVersion topVer) {
        assert primary || backup;
        if (primary && backup) {
            return this.iterator(this.entries().iterator(), !keepBinary);
        }
        final Iterator<GridDhtLocalPartition> partIt = this.topology().currentLocalPartitions().iterator();
        Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>(){
            /** Entry handed out by the next call to {@code next()}; {@code null} once exhausted. */
            private GridCacheMapEntry next;
            /** Entry iterator of the partition currently being drained, if any. */
            private Iterator<GridCacheMapEntry> curIt;
            {
                // Pre-fetch the first entry so that hasNext() is a plain null check.
                this.advance();
            }

            @Override
            public boolean hasNext() {
                return this.next != null;
            }

            @Override
            public GridCacheMapEntry next() {
                GridCacheMapEntry res = this.next;
                if (res == null) {
                    throw new NoSuchElementException();
                }
                this.advance();
                return res;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            /** Moves {@code next} to the following matching entry, or leaves it {@code null}. */
            private void advance() {
                this.next = null;
                do {
                    if (this.curIt == null) {
                        // Find the next partition whose primary status matches the request.
                        while (partIt.hasNext()) {
                            GridDhtLocalPartition part = partIt.next();
                            if (primary == part.primary(topVer)) {
                                this.curIt = part.entries(new CacheEntryPredicate[0]).iterator();
                                break;
                            }
                        }
                    }
                    if (this.curIt != null) {
                        if (this.curIt.hasNext()) {
                            this.next = this.curIt.next();
                            break;
                        }
                        // Current partition drained: look for another one on the next pass.
                        this.curIt = null;
                    }
                } while (partIt.hasNext());
            }
        };
        return this.iterator((Iterator<GridCacheEntryEx>)it, !keepBinary);
    }

    /**
     * Future that additionally carries the topology version it was created with.
     */
    private static class MultiUpdateFuture
    extends GridFutureAdapter<IgniteUuid> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Topology version supplied at construction. */
        private AffinityTopologyVersion topVer;

        /**
         * @param topVer Topology version to remember.
         */
        private MultiUpdateFuture(@NotNull AffinityTopologyVersion topVer) {
            this.topVer = topVer;
        }

        /**
         * @return Topology version supplied at construction.
         */
        private AffinityTopologyVersion topologyVersion() {
            return topVer;
        }
    }

    /**
     * Iterator over the entries of one partition, exposed as {@link Cache.Entry} instances.
     * The next entry is pre-fetched, so {@link #hasNextX()} is a simple null check.
     */
    private class PartitionEntryIterator
    extends GridIteratorAdapter<Cache.Entry<K, V>> {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Pre-fetched entry returned by the next call to {@link #nextX()}; {@code null} when exhausted. */
        private Cache.Entry<K, V> entry;

        /** Entry most recently returned by {@link #nextX()} and not yet removed. */
        private Cache.Entry<K, V> last;

        /** Underlying partition entry iterator; may be {@code null}, in which case nothing is iterated. */
        private final Iterator<GridCacheMapEntry> partIt;

        /**
         * @param partIt Partition entry iterator, or {@code null} for an empty iteration.
         */
        private PartitionEntryIterator(Iterator<GridCacheMapEntry> partIt) {
            this.partIt = partIt;
            this.advance();
        }

        @Override
        public boolean hasNextX() {
            return this.entry != null;
        }

        @Override
        public Cache.Entry<K, V> nextX() throws IgniteCheckedException {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            this.last = this.entry;
            this.advance();
            return this.last;
        }

        /**
         * Removes the entry last returned by {@link #nextX()} through the public cache API
         * (key/value conditional remove).
         *
         * @throws IllegalStateException If {@code nextX()} has not been called yet, or the last
         *      returned entry has already been removed through this iterator.
         * @throws IgniteCheckedException Declared by the overridden method.
         */
        @Override
        public void removeX() throws IgniteCheckedException {
            if (this.last == null) {
                throw new IllegalStateException();
            }
            GridDhtCacheAdapter.this.ctx.grid().cache(GridDhtCacheAdapter.this.ctx.name()).remove(this.last.getKey(), this.last.getValue());
            // Iterator contract: remove() is allowed once per next(). Reset only after the
            // removal call returned, so a failed attempt can still be retried.
            this.last = null;
        }

        /** Pre-fetches the next visitable entry, or sets {@code entry} to {@code null} when none is left. */
        private void advance() {
            if (this.partIt != null) {
                while (this.partIt.hasNext()) {
                    GridCacheEntryEx next = this.partIt.next();
                    // Skip map entries that report themselves as not visitable.
                    if (next instanceof GridCacheMapEntry && !((GridCacheMapEntry)next).visitable(CU.empty0())) {
                        continue;
                    }
                    this.entry = next.wrapLazyValue(GridDhtCacheAdapter.this.ctx.keepBinary());
                    return;
                }
            }
            this.entry = null;
        }
    }

    /**
     * Set view over the entries of a single partition of the enclosing cache.
     */
    private class PartitionEntrySet
    extends AbstractSet<Cache.Entry<K, V>> {
        /** Partition ID this view is bound to. */
        private final int partId;

        /**
         * @param partId Partition ID.
         */
        private PartitionEntrySet(int partId) {
            this.partId = partId;
        }

        /**
         * @return Iterator over the partition's entries; empty when the partition is not
         *      available locally.
         */
        @Override
        @NotNull
        public Iterator<Cache.Entry<K, V>> iterator() {
            GridDhtLocalPartition part = GridDhtCacheAdapter.this.ctx.topology().localPartition(this.partId, GridDhtCacheAdapter.this.ctx.discovery().topologyVersionEx(), false);
            Iterator<GridCacheMapEntry> partIt = part == null ? null : part.entries(new CacheEntryPredicate[0]).iterator();
            return new PartitionEntryIterator(partIt);
        }

        /**
         * Removes the given entry from the cache using a key/value conditional remove.
         *
         * @param o Object to remove; anything but a {@link Cache.Entry} with a non-null value
         *      yields {@code false}.
         * @return Result of the cache's conditional remove.
         * @throws IgniteException Wrapping any {@link IgniteCheckedException} from the cache.
         */
        @Override
        public boolean remove(Object o) {
            if (!(o instanceof Cache.Entry)) {
                return false;
            }
            // Type arguments cannot be checked at runtime; K and V erase to Object, so the
            // cast itself cannot fail.
            @SuppressWarnings("unchecked")
            Cache.Entry<K, V> entry = (Cache.Entry<K, V>)o;
            K key = entry.getKey();
            V val = entry.getValue();
            if (val == null) {
                return false;
            }
            try {
                return GridDhtCacheAdapter.this.remove(key, val);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        /**
         * Removes every element of the collection via {@link #remove(Object)}.
         *
         * @param c Elements to remove.
         * @return {@code True} if at least one element was removed.
         */
        @Override
        public boolean removeAll(Collection<?> c) {
            boolean rmv = false;
            for (Object o : c) {
                rmv |= this.remove(o);
            }
            return rmv;
        }

        /**
         * @param o Object to look for.
         * @return {@code True} if {@code o} is a {@link Cache.Entry} whose key maps to this
         *      partition and whose value equals the locally peeked on-heap value.
         * @throws IgniteException Wrapping any {@link IgniteCheckedException} from the peek.
         */
        @Override
        public boolean contains(Object o) {
            if (!(o instanceof Cache.Entry)) {
                return false;
            }
            // See remove(Object) for why this unchecked cast is safe.
            @SuppressWarnings("unchecked")
            Cache.Entry<K, V> entry = (Cache.Entry<K, V>)o;
            try {
                return this.partId == GridDhtCacheAdapter.this.ctx.affinity().partition(entry.getKey()) && F.eq(entry.getValue(), GridDhtCacheAdapter.this.localPeek(entry.getKey(), CachePeekModes.ONHEAP_ONLY, null));
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        /**
         * @return Public size of the local partition, or {@code 0} when the partition is not
         *      available locally.
         */
        @Override
        public int size() {
            GridDhtLocalPartition part = GridDhtCacheAdapter.this.ctx.topology().localPartition(this.partId, GridDhtCacheAdapter.this.ctx.discovery().topologyVersionEx(), false);
            return part != null ? part.publicSize() : 0;
        }

        @Override
        public String toString() {
            return S.toString(PartitionEntrySet.class, this, "super", (Object)super.toString(), true);
        }
    }
}

