/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheEvictionRequest;
import org.apache.ignite.internal.processors.cache.GridCacheEvictionResponse;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public class GridCacheIoManager
extends GridCacheSharedManagerAdapter {
    /** Prefix passed to {@code GridTopic.TOPIC_CACHE.topic(...)} when building the ordered topic for query responses. */
    private static final String QUERY_TOPIC_PREFIX = "QUERY";
    /** Generator of message ids; {@code onSend} uses it for messages whose current id is negative. */
    private static final AtomicLong idGen = new AtomicLong();
    /** Delay passed to {@code U.sleep(...)} between send retries; read from the grid configuration in {@code start0()}. */
    private long retryDelay;
    /** Send retry count; read from the grid configuration in {@code start0()}. */
    private int retryCnt;
    /** Handlers looked up by cache id and then indexed by {@code GridCacheMessage.lookupIndex()}. */
    private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<Integer, IgniteBiInClosure[]>();
    /** Handlers looked up by a {@code ListenerKey} built from the cache id and the message class. */
    private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new ConcurrentHashMap8<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>();
    /** Handlers keyed by topic; on kernal stop a message listener is removed for every key of this map. */
    private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = new ConcurrentHashMap8<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>>();
    /** Stopping flag: set under the write lock of {@link #rw} in {@code onKernalStop0}, checked under its read lock in {@code onMessage0}. */
    private boolean stopping;
    /** Lock guarding {@link #stopping}; message processing holds the read lock. */
    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
    /** Whether deployment is enabled; initialised in {@code start0()} from {@code cctx.gridDeploy().enabled()}. */
    private boolean depEnabled;
    /**
     * Listener for unordered cache messages; {@code start0()} registers it on {@code GridTopic.TOPIC_CACHE}.
     * <p>
     * Before handing a message to {@code handleMessage(...)} the listener compares the topology version carried
     * by the message with the local one. When the local node is behind, processing is deferred: a callback is
     * attached to the relevant future and the message is handled later through
     * {@code closure().runLocalSafe(...)}. Otherwise the message is handled immediately on the calling thread.
     */
    private GridMessageListener lsnr = new GridMessageListener(){

        @Override
        public void onMessage(final UUID nodeId, Object msg) {
            if (GridCacheIoManager.this.log.isDebugEnabled()) {
                GridCacheIoManager.this.log.debug("Received unordered cache communication message [nodeId=" + nodeId + ", locId=" + GridCacheIoManager.this.cctx.localNodeId() + ", msg=" + msg + ']');
            }
            // Every message arriving on this topic is treated as a GridCacheMessage.
            final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
            // Future to wait on before processing; stays null when no waiting is required.
            IgniteInternalFuture<Long> fut = null;
            if (cacheMsg.partitionExchangeMessage()) {
                long rmtTopVer;
                long locTopVer;
                if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                    assert (cacheMsg.topologyVersion() != null) : cacheMsg;
                    // Default version to wait for is derived from the local node order; it is replaced by
                    // the cache descriptor's start version (or received-from-start version) when available.
                    AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(GridCacheIoManager.this.cctx.localNode().order());
                    DynamicCacheDescriptor cacheDesc = GridCacheIoManager.this.cctx.cache().cacheDescriptor(cacheMsg.cacheId());
                    if (cacheDesc != null) {
                        if (cacheDesc.startTopologyVersion() != null) {
                            startTopVer = cacheDesc.startTopologyVersion();
                        } else if (cacheDesc.receivedFromStartVersion() != null) {
                            startTopVer = cacheDesc.receivedFromStartVersion();
                        }
                    }
                    // Affinity for the start version is not ready yet: defer handling and return.
                    if ((fut = GridCacheIoManager.this.cctx.exchange().affinityReadyFuture(startTopVer)) != null && !fut.isDone()) {
                        if (GridCacheIoManager.this.log.isDebugEnabled()) {
                            GridCacheIoManager.this.log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + ", waitVer=" + startTopVer + ", cacheDesc=" + cacheDesc + ']');
                        }
                        fut.listen(new CI1<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> fut) {
                                GridCacheIoManager.this.cctx.kernalContext().closure().runLocalSafe(new Runnable(){

                                    @Override
                                    public void run() {
                                        GridCacheIoManager.this.handleMessage(nodeId, cacheMsg);
                                    }
                                });
                            }
                        });
                        return;
                    }
                }
                // Exchange messages are compared against the discovery topology version (plain long).
                // Note: this may overwrite a completed future assigned in the branch above.
                if ((locTopVer = GridCacheIoManager.this.cctx.discovery().topologyVersion()) < (rmtTopVer = cacheMsg.topologyVersion().topologyVersion())) {
                    if (GridCacheIoManager.this.log.isDebugEnabled()) {
                        GridCacheIoManager.this.log.debug("Received message has higher topology version [msg=" + msg + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']');
                    }
                    fut = GridCacheIoManager.this.cctx.discovery().topologyFuture(rmtTopVer);
                }
            } else {
                // All other messages are compared against the ready affinity version.
                AffinityTopologyVersion rmtAffVer;
                AffinityTopologyVersion locAffVer = GridCacheIoManager.this.cctx.exchange().readyAffinityVersion();
                if (locAffVer.compareTo(rmtAffVer = cacheMsg.topologyVersion()) < 0) {
                    // The logger is chosen by the message itself.
                    IgniteLogger log = cacheMsg.messageLogger(GridCacheIoManager.this.cctx);
                    if (log.isDebugEnabled()) {
                        StringBuilder msg0 = new StringBuilder("Received message has higher affinity topology version [");
                        GridCacheIoManager.this.appendMessageInfo(cacheMsg, nodeId, msg0);
                        msg0.append(", locTopVer=").append(locAffVer).append(", rmtTopVer=").append(rmtAffVer).append(']');
                        log.debug(msg0.toString());
                    }
                    fut = GridCacheIoManager.this.cctx.exchange().affinityReadyFuture(rmtAffVer);
                }
            }
            // A pending future means the local node is behind: handle the message once it completes.
            if (fut != null && !fut.isDone()) {
                fut.listen(new CI1<IgniteInternalFuture<?>>(){

                    @Override
                    public void apply(IgniteInternalFuture<?> t) {
                        GridCacheIoManager.this.cctx.kernalContext().closure().runLocalSafe(new Runnable(){

                            @Override
                            public void run() {
                                IgniteLogger log = cacheMsg.messageLogger(GridCacheIoManager.this.cctx);
                                if (log.isDebugEnabled()) {
                                    StringBuilder msg0 = new StringBuilder("Process cache message after wait for affinity topology version [");
                                    GridCacheIoManager.this.appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
                                    log.debug(msg0.toString());
                                }
                                GridCacheIoManager.this.handleMessage(nodeId, cacheMsg);
                            }
                        });
                    }
                });
                return;
            }
            // No waiting required: process on the calling thread.
            GridCacheIoManager.this.handleMessage(nodeId, cacheMsg);
        }
    };

    /**
     * Finds the handler registered for a message and passes the message to {@code onMessage0}.
     * <p>
     * The index-based table is consulted first (only when the message reports a non-negative lookup
     * index), then the class-keyed table. When neither yields a handler the message is dropped and a
     * diagnostic listing the indexed handlers is logged: at debug level while the kernal is stopping,
     * as an error otherwise.
     *
     * @param nodeId Sender node id.
     * @param cacheMsg Received cache message.
     */
    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
        int lookupIdx = cacheMsg.lookupIndex();
        IgniteBiInClosure hnd = null;

        // Fast path: per-cache array indexed by the message lookup index.
        if (lookupIdx >= 0) {
            IgniteBiInClosure[] indexed = this.idxClsHandlers.get(cacheMsg.cacheId());
            if (indexed != null) {
                hnd = indexed[lookupIdx];
            }
        }

        // Fallback: handler keyed by cache id and message class.
        if (hnd == null) {
            hnd = (IgniteBiInClosure)this.clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
        }

        if (hnd != null) {
            this.onMessage0(nodeId, cacheMsg, hnd);
            return;
        }

        // No handler: build the diagnostic message and drop the cache message.
        IgniteLogger msgLog = cacheMsg.messageLogger(this.cctx);
        StringBuilder diag = new StringBuilder("Received message without registered handler (will ignore) [");
        this.appendMessageInfo(cacheMsg, nodeId, diag);
        diag.append(", locTopVer=").append(this.cctx.exchange().readyAffinityVersion());
        diag.append(", msgTopVer=").append(cacheMsg.topologyVersion());
        diag.append(", cacheDesc=").append(this.cctx.cache().cacheDescriptor(cacheMsg.cacheId()));
        diag.append(']');
        diag.append(U.nl()).append("Registered listeners:");
        for (Map.Entry<Integer, IgniteBiInClosure[]> entry : this.idxClsHandlers.entrySet()) {
            diag.append(U.nl()).append(entry.getKey()).append("=").append(Arrays.toString(entry.getValue()));
        }

        if (!this.cctx.kernalContext().isStopping()) {
            U.error(msgLog, diag.toString());
        } else if (msgLog.isDebugEnabled()) {
            msgLog.debug(diag.toString());
        }
    }

    /**
     * Reads the deployment flag and the send-retry settings, then registers the unordered message
     * listener on {@code GridTopic.TOPIC_CACHE}.
     *
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void start0() throws IgniteCheckedException {
        this.depEnabled = this.cctx.gridDeploy().enabled();
        this.retryCnt = this.cctx.gridConfig().getNetworkSendRetryCount();
        this.retryDelay = this.cctx.gridConfig().getNetworkSendRetryDelay();

        // Registered last so that the fields above are set before any message can arrive.
        this.cctx.gridIO().addMessageListener(GridTopic.TOPIC_CACHE, this.lsnr);
    }

    /**
     * Removes the message listeners for {@code GridTopic.TOPIC_CACHE} and for every ordered topic,
     * then sets the {@code stopping} flag while holding the write lock.
     * <p>
     * The write lock is acquired by polling: a timed attempt followed by a sleep, repeated until it
     * succeeds. An interrupt received while polling does not abort the wait; it is remembered and the
     * thread's interrupt status is restored once the lock has been obtained.
     *
     * @param cancel Not used by this implementation.
     */
    @Override
    protected void onKernalStop0(boolean cancel) {
        this.cctx.gridIO().removeMessageListener(GridTopic.TOPIC_CACHE);
        Iterator<Object> topics = this.orderedHandlers.keySet().iterator();
        while (topics.hasNext()) {
            this.cctx.gridIO().removeMessageListener(topics.next());
        }

        boolean wasInterrupted = false;
        for (;;) {
            try {
                if (this.rw.tryWriteLock(200L, TimeUnit.MILLISECONDS)) {
                    break;
                }
                Thread.sleep(200L);
            }
            catch (InterruptedException ignored) {
                // Keep waiting for the lock; the interrupt status is restored below.
                wasInterrupted = true;
            }
        }

        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }

        try {
            this.stopping = true;
        }
        finally {
            this.rw.writeUnlock();
        }
    }

    /**
     * Processes a message under the read lock unless the manager is stopping.
     * <p>
     * The message is unmarshalled first; if it then reports a class error it goes to
     * {@code processFailedMessage}, otherwise to {@code processMessage}. Any {@link Throwable} is
     * logged; only {@link Error}s are rethrown. When deployment is enabled, ownership checks are
     * switched off for the duration of processing and switched back on in the {@code finally} block.
     *
     * @param nodeId Sender node id.
     * @param cacheMsg Received cache message.
     * @param c Handler resolved for the message.
     */
    private void onMessage0(UUID nodeId, GridCacheMessage cacheMsg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
        this.rw.readLock();
        try {
            if (!this.stopping) {
                if (this.depEnabled) {
                    this.cctx.deploy().ignoreOwnership(true);
                }
                this.unmarshall(nodeId, cacheMsg);

                boolean clsErr = cacheMsg.classError() != null;
                if (clsErr) {
                    this.processFailedMessage(nodeId, cacheMsg, c);
                } else {
                    this.processMessage(nodeId, cacheMsg, c);
                }
            } else if (this.log.isDebugEnabled()) {
                this.log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
            }
        }
        catch (Throwable err) {
            String errMsg = "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']';
            U.error(this.log, errMsg, err);
            if (err instanceof Error) {
                throw (Error)err;
            }
        }
        finally {
            if (this.depEnabled) {
                this.cctx.deploy().ignoreOwnership(false);
            }
            this.rw.readUnlock();
        }
    }

    /**
     * Sends a response for a message that could not be processed; a send failure is logged and
     * not propagated.
     *
     * @param nodeId Destination node id.
     * @param res Response to send.
     * @param cctx Shared context whose IO manager performs the send (shadows the field of the same name).
     * @param plc IO policy.
     */
    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx, byte plc) {
        try {
            cctx.io().send(nodeId, res, plc);
        }
        catch (IgniteCheckedException sendErr) {
            String errMsg = "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + ",res=" + res + ']';
            U.error(this.log, errMsg, sendErr);
        }
    }

    /**
     * Appends a description of the message and its sender to the given builder.
     * <p>
     * Transactional messages are prefixed with their tx ids, atomic-update messages with their
     * future id and write version; everything else is described by the message alone.
     *
     * @param cacheMsg Message to describe.
     * @param nodeId Sender node id.
     * @param builder Builder to append to.
     * @return The same builder.
     */
    private StringBuilder appendMessageInfo(GridCacheMessage cacheMsg, UUID nodeId, StringBuilder builder) {
        GridCacheVersion txVer = this.txId(cacheMsg);
        if (txVer != null) {
            builder.append("txId=").append(txVer);
            builder.append(", dhtTxId=").append(this.dhtTxId(cacheMsg));
            builder.append(", msg=").append(cacheMsg);
        } else {
            GridCacheVersion futVer = this.atomicFututeId(cacheMsg);
            if (futVer != null) {
                builder.append("futId=").append(futVer);
                builder.append(", writeVer=").append(this.atomicWriteVersion(cacheMsg));
                builder.append(", msg=").append(cacheMsg);
            } else {
                builder.append("msg=").append(cacheMsg);
            }
        }
        return builder.append(", node=").append(nodeId);
    }

    /**
     * Extracts the transaction id from the transactional message types recognised here.
     *
     * @param cacheMsg Message to inspect.
     * @return Version taken from the message, or {@code null} for any other message type.
     */
    @Nullable
    private GridCacheVersion txId(GridCacheMessage cacheMsg) {
        GridCacheVersion ver = null;
        if (cacheMsg instanceof GridDhtTxPrepareRequest) {
            ver = ((GridDhtTxPrepareRequest)cacheMsg).nearXidVersion();
        } else if (cacheMsg instanceof GridNearTxPrepareRequest) {
            ver = ((GridNearTxPrepareRequest)cacheMsg).version();
        } else if (cacheMsg instanceof GridNearTxPrepareResponse) {
            ver = ((GridNearTxPrepareResponse)cacheMsg).version();
        } else if (cacheMsg instanceof GridNearTxFinishRequest) {
            ver = ((GridNearTxFinishRequest)cacheMsg).version();
        } else if (cacheMsg instanceof GridNearTxFinishResponse) {
            ver = ((GridNearTxFinishResponse)cacheMsg).xid();
        }
        return ver;
    }

    /**
     * Extracts the DHT transaction id from the DHT transactional message types recognised here.
     *
     * @param cacheMsg Message to inspect.
     * @return Version taken from the message, or {@code null} for any other message type.
     */
    @Nullable
    private GridCacheVersion dhtTxId(GridCacheMessage cacheMsg) {
        GridCacheVersion ver = null;
        if (cacheMsg instanceof GridDhtTxPrepareRequest) {
            ver = ((GridDhtTxPrepareRequest)cacheMsg).version();
        } else if (cacheMsg instanceof GridDhtTxPrepareResponse) {
            ver = ((GridDhtTxPrepareResponse)cacheMsg).version();
        } else if (cacheMsg instanceof GridDhtTxFinishRequest) {
            ver = ((GridDhtTxFinishRequest)cacheMsg).version();
        } else if (cacheMsg instanceof GridDhtTxFinishResponse) {
            ver = ((GridDhtTxFinishResponse)cacheMsg).xid();
        }
        return ver;
    }

    /**
     * Extracts the future version from the atomic-update message types recognised here.
     *
     * @param cacheMsg Message to inspect.
     * @return Future version taken from the message, or {@code null} for any other message type.
     */
    @Nullable
    private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
        GridCacheVersion futVer = null;
        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) {
            futVer = ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
        } else if (cacheMsg instanceof GridNearAtomicUpdateResponse) {
            futVer = ((GridNearAtomicUpdateResponse)cacheMsg).futureVersion();
        } else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
            futVer = ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
        } else if (cacheMsg instanceof GridDhtAtomicUpdateResponse) {
            futVer = ((GridDhtAtomicUpdateResponse)cacheMsg).futureVersion();
        }
        return futVer;
    }

    /**
     * Extracts the write version from an atomic update request.
     *
     * @param cacheMsg Message to inspect.
     * @return {@code updateVersion()} of a near request, {@code writeVersion()} of a DHT request,
     *      or {@code null} for any other message type.
     */
    @Nullable
    private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
        GridCacheVersion writeVer = null;
        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) {
            writeVer = ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
        } else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
            writeVer = ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
        }
        return writeVer;
    }

    /**
     * Handles a message whose {@code classError()} is non-null ({@code onMessage0} routes such messages here).
     * <p>
     * The action is selected by {@code msg.directType()}:
     * <ul>
     * <li>for request types, a matching response is built and sent back to the sender;</li>
     * <li>for the two get-response types (50 and 117), the local future is looked up and completed with
     * the response after the class error has been set on it;</li>
     * <li>types 45 and 114 are passed to the regular handler via {@code processMessage};</li>
     * <li>any other type results in an {@link IgniteCheckedException}.</li>
     * </ul>
     * The numeric case labels are direct-type codes; the message class each one stands for is shown by
     * the cast at the top of the corresponding branch.
     * <p>
     * NOTE(review): {@code ctx} is dereferenced without a null check in most branches, while
     * {@code processMessage} does null-check the result of the same {@code cacheContext(...)} call —
     * confirm it cannot be {@code null} here.
     *
     * @param nodeId Sender node id.
     * @param msg Message that failed with a class error.
     * @param c Handler resolved for the message (used only by the pass-through branches).
     * @throws IgniteCheckedException If the direct type is not supported, or if sending the ordered
     *      query response fails.
     */
    private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) throws IgniteCheckedException {
        GridCacheContext ctx = this.cctx.cacheContext(msg.cacheId());
        switch (msg.directType()) {
            // Eviction request: the response's last argument flags whether a class error occurred.
            case 14: {
                GridCacheEvictionRequest req = (GridCacheEvictionRequest)msg;
                GridCacheEvictionResponse res = new GridCacheEvictionResponse(ctx.cacheId(), req.futureId(), req.classError() != null);
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // DHT lock request.
            // NOTE(review): the class error is not attached to this response, unlike most other
            // request branches — confirm this is intentional.
            case 30: {
                GridDhtLockRequest req = (GridDhtLockRequest)msg;
                GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), 0, ctx.deploymentEnabled());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // DHT tx prepare request: the IO policy comes from the request, ctx is not used.
            case 34: {
                GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
                GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, req.policy());
                break;
            }
            // DHT atomic update request.
            case 38: {
                GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(), ctx.deploymentEnabled());
                res.onError(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near atomic full update request.
            case 40: {
                GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled());
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Force keys request.
            case 42: {
                GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
                GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(ctx.cacheId(), req.futureId(), req.miniId(), ctx.deploymentEnabled());
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Passed to the regular handler despite the class error; presumably the handler deals
            // with the error itself — the message type is not visible here (TODO confirm).
            case 45: {
                this.processMessage(nodeId, msg, c);
                break;
            }
            // Near get request.
            case 49: {
                GridNearGetRequest req = (GridNearGetRequest)msg;
                GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), req.futureId(), req.miniId(), req.version(), req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near get response: complete the waiting local future with the error, if it still exists.
            case 50: {
                GridNearGetResponse res = (GridNearGetResponse)msg;
                CacheGetFuture fut = (CacheGetFuture)((Object)ctx.mvcc().future(res.futureId()));
                if (fut == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
                    }
                    return;
                }
                res.error(res.classError());
                fut.onResult(nodeId, res);
                break;
            }
            // Near lock request: the class error is passed through the response constructor.
            case 51: {
                GridNearLockRequest req = (GridNearLockRequest)msg;
                GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, 0, req.classError(), null, ctx.deploymentEnabled());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near tx prepare request: the IO policy comes from the request, ctx is not used.
            case 55: {
                GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
                GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.version(), req.version(), null, null, null, req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, req.policy());
                break;
            }
            // Query request: the response goes out as an ordered message on a per-query topic,
            // so a send failure propagates to the caller instead of being logged.
            case 58: {
                GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
                GridCacheQueryResponse res = new GridCacheQueryResponse(req.cacheId(), req.id(), req.classError(), this.cctx.deploymentEnabled());
                this.cctx.io().sendOrderedMessage(ctx.node(nodeId), GridTopic.TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), res, ctx.ioPolicy(), Long.MAX_VALUE);
                break;
            }
            // Passed to the regular handler, same as type 45 (message type not visible here).
            case 114: {
                this.processMessage(nodeId, msg, c);
                break;
            }
            // Near single get request.
            case 116: {
                GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
                GridNearSingleGetResponse res = new GridNearSingleGetResponse(ctx.cacheId(), req.futureId(), req.topologyVersion(), null, false, req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near single get response: the future is looked up by an IgniteUuid built from
            // IgniteUuid.VM_ID and the response's future id.
            case 117: {
                GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
                GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
                if (fut == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
                    }
                    return;
                }
                res.error(res.classError());
                fut.onResult(nodeId, res);
                break;
            }
            // Near atomic single update request.
            case 125: {
                GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled());
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near atomic single update invoke request.
            case 126: {
                GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled());
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // Near atomic single update filter request.
            case 127: {
                GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled());
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // DHT atomic single update request.
            case -36: {
                GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(), ctx.deploymentEnabled());
                res.onError(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, ctx.ioPolicy());
                break;
            }
            // No failure response is defined for this type: report it with the class error as the cause.
            default: {
                throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]", msg.classError());
            }
        }
    }

    /**
     * Applies the handler to the message and then resets per-thread transaction and MVCC context.
     * <p>
     * Any {@link Throwable} thrown by the handler is logged; only {@link Error}s are rethrown.
     * In all cases the {@code finally} block resets the contexts and unwinds pending evictions:
     * through the message's tx state when the message is {@link IgniteTxStateAware}, otherwise
     * through the cache context resolved from the message's cache id (if one exists).
     *
     * @param nodeId Sender node id.
     * @param msg Message to process.
     * @param c Handler to apply.
     */
    private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
        try {
            c.apply(nodeId, msg);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Finished processing cache communication message [nodeId=" + nodeId + ", msg=" + msg + ']');
            }
        }
        catch (Throwable err) {
            String errMsg = "Failed processing message [senderId=" + nodeId + ", msg=" + msg + ']';
            U.error(this.log, errMsg, err);
            if (err instanceof Error) {
                throw (Error)err;
            }
        }
        finally {
            this.cctx.tm().resetContext();
            this.cctx.mvcc().contextReset();

            if (!(msg instanceof IgniteTxStateAware)) {
                GridCacheContext cacheCtx = this.cctx.cacheContext(msg.cacheId());
                if (cacheCtx != null) {
                    CU.unwindEvicts(cacheCtx);
                }
            } else {
                IgniteTxState state = ((IgniteTxStateAware)msg).txState();
                if (state != null) {
                    state.unwindEvicts(this.cctx);
                }
            }
        }
    }

    /**
     * Prepares a message for sending.
     * <p>
     * A message carrying an error is not sent while the kernal is stopping. Otherwise a message id is
     * assigned if the current one is negative, and — unless the destination is the local node — the
     * message is prepared for marshalling and, when applicable, its deployment info is prepared.
     *
     * @param msg Message to prepare.
     * @param destNodeId Destination node id, or {@code null}; {@code null} is treated as a remote destination.
     * @return {@code false} if the message must not be sent, {@code true} otherwise.
     * @throws IgniteCheckedException If preparation fails.
     */
    private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
        boolean errorWhileStopping = msg.error() != null && this.cctx.kernalContext().isStopping();
        if (errorWhileStopping) {
            return false;
        }

        if (msg.messageId() < 0L) {
            msg.messageId(idGen.incrementAndGet());
        }

        boolean locDest = destNodeId != null && this.cctx.localNodeId().equals(destNodeId);
        if (locDest) {
            return true;
        }

        msg.prepareMarshal(this.cctx);
        if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo()) {
            this.cctx.deploy().prepare((GridCacheDeployable)((Object)msg));
        }
        return true;
    }

    /**
     * Sends a cache message to a remote node, retrying on failure.
     * <p>
     * After a failed attempt the destination is checked: if it is no longer alive or does not answer a
     * ping, a {@link ClusterTopologyCheckedException} is thrown. Otherwise the send is retried after
     * {@code retryDelay}, until {@code retryCnt} attempts have been made or the kernal starts stopping,
     * at which point the last failure is rethrown.
     * <p>
     * The send is always attempted at least once and a failure is never silently dropped: the retry
     * limit is checked with {@code >=}, so a configured retry count of zero (or less) means
     * "one attempt, no retries" rather than "swallow the error and report success".
     *
     * @param node Destination node; must not be the local node.
     * @param msg Message to send.
     * @param plc IO policy.
     * @throws ClusterTopologyCheckedException If the destination left the grid (thrown as its
     *      {@link IgniteCheckedException} supertype).
     * @throws IgniteCheckedException If the send failed and no retries remain, the kernal is stopping,
     *      or the wait between retries failed.
     */
    public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        assert (!node.isLocal());
        if (!this.onSend(msg, node.id())) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
        }
        int attempts = 0;
        while (true) {
            try {
                ++attempts;
                this.cctx.gridIO().send(node, GridTopic.TOPIC_CACHE, (Message)msg, plc);
                break;
            }
            catch (IgniteCheckedException e) {
                if (!this.cctx.discovery().alive(node.id()) || !this.cctx.discovery().pingNode(node.id())) {
                    throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
                }
                // '>=' rather than '==': with retryCnt == 0 the counter is already past the limit
                // after the first attempt, and an equality check would let the failure slip through.
                if (attempts >= this.retryCnt || this.cctx.kernalContext().isStopping()) {
                    throw e;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send message to node (will retry): " + node.id());
                }
                U.sleep(this.retryDelay);
            }
        }
        // Reached only after a successful send.
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
        }
    }

    /**
     * Sends a cache message to a collection of nodes, tolerating nodes that leave while the send is in progress.
     * <p>
     * Nodes found to have left (not alive, or - after a failed send - not answering a ping) are remembered and
     * excluded from subsequent attempts. For every such node the optional {@code fallback} predicate is invoked;
     * if it returns {@code false} the method returns immediately. A failed attempt that did not reveal any newly
     * left node counts towards {@code retryCnt}; when the counter reaches {@code retryCnt} the failure is rethrown.
     *
     * @param nodes Destination nodes; must not be {@code null}. An empty collection results in no send.
     * @param msg Message to send; must not be {@code null}.
     * @param plc Policy byte handed unchanged to {@code gridIO().send(...)}.
     * @param fallback Optional predicate applied to each node detected as left; {@code false} stops the send.
     * @throws IgniteCheckedException If sending kept failing without any node leaving until retries ran out.
     */
    public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc, @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
        assert (nodes != null);
        assert (msg != null);
        if (nodes.isEmpty()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Message will not be sent as collection of nodes is empty: " + msg);
            }
            return;
        }
        if (!this.onSend(msg, null)) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
        }
        // IDs of nodes already known to be gone; they are filtered out of every following attempt.
        final GridLeanSet<UUID> goneIds = new GridLeanSet<UUID>();
        int failures = 0;
        while (failures < this.retryCnt) {
            try {
                Collection<? extends ClusterNode> remaining = F.view(nodes, new P1<ClusterNode>(){

                    @Override
                    public boolean apply(ClusterNode candidate) {
                        return !goneIds.contains(candidate.id());
                    }
                });
                this.cctx.gridIO().send(remaining, GridTopic.TOPIC_CACHE, (Message)msg, plc);
                // The send did not throw, but destinations are still re-checked for liveness afterwards.
                boolean newlyGone = false;
                for (ClusterNode n : nodes) {
                    UUID id = n.id();
                    if (!goneIds.contains(id) && !this.cctx.discovery().alive(id)) {
                        goneIds.add(id);
                        if (fallback != null && !fallback.apply(n)) {
                            return;
                        }
                        newlyGone = true;
                    }
                }
                if (newlyGone) {
                    boolean anyoneLeft = F.exist(F.nodeIds(nodes), F0.not(F.contains(goneIds)));
                    if (!anyoneLeft) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
                        }
                        return;
                    }
                }
                break;
            }
            catch (IgniteCheckedException e) {
                boolean newlyGone = false;
                for (ClusterNode n : nodes) {
                    UUID id = n.id();
                    if (!goneIds.contains(id) && (!this.cctx.discovery().alive(id) || !this.cctx.discovery().pingNode(id))) {
                        goneIds.add(id);
                        if (fallback != null && !fallback.apply(n)) {
                            return;
                        }
                        newlyGone = true;
                    }
                }
                // Only a failure that cannot be explained by a departed node consumes a retry.
                if (!newlyGone) {
                    failures++;
                    if (failures == this.retryCnt) {
                        throw e;
                    }
                    U.sleep(this.retryDelay);
                }
                boolean anyoneLeft = F.exist(F.nodeIds(nodes), F0.not(F.contains(goneIds)));
                if (!anyoneLeft) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
                    }
                    return;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Message send will be retried [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ", leftIds=" + goneIds + ']');
                }
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
        }
    }

    /**
     * Resolves a node by ID through discovery and sends the message to it via
     * {@link #send(ClusterNode, GridCacheMessage, byte)}.
     *
     * @param nodeId ID of the destination node.
     * @param msg Message to send.
     * @param plc Policy byte passed on to the node-based {@code send}.
     * @throws IgniteCheckedException If discovery does not know the node, or if the send itself fails.
     */
    public void send(UUID nodeId, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        ClusterNode dest = this.cctx.discovery().node(nodeId);
        if (dest != null) {
            this.send(dest, msg, plc);
            return;
        }
        throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId + ", msg=" + msg + ']');
    }

    /**
     * Sends an ordered cache message to a node on the given topic, retrying failed attempts.
     * <p>
     * Nothing is sent if {@code onSend(msg, node.id())} returns {@code false}. After a failure, a node that is no
     * longer known to discovery yields a {@link ClusterTopologyCheckedException}; otherwise the failure is rethrown
     * when the attempt counter equals {@code retryCnt}, or the method sleeps for {@code retryDelay} and retries.
     *
     * @param node Destination node.
     * @param topic Topic to send the ordered message on.
     * @param msg Message to send.
     * @param plc Policy byte handed unchanged to {@code gridIO().sendOrderedMessage(...)}.
     * @param timeout Timeout value handed unchanged to {@code gridIO().sendOrderedMessage(...)}.
     * @throws IgniteCheckedException If the send failed and is not retried, or the node left the grid.
     */
    public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc, long timeout) throws IgniteCheckedException {
        if (!this.onSend(msg, node.id())) {
            return;
        }
        int attempts = 0;
        while (attempts <= this.retryCnt) {
            attempts++;
            try {
                this.cctx.gridIO().sendOrderedMessage(node, topic, (Message)msg, plc, timeout, false);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg + ", nodeId=" + node.id() + ']');
                }
                return;
            }
            catch (IgniteCheckedException e) {
                boolean nodeGone = this.cctx.discovery().node(node.id()) == null;
                if (nodeGone) {
                    throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
                }
                if (attempts == this.retryCnt) {
                    throw e;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send message to node (will retry): " + node.id());
                }
            }
            // Reached only after a failed attempt that will be retried.
            U.sleep(this.retryDelay);
        }
    }

    /**
     * Produces the next value of the shared {@code idGen} counter.
     *
     * @return Result of incrementing {@code idGen}.
     */
    public long nextIoId() {
        final long next = idGen.incrementAndGet();
        return next;
    }

    /**
     * Sends a cache message to a node on {@code GridTopic.TOPIC_CACHE} with a single attempt.
     * <p>
     * Nothing is sent if {@code onSend(msg, null)} returns {@code false}. If the send fails and discovery
     * reports the node as not alive, the failure is wrapped into a {@link ClusterTopologyCheckedException};
     * otherwise the original exception is rethrown.
     *
     * @param node Destination node; must not be {@code null}.
     * @param msg Message to send; must not be {@code null}.
     * @param plc Policy byte handed unchanged to {@code gridIO().send(...)}.
     * @throws IgniteCheckedException If the send failed.
     */
    public void sendNoRetry(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        assert (node != null);
        assert (msg != null);
        if (!this.onSend(msg, null)) {
            return;
        }
        try {
            this.cctx.gridIO().send(node, GridTopic.TOPIC_CACHE, (Message)msg, plc);
        }
        catch (IgniteCheckedException e) {
            if (this.cctx.discovery().alive(node.id())) {
                throw e;
            }
            throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
        }
    }

    /**
     * Registers a handler for a message type of a particular cache.
     * <p>
     * If {@link #messageIndex(Class)} yields an index for the type, the handler is stored in the per-cache
     * handler array at that index; a slot that is already taken results in an {@link IgniteException}.
     * Otherwise the handler is stored in {@code clsHandlers} under a {@code (cacheId, type)} key, and a
     * pre-existing mapping trips an assertion.
     *
     * @param cacheId Cache ID the handler belongs to.
     * @param type Message class the handler is registered for.
     * @param c Handler closure.
     */
    public void addHandler(int cacheId, Class<? extends GridCacheMessage> type, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
        int msgIdx = this.messageIndex(type);
        if (msgIdx == -1) {
            // No lookup index for this type: register by (cacheId, class) key.
            ListenerKey key = new ListenerKey(cacheId, type);
            if (this.clsHandlers.putIfAbsent(key, c) != null) {
                assert (false) : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type + ", old=" + this.clsHandlers.get(key) + ", new=" + c + ']';
            }
            IgniteLogger log0 = this.log;
            if (log0 != null && log0.isTraceEnabled()) {
                log0.trace("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + ", msgIdx=" + msgIdx + ", handler=" + c + ']');
            }
            return;
        }
        Map<Integer, IgniteBiInClosure[]> byCache = this.idxClsHandlers;
        IgniteBiInClosure[] slots = byCache.get(cacheId);
        if (slots == null) {
            // NOTE(review): length 5 looks like an inlined constant bounding the lookup index - confirm.
            slots = new IgniteBiInClosure[5];
            byCache.put(cacheId, slots);
        }
        if (slots[msgIdx] != null) {
            throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId + ", type=" + type + ']');
        }
        slots[msgIdx] = c;
        // Re-assign the field after the update, exactly as the map was read from it above.
        this.idxClsHandlers = byCache;
    }

    /**
     * Removes all handlers registered for the given cache: the per-cache entry of {@code idxClsHandlers}
     * and every {@code clsHandlers} entry whose key carries this cache ID.
     *
     * @param cacheId Cache ID; must not be {@code 0} (checked by assertion).
     */
    public void removeHandlers(int cacheId) {
        assert (cacheId != 0);
        this.idxClsHandlers.remove(cacheId);
        for (Iterator<?> keys = this.clsHandlers.keySet().iterator(); keys.hasNext(); ) {
            ListenerKey key = (ListenerKey)keys.next();
            if (key.cacheId == cacheId) {
                keys.remove();
            }
        }
    }

    /**
     * Removes the {@code clsHandlers} entry registered for the given cache and message class, if any.
     *
     * @param cacheId Cache ID.
     * @param type Message class.
     */
    public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
        ListenerKey key = new ListenerKey(cacheId, type);
        this.clsHandlers.remove(key);
    }

    /**
     * Reads the {@code CACHE_MSG_IDX} field of a message class via {@code U.field(...)}.
     *
     * @param msgCls Message class to inspect.
     * @return The field value, or {@code -1} if it is {@code null}, negative, or reading it failed with an
     *      {@link IgniteCheckedException}.
     */
    private int messageIndex(Class<?> msgCls) {
        Integer idx;
        try {
            idx = (Integer)U.field(msgCls, "CACHE_MSG_IDX");
        }
        catch (IgniteCheckedException ignored) {
            // A failed field read is treated the same as "no index".
            return -1;
        }
        if (idx != null && idx >= 0) {
            return idx;
        }
        return -1;
    }

    /**
     * Registers a handler for ordered cache messages on the given topic.
     * <p>
     * If no handler was registered for the topic yet, the closure is recorded in {@code orderedHandlers} and an
     * {@code OrderedMessageListener} wrapping it is added as the message listener for the topic. If a handler is
     * already registered, nothing changes and a warning is logged (when a logger is available).
     *
     * @param topic Topic to listen on.
     * @param c Handler closure.
     */
    public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
        IgniteLogger log0 = this.log;
        if (this.orderedHandlers.putIfAbsent(topic, c) == null) {
            // The listener's constructor is declared with IgniteBiInClosure<UUID, GridCacheMessage>, so the
            // wildcard-typed closure needs an explicit unchecked cast to be accepted.
            @SuppressWarnings("unchecked")
            IgniteBiInClosure<UUID, GridCacheMessage> handler = (IgniteBiInClosure<UUID, GridCacheMessage>)c;
            this.cctx.gridIO().addMessageListener(topic, (GridMessageListener)new OrderedMessageListener(handler));
            if (log0 != null && log0.isTraceEnabled()) {
                log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
            }
        } else if (log0 != null) {
            U.warn(log0, "Failed to register ordered cache communication handler because it is already registered for this topic [topic=" + topic + ", handler=" + c + ']');
        }
    }

    /**
     * Unregisters the ordered-message handler of a topic.
     * <p>
     * If a handler was recorded for the topic, the topic's message listener is removed as well; otherwise a
     * warning is logged (when a logger is available).
     *
     * @param topic Topic whose handler should be removed.
     */
    public void removeOrderedHandler(Object topic) {
        boolean wasRegistered = this.orderedHandlers.remove(topic) != null;
        if (!wasRegistered) {
            if (this.log != null) {
                U.warn(this.log, "Failed to unregister ordered cache communication handler because it was not found for topic: " + topic);
            }
            return;
        }
        this.cctx.gridIO().removeMessageListener(topic);
        if (this.log != null && this.log.isDebugEnabled()) {
            this.log.debug("Unregistered ordered cache communication handler for topic:" + topic);
        }
    }

    /**
     * Finishes unmarshalling of a cache message received from another node.
     * <p>
     * Messages whose sender is the local node are left untouched. If the message carries deployment info, the
     * P2P context is set from it before unmarshalling. Failures raised by {@code finishUnmarshal} are recorded
     * on the message through {@code onClassError} instead of being propagated; an {@link Error} is recorded
     * only when the message ignores class errors and the error is caused by {@link NoClassDefFoundError} or
     * {@link UnsupportedClassVersionError}, and is rethrown in every other case.
     *
     * @param nodeId ID of the sender node.
     * @param cacheMsg Message to unmarshal.
     */
    private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
        if (this.cctx.localNodeId().equals(nodeId)) {
            return;
        }
        GridDeploymentInfo bean = cacheMsg.deployInfo();
        if (bean != null) {
            assert (this.depEnabled) : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId + ", msg=" + cacheMsg + ']';
            this.cctx.deploy().p2pContext(nodeId, bean.classLoaderId(), bean.userVersion(), bean.deployMode(), bean.participants(), bean.localDeploymentOwner());
            if (this.log.isDebugEnabled()) {
                this.log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
            }
        }
        try {
            cacheMsg.finishUnmarshal(this.cctx, this.cctx.deploy().globalLoader());
        }
        catch (IgniteCheckedException e) {
            cacheMsg.onClassError(e);
        }
        catch (BinaryObjectException e) {
            cacheMsg.onClassError(new IgniteCheckedException(e));
        }
        catch (Error e) {
            if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) {
                cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
            }
            else {
                // Only errors that were not recorded on the message are propagated; rethrowing a recorded
                // error would defeat ignoreClassErrors().
                throw e;
            }
        }
    }

    /**
     * Prints the sizes of the {@code clsHandlers} and {@code orderedHandlers} maps, preceded by a header line
     * containing the grid name, via {@code X.println(...)}.
     */
    @Override
    public void printMemoryStats() {
        X.println(">>> ", new Object[0]);
        String header = ">>> Cache IO manager memory stats [grid=" + this.cctx.gridName() + ']';
        X.println(header, new Object[0]);
        String clsLine = ">>>   clsHandlersSize: " + this.clsHandlers.size();
        X.println(clsLine, new Object[0]);
        String orderedLine = ">>>   orderedHandlersSize: " + this.orderedHandlers.size();
        X.println(orderedLine, new Object[0]);
    }

    /**
     * Immutable map key identifying a handler by cache ID and message class.
     * Two keys are equal when both the cache ID and the message class are equal.
     */
    private static class ListenerKey {
        /** Cache ID. Final because the key is used in hash-based maps and must not change after creation. */
        private final int cacheId;
        /** Message class. */
        private final Class<? extends GridCacheMessage> msgCls;

        /**
         * @param cacheId Cache ID.
         * @param msgCls Message class.
         */
        private ListenerKey(int cacheId, Class<? extends GridCacheMessage> msgCls) {
            this.cacheId = cacheId;
            this.msgCls = msgCls;
        }

        /** {@inheritDoc} */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ListenerKey)) {
                return false;
            }
            ListenerKey that = (ListenerKey)o;
            return this.cacheId == that.cacheId && this.msgCls.equals(that.msgCls);
        }

        /** {@inheritDoc} */
        @Override
        public int hashCode() {
            int res = this.cacheId;
            res = 31 * res + this.msgCls.hashCode();
            return res;
        }
    }

    /**
     * Message listener that forwards received ordered messages, together with the wrapped closure, to
     * {@code onMessage0(...)} of the enclosing manager.
     */
    private class OrderedMessageListener
    implements GridMessageListener {
        /** Closure handed to {@code onMessage0(...)} for every received message. */
        private final IgniteBiInClosure<UUID, GridCacheMessage> handler;

        /**
         * @param handler Closure to associate with this listener.
         */
        OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> handler) {
            this.handler = handler;
        }

        /**
         * Logs the message at debug level (if enabled) and dispatches it as a {@code GridCacheMessage}.
         *
         * @param nodeId ID of the sender node.
         * @param msg Received message; cast to {@code GridCacheMessage}.
         */
        @Override
        public void onMessage(UUID nodeId, Object msg) {
            IgniteLogger logger = GridCacheIoManager.this.log;
            if (logger.isDebugEnabled()) {
                logger.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
            }
            GridCacheIoManager.this.onMessage0(nodeId, (GridCacheMessage)msg, this.handler);
        }
    }
}

