/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.managers.communication;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridDisconnectListener;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.nio.GridNioBackPressureControl;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap;

public class GridIoManager
extends GridManagerAdapter<CommunicationSpi<Serializable>> {
    public static final MessageFactory[] EMPTY = new MessageFactory[0];
    public static final int MAX_CLOSED_TOPICS = 10240;
    public static final String DIRECT_PROTO_VER_ATTR = "comm.direct.proto.ver";
    public static final byte DIRECT_PROTO_VER = 2;
    private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal();
    private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<Object, GridMessageListener>();
    private volatile GridMessageListener[] sysLsnrs;
    private final Object sysLsnrsMux = new Object();
    private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<GridDisconnectListener>();
    private PoolProcessor pools;
    private GridLocalEventListener discoLsnr;
    private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap = new ConcurrentHashMap8<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>>();
    private final UUID locNodeId;
    private final long discoDelay;
    private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap = new ConcurrentHashMap8<UUID, ConcurrentLinkedDeque8<DelayedMessage>>();
    private CommunicationListener<Serializable> commLsnr;
    private final Marshaller marsh;
    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean started;
    private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics = new GridBoundedConcurrentLinkedHashSet(10240, 10240, 0.75f, 256, ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV);
    private MessageFactory msgFactory;
    private MessageFormatter formatter;
    private boolean stopping;
    private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference();
    private final AtomicLong ioTestId = new AtomicLong();
    private static final IgniteRunnable NOOP = new IgniteRunnable(){

        @Override
        public void run() {
        }
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public GridIoManager(GridKernalContext ctx) {
        super(ctx, (IgniteSpi[])new CommunicationSpi[]{ctx.config().getCommunicationSpi()});
        this.pools = ctx.pools();
        assert (this.pools != null);
        this.locNodeId = ctx.localNodeId();
        this.discoDelay = ctx.config().getDiscoveryStartupDelay();
        this.marsh = ctx.config().getMarshaller();
        Object object = this.sysLsnrsMux;
        synchronized (object) {
            this.sysLsnrs = new GridMessageListener[GridTopic.values().length];
        }
    }

    public MessageFactory messageFactory() {
        assert (this.msgFactory != null);
        return this.msgFactory;
    }

    public MessageFormatter formatter() {
        assert (this.formatter != null);
        return this.formatter;
    }

    public void resetMetrics() {
        ((CommunicationSpi)this.getSpi()).resetMetrics();
    }

    @Override
    public void start() throws IgniteCheckedException {
        this.assertParameter(this.discoDelay > 0L, "discoveryStartupDelay > 0");
        this.startSpi();
        this.commLsnr = new CommunicationListener<Serializable>(){

            @Override
            public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
                try {
                    GridIoManager.this.onMessage0(nodeId, (GridIoMessage)msg, msgC);
                }
                catch (ClassCastException ignored) {
                    U.error(GridIoManager.this.log, "Communication manager received message of unknown type (will ignore): " + msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " + "which is illegal - make sure to send messages only via GridProjection API.");
                }
            }

            @Override
            public void onDisconnected(UUID nodeId) {
                for (GridDisconnectListener lsnr : GridIoManager.this.disconnectLsnrs) {
                    lsnr.onNodeDisconnected(nodeId);
                }
            }
        };
        ((CommunicationSpi)this.getSpi()).setListener(this.commLsnr);
        this.ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, (byte)2);
        MessageFormatter[] formatterExt = (MessageFormatter[])this.ctx.plugins().extensions(MessageFormatter.class);
        if (formatterExt != null && formatterExt.length > 0) {
            if (formatterExt.length > 1) {
                throw new IgniteCheckedException("More than one MessageFormatter extension is defined. Check your plugins configuration and make sure that only one of them provides custom message format.");
            }
            this.formatter = formatterExt[0];
        } else {
            this.formatter = new MessageFormatter(){

                @Override
                public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException {
                    assert (rmtNodeId != null);
                    return new DirectMessageWriter(U.directProtocolVersion(GridIoManager.this.ctx, rmtNodeId));
                }

                @Override
                public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException {
                    assert (rmtNodeId != null);
                    return new DirectMessageReader(msgFactory, U.directProtocolVersion(GridIoManager.this.ctx, rmtNodeId));
                }
            };
        }
        MessageFactory[] msgs = (MessageFactory[])this.ctx.plugins().extensions(MessageFactory.class);
        if (msgs == null) {
            msgs = EMPTY;
        }
        ArrayList<MessageFactory> compMsgs = new ArrayList<MessageFactory>();
        for (IgniteComponentType compType : IgniteComponentType.values()) {
            MessageFactory f = compType.messageFactory();
            if (f == null) continue;
            compMsgs.add(f);
        }
        if (!compMsgs.isEmpty()) {
            msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
        }
        this.msgFactory = new GridIoMessageFactory(msgs);
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.startInfo());
        }
        this.addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener(){

            @Override
            public void onMessage(UUID nodeId, Object msg) {
                ClusterNode node = GridIoManager.this.ctx.discovery().node(nodeId);
                if (node == null) {
                    return;
                }
                IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
                if (msg0.request()) {
                    IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
                    res.flags(msg0.flags());
                    try {
                        GridIoManager.this.send(node, GridTopic.TOPIC_IO_TEST, (Message)res, (byte)2);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridIoManager.this.log, "Failed to send IO test response [msg=" + msg0 + "]", e);
                    }
                } else {
                    IoTestFuture fut = (IoTestFuture)GridIoManager.this.ioTestMap().get(msg0.id());
                    if (fut == null) {
                        U.warn(GridIoManager.this.log, "Failed to find IO test future [msg=" + msg0 + ']');
                    } else {
                        fut.onResponse();
                    }
                }
            }
        });
    }

    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
        long id = this.ioTestId.getAndIncrement();
        IoTestFuture fut = new IoTestFuture(id, nodes.size());
        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
        msg.processFromNioThread(procFromNioThread);
        this.ioTestMap().put(id, fut);
        for (int i = 0; i < nodes.size(); ++i) {
            ClusterNode node = nodes.get(i);
            try {
                this.send(node, GridTopic.TOPIC_IO_TEST, (Message)msg, (byte)2);
                continue;
            }
            catch (IgniteCheckedException e) {
                this.ioTestMap().remove(msg.id());
                return new GridFinishedFuture(e);
            }
        }
        return fut;
    }

    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
        long id = this.ioTestId.getAndIncrement();
        IoTestFuture fut = new IoTestFuture(id, 1);
        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
        msg.processFromNioThread(procFromNioThread);
        this.ioTestMap().put(id, fut);
        try {
            this.send(node, GridTopic.TOPIC_IO_TEST, (Message)msg, (byte)2);
        }
        catch (IgniteCheckedException e) {
            this.ioTestMap().remove(msg.id());
            return new GridFinishedFuture(e);
        }
        return fut;
    }

    private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
        ConcurrentHashMap<Long, IoTestFuture> map = this.ioTestMap.get();
        if (map == null && !this.ioTestMap.compareAndSet(null, map = new ConcurrentHashMap())) {
            map = this.ioTestMap.get();
        }
        return map;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    @Override
    public void onKernalStart0() throws IgniteCheckedException {
        this.discoLsnr = new GridLocalEventListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onEvent(Event evt) {
                assert (evt instanceof DiscoveryEvent) : "Invalid event: " + evt;
                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
                UUID nodeId = discoEvt.eventNode().id();
                switch (evt.type()) {
                    case 10: {
                        assert (GridIoManager.this.waitMap.get(nodeId) == null);
                        break;
                    }
                    case 11: 
                    case 12: {
                        for (Map.Entry e : GridIoManager.this.msgSetMap.entrySet()) {
                            boolean empty;
                            GridCommunicationMessageSet set;
                            ConcurrentMap map;
                            ConcurrentMap concurrentMap = map = (ConcurrentMap)e.getValue();
                            synchronized (concurrentMap) {
                                set = (GridCommunicationMessageSet)map.remove(nodeId);
                                empty = map.isEmpty();
                            }
                            if (set != null) {
                                if (GridIoManager.this.log.isDebugEnabled()) {
                                    GridIoManager.this.log.debug("Removed message set due to node leaving grid: " + set);
                                }
                                GridIoManager.this.ctx.timeout().removeTimeoutObject(set);
                                GridIoManager.this.closedTopics.add(set.topic());
                            }
                            if (!empty) continue;
                            GridIoManager.this.msgSetMap.remove(e.getKey(), map);
                        }
                        GridIoManager.this.lock.writeLock().lock();
                        try {
                            ConcurrentLinkedDeque8 waitList = (ConcurrentLinkedDeque8)GridIoManager.this.waitMap.remove(nodeId);
                            if (!GridIoManager.this.log.isDebugEnabled()) break;
                            GridIoManager.this.log.debug("Removed messages from discovery startup delay list (sender node left topology): " + waitList);
                            break;
                        }
                        finally {
                            GridIoManager.this.lock.writeLock().unlock();
                        }
                    }
                    default: {
                        assert (false) : "Unexpected event: " + evt;
                        break;
                    }
                }
            }
        };
        this.ctx.event().addLocalEventListener(this.discoLsnr, 10, 11, 12);
        ArrayList<ConcurrentLinkedDeque8> delayedMsgs = new ArrayList<ConcurrentLinkedDeque8>();
        this.lock.writeLock().lock();
        try {
            this.started = true;
            for (Map.Entry entry : this.waitMap.entrySet()) {
                if (this.ctx.discovery().node((UUID)entry.getKey()) == null) continue;
                ConcurrentLinkedDeque8 waitList = (ConcurrentLinkedDeque8)this.waitMap.remove(entry.getKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Processing messages from discovery startup delay list: " + waitList);
                }
                if (waitList == null) continue;
                delayedMsgs.add(waitList);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (!delayedMsgs.isEmpty()) {
            for (Collection collection : delayedMsgs) {
                for (DelayedMessage msg : collection) {
                    this.commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback());
                }
            }
        }
        for (Map.Entry entry : this.msgSetMap.entrySet()) {
            boolean rmv;
            ConcurrentMap map = (ConcurrentMap)entry.getValue();
            for (GridCommunicationMessageSet set : map.values()) {
                void rmv2;
                if (this.ctx.discovery().node(set.nodeId()) != null) continue;
                ConcurrentMap concurrentMap = map;
                synchronized (concurrentMap) {
                    boolean rmv22 = map.remove(set.nodeId(), set);
                }
                if (rmv2 == false) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Removed message set due to node leaving grid: " + set);
                }
                this.ctx.timeout().removeTimeoutObject(set);
            }
            ConcurrentMap concurrentMap = map;
            synchronized (concurrentMap) {
                rmv = map.isEmpty();
            }
            if (!rmv) continue;
            this.msgSetMap.remove(entry.getKey(), map);
            this.closedTopics.add(entry.getKey());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onKernalStop0(boolean cancel) {
        ((CommunicationSpi)this.getSpi()).setListener(null);
        boolean interrupted = false;
        while (true) {
            try {
                while (!this.busyLock.tryWriteLock(200L, TimeUnit.MILLISECONDS)) {
                    Thread.sleep(200L);
                }
            }
            catch (InterruptedException ignore) {
                interrupted = true;
                continue;
            }
            break;
        }
        try {
            GridEventStorageManager evtMgr;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            if ((evtMgr = this.ctx.event()) != null && this.discoLsnr != null) {
                evtMgr.removeLocalEventListener(this.discoLsnr, new int[0]);
            }
            this.stopping = true;
        }
        finally {
            this.busyLock.writeUnlock();
        }
    }

    @Override
    public void stop(boolean cancel) throws IgniteCheckedException {
        this.stopSpi();
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.stopInfo());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
        assert (nodeId != null);
        assert (msg != null);
        this.busyLock.readLock();
        try {
            if (this.stopping) {
                if (!this.log.isDebugEnabled()) return;
                this.log.debug("Received communication message while stopping (will ignore) [nodeId=" + nodeId + ", msg=" + msg + ']');
                return;
            }
            if (msg.topic() == null) {
                int topicOrd = msg.topicOrdinal();
                msg.topic((Object)(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) : U.unmarshal(this.marsh, msg.topicBytes(), U.resolveClassLoader(this.ctx.config()))));
            }
            if (!this.started) {
                this.lock.readLock().lock();
                try {
                    if (!this.started) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Adding message to waiting list [senderId=" + nodeId + ", msg=" + msg + ']');
                        }
                        ConcurrentLinkedDeque8 list = (ConcurrentLinkedDeque8)((Object)F.addIfAbsent(this.waitMap, nodeId, F.newDeque()));
                        assert (list != null);
                        list.add(new DelayedMessage(nodeId, msg, msgC));
                        return;
                    }
                }
                finally {
                    this.lock.readLock().unlock();
                }
            }
            byte plc = msg.policy();
            switch (plc) {
                case 1: {
                    this.processP2PMessage(nodeId, msg, msgC);
                    return;
                }
                case 0: 
                case 2: 
                case 3: 
                case 4: 
                case 5: 
                case 6: 
                case 7: 
                case 8: {
                    if (msg.isOrdered()) {
                        this.processOrderedMessage(nodeId, msg, plc, msgC);
                        return;
                    } else {
                        this.processRegularMessage(nodeId, msg, plc, msgC);
                        return;
                    }
                }
                default: {
                    assert (plc >= 0) : "Negative policy: " + plc;
                    if (GridIoPolicy.isReservedGridIoPolicy(plc)) {
                        throw new IgniteCheckedException("Failed to process message with policy of reserved range. [policy=" + plc + ']');
                    }
                    if (msg.isOrdered()) {
                        this.processOrderedMessage(nodeId, msg, plc, msgC);
                        return;
                    } else {
                        this.processRegularMessage(nodeId, msg, plc, msgC);
                    }
                    return;
                }
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to process message (will ignore): " + msg, e);
            return;
        }
        finally {
            this.busyLock.readUnlock();
        }
    }

    private void processP2PMessage(final UUID nodeId, final GridIoMessage msg, final IgniteRunnable msgC) {
        Runnable c = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    GridNioBackPressureControl.threadProcessingMessage(true);
                    GridMessageListener lsnr = GridIoManager.this.listenerGet0(msg.topic());
                    if (lsnr == null) {
                        return;
                    }
                    Message obj = msg.message();
                    assert (obj != null);
                    GridIoManager.this.invokeListener(msg.policy(), lsnr, nodeId, obj);
                }
                finally {
                    GridNioBackPressureControl.threadProcessingMessage(false);
                    msgC.run();
                }
            }
        };
        try {
            this.pools.p2pPool().execute(c);
        }
        catch (RejectedExecutionException e) {
            U.error(this.log, "Failed to process P2P message due to execution rejection. Increase the upper bound on 'ExecutorService' provided by 'IgniteConfiguration.getPeerClassLoadingThreadPoolSize()'. Will attempt to process message in the listener thread instead.", e);
            c.run();
        }
    }

    private void processRegularMessage(final UUID nodeId, final GridIoMessage msg, byte plc, final IgniteRunnable msgC) throws IgniteCheckedException {
        IgniteIoTestMessage msg0;
        Runnable c = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    GridNioBackPressureControl.threadProcessingMessage(true);
                    GridIoManager.this.processRegularMessage0(msg, nodeId);
                }
                finally {
                    GridNioBackPressureControl.threadProcessingMessage(false);
                    msgC.run();
                }
            }

            public String toString() {
                return "Message closure [msg=" + msg + ']';
            }
        };
        if (msg.topicOrdinal() == GridTopic.TOPIC_IO_TEST.ordinal() && (msg0 = (IgniteIoTestMessage)msg.message()).processFromNioThread()) {
            c.run();
            return;
        }
        if (this.ctx.config().getStripedPoolSize() > 0 && plc == 2 && msg.partition() != Integer.MIN_VALUE) {
            this.ctx.getStripedExecutorService().execute(msg.partition(), c);
            return;
        }
        try {
            this.pools.poolForPolicy(plc).execute(c);
        }
        catch (RejectedExecutionException e) {
            U.error(this.log, "Failed to process regular message due to execution rejection. Increase the upper bound on 'ExecutorService' provided by 'IgniteConfiguration.getPublicThreadPoolSize()'. Will attempt to process message in the listener thread instead.", e);
            c.run();
        }
    }

    private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
        GridMessageListener lsnr = this.listenerGet0(msg.topic());
        if (lsnr == null) {
            return;
        }
        Message obj = msg.message();
        assert (obj != null);
        this.invokeListener(msg.policy(), lsnr, nodeId, obj);
    }

    @Nullable
    private GridMessageListener listenerGet0(Object topic) {
        if (topic instanceof GridTopic) {
            return this.sysLsnrs[this.systemListenerIndex(topic)];
        }
        return (GridMessageListener)this.lsnrMap.get(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private GridMessageListener listenerPutIfAbsent0(Object topic, GridMessageListener lsnr) {
        if (topic instanceof GridTopic) {
            Object object = this.sysLsnrsMux;
            synchronized (object) {
                int idx = this.systemListenerIndex(topic);
                GridMessageListener old = this.sysLsnrs[idx];
                if (old == null) {
                    this.changeSystemListener(idx, lsnr);
                }
                return old;
            }
        }
        return this.lsnrMap.putIfAbsent(topic, lsnr);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private GridMessageListener listenerRemove0(Object topic) {
        if (topic instanceof GridTopic) {
            Object object = this.sysLsnrsMux;
            synchronized (object) {
                int idx = this.systemListenerIndex(topic);
                GridMessageListener old = this.sysLsnrs[idx];
                if (old != null) {
                    this.changeSystemListener(idx, null);
                }
                return old;
            }
        }
        return (GridMessageListener)this.lsnrMap.remove(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean listenerRemove0(Object topic, GridMessageListener expected) {
        if (topic instanceof GridTopic) {
            Object object = this.sysLsnrsMux;
            synchronized (object) {
                return this.systemListenerChange(topic, expected, null);
            }
        }
        return this.lsnrMap.remove(topic, expected);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
        if (topic instanceof GridTopic) {
            Object object = this.sysLsnrsMux;
            synchronized (object) {
                return this.systemListenerChange(topic, expected, newVal);
            }
        }
        return this.lsnrMap.replace(topic, expected, newVal);
    }

    private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
        assert (Thread.holdsLock(this.sysLsnrsMux));
        assert (topic instanceof GridTopic);
        int idx = this.systemListenerIndex(topic);
        GridMessageListener old = this.sysLsnrs[idx];
        if (old != null && old.equals(expected)) {
            this.changeSystemListener(idx, newVal);
            return true;
        }
        return false;
    }

    private void changeSystemListener(int idx, @Nullable GridMessageListener lsnr) {
        assert (Thread.holdsLock(this.sysLsnrsMux));
        GridMessageListener[] res = new GridMessageListener[this.sysLsnrs.length];
        System.arraycopy(this.sysLsnrs, 0, res, 0, this.sysLsnrs.length);
        res[idx] = lsnr;
        this.sysLsnrs = res;
    }

    private int systemListenerIndex(Object topic) {
        assert (topic instanceof GridTopic);
        return ((GridTopic)((Object)topic)).ordinal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOrderedMessage(UUID nodeId, GridIoMessage msg, byte plc, @Nullable IgniteRunnable msgC) throws IgniteCheckedException {
        GridMessageListener lsnr;
        ConcurrentHashMap0<UUID, GridCommunicationMessageSet> concurrentHashMap0;
        ConcurrentMap map;
        GridCommunicationMessageSet set;
        boolean isNew;
        block32: {
            assert (msg != null);
            long timeout = msg.timeout();
            boolean skipOnTimeout = msg.skipOnTimeout();
            isNew = false;
            set = null;
            while (true) {
                if ((map = (ConcurrentHashMap0<UUID, GridCommunicationMessageSet>)this.msgSetMap.get(msg.topic())) == null) {
                    set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC);
                    map = new ConcurrentHashMap0<UUID, GridCommunicationMessageSet>();
                    map.put(nodeId, set);
                    ConcurrentMap old = this.msgSetMap.putIfAbsent(msg.topic(), map);
                    if (old == null) {
                        isNew = true;
                        break block32;
                    }
                    map = old;
                }
                boolean rmv = false;
                concurrentHashMap0 = map;
                synchronized (concurrentHashMap0) {
                    if (map.isEmpty()) {
                        rmv = true;
                    } else {
                        set = (GridCommunicationMessageSet)map.get(nodeId);
                        if (set == null) {
                            set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC);
                            GridCommunicationMessageSet old = map.putIfAbsent(nodeId, set);
                            assert (old == null);
                            isNew = true;
                            break block32;
                        }
                    }
                }
                if (!rmv) break;
                this.msgSetMap.remove(msg.topic(), map);
            }
            assert (set != null);
            assert (!isNew);
            set.add(msg, msgC);
        }
        if (isNew && this.ctx.discovery().node(nodeId) == null) {
            boolean rmv;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Message is ignored as sender has left the grid: " + msg);
            }
            assert (map != null);
            concurrentHashMap0 = map;
            synchronized (concurrentHashMap0) {
                map.remove(nodeId);
                rmv = map.isEmpty();
            }
            if (rmv) {
                this.msgSetMap.remove(msg.topic(), map);
            }
            return;
        }
        if (isNew && set.endTime() != Long.MAX_VALUE) {
            this.ctx.timeout().addTimeoutObject(set);
        }
        if ((lsnr = this.listenerGet0(msg.topic())) == null) {
            if (this.closedTopics.contains(msg.topic())) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Message is ignored as it came for the closed topic: " + msg);
                }
                assert (map != null);
                this.msgSetMap.remove(msg.topic(), map);
            } else if (this.log.isDebugEnabled()) {
                this.log.debug("Received message for unknown listener (messages will be kept until a listener is registered): " + msg);
            }
            if (msgC != null) {
                msgC.run();
            }
            return;
        }
        if (msgC == null) {
            assert (this.locNodeId.equals(nodeId));
            this.unwindMessageSet(set, lsnr);
            return;
        }
        final GridCommunicationMessageSet msgSet0 = set;
        Runnable c = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    GridNioBackPressureControl.threadProcessingMessage(true);
                    GridIoManager.this.unwindMessageSet(msgSet0, lsnr);
                }
                finally {
                    GridNioBackPressureControl.threadProcessingMessage(false);
                }
            }
        };
        try {
            this.pools.poolForPolicy(plc).execute(c);
        }
        catch (RejectedExecutionException e) {
            U.error(this.log, "Failed to process ordered message due to execution rejection. Increase the upper bound on executor service provided by corresponding configuration property. Will attempt to process message in the listener thread instead [msgPlc=" + plc + ']', e);
            c.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) {
        block7: {
            while (msgSet.reserve()) {
                try {
                    msgSet.unwind(lsnr);
                }
                finally {
                    msgSet.release();
                }
                if (msgSet.changed()) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Message set has not been changed: " + msgSet);
                }
                break block7;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Another thread owns reservation: " + msgSet);
            }
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) {
        Byte oldPlc = CUR_PLC.get();
        boolean change = F.eq(oldPlc, plc);
        if (change) {
            CUR_PLC.set(plc);
        }
        try {
            lsnr.onMessage(nodeId, msg);
        }
        finally {
            if (change) {
                CUR_PLC.set(oldPlc);
            }
        }
    }

    @Nullable
    public static Byte currentPolicy() {
        return CUR_PLC.get();
    }

    private void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc, boolean ordered, long timeout, boolean skipOnTimeout, IgniteInClosure<IgniteException> ackC, boolean async) throws IgniteCheckedException {
        assert (node != null);
        assert (topic != null);
        assert (msg != null);
        assert (!async || msg instanceof GridIoUserMessage) : msg;
        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
        if (this.locNodeId.equals(node.id())) {
            assert (plc != 1);
            CommunicationListener<Serializable> commLsnr = this.commLsnr;
            if (commLsnr == null) {
                throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
            }
            if (ordered) {
                this.processOrderedMessage(this.locNodeId, ioMsg, plc, null);
            } else if (async) {
                this.processRegularMessage(this.locNodeId, ioMsg, plc, NOOP);
            } else {
                this.processRegularMessage0(ioMsg, this.locNodeId);
            }
            if (ackC != null) {
                ackC.apply(null);
            }
        } else {
            if (topicOrd < 0) {
                ioMsg.topicBytes(U.marshal(this.marsh, topic));
            }
            try {
                if ((CommunicationSpi)this.getSpi() instanceof TcpCommunicationSpi) {
                    ((TcpCommunicationSpi)((CommunicationSpi)this.getSpi())).sendMessage(node, ioMsg, ackC);
                } else {
                    ((CommunicationSpi)this.getSpi()).sendMessage(node, ioMsg);
                }
            }
            catch (IgniteSpiException e) {
                throw new IgniteCheckedException("Failed to send message (node may have left the grid or TCP connection cannot be established due to firewall issues) [node=" + node + ", topic=" + topic + ", msg=" + msg + ", policy=" + plc + ']', e);
            }
        }
    }

    public void send(UUID nodeId, Object topic, Message msg, byte plc) throws IgniteCheckedException {
        ClusterNode node = this.ctx.discovery().node(nodeId);
        if (node == null) {
            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
        }
        this.send(node, topic, msg, plc);
    }

    public void send(UUID nodeId, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException {
        ClusterNode node = this.ctx.discovery().node(nodeId);
        if (node == null) {
            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
        }
        this.send(node, (Object)topic, topic.ordinal(), msg, plc, false, 0L, false, null, false);
    }

    public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException {
        this.send(node, topic, -1, msg, plc, false, 0L, false, null, false);
    }

    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException {
        this.send(node, (Object)topic, topic.ordinal(), msg, plc, false, 0L, false, null, false);
    }

    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException {
        this.send(node, topic, topicOrd, msg, plc, false, 0L, false, null, false);
    }

    public void sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout) throws IgniteCheckedException {
        assert (timeout > 0L || skipOnTimeout);
        this.send(node, topic, -1, msg, plc, true, timeout, skipOnTimeout, null, false);
    }

    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        this.send(node, (Object)topic, topic.ordinal(), msg, plc, false, 0L, false, ackC, false);
    }

    public void sendOrderedMessage(Collection<? extends ClusterNode> nodes, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout) throws IgniteCheckedException {
        assert (timeout > 0L || skipOnTimeout);
        this.send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
    }

    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        this.send(node, topic, -1, msg, plc, false, 0L, false, ackC, false);
    }

    public void send(Collection<? extends ClusterNode> nodes, Object topic, Message msg, byte plc) throws IgniteCheckedException {
        this.send(nodes, topic, -1, msg, plc, false, 0L, false);
    }

    public void send(Collection<? extends ClusterNode> nodes, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException {
        this.send(nodes, (Object)topic, topic.ordinal(), msg, plc, false, 0L, false);
    }

    public void sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (timeout > 0L || skipOnTimeout);
        this.send(node, topic, -1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
    }

    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
        this.sendUserMessage(nodes, msg, null, false, 0L, false);
    }

    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
        boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(this.locNodeId);
        byte[] serMsg = null;
        byte[] serTopic = null;
        if (!loc) {
            serMsg = U.marshal(this.marsh, msg);
            if (topic != null) {
                serTopic = U.marshal(this.marsh, topic);
            }
        }
        GridDeployment dep = null;
        String depClsName = null;
        if (this.ctx.config().isPeerClassLoadingEnabled()) {
            Class<?> cls0 = U.detectClass(msg);
            if (U.isJdk(cls0) && topic != null) {
                cls0 = U.detectClass(topic);
            }
            if ((dep = this.ctx.deploy().deploy(cls0, U.detectClassLoader(cls0))) == null) {
                throw new IgniteDeploymentCheckedException("Failed to deploy user message: " + msg);
            }
            depClsName = cls0.getName();
        }
        GridIoUserMessage ioMsg = new GridIoUserMessage(msg, serMsg, depClsName, topic, serTopic, dep != null ? dep.classLoaderId() : null, dep != null ? dep.deployMode() : null, dep != null ? dep.userVersion() : null, dep != null ? dep.participants() : null);
        if (ordered) {
            this.sendOrderedMessage(nodes, (Object)GridTopic.TOPIC_COMM_USER, (Message)ioMsg, (byte)0, timeout, true);
        } else if (loc) {
            this.send(F.first(nodes), (Object)GridTopic.TOPIC_COMM_USER, GridTopic.TOPIC_COMM_USER.ordinal(), ioMsg, (byte)0, false, 0L, false, null, async);
        } else {
            ClusterNode locNode = F.find(nodes, null, F.localNode(this.locNodeId));
            Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(this.locNodeId));
            if (!rmtNodes.isEmpty()) {
                this.send(rmtNodes, GridTopic.TOPIC_COMM_USER, (Message)ioMsg, (byte)0);
            }
            if (locNode != null) {
                this.send(locNode, (Object)GridTopic.TOPIC_COMM_USER, GridTopic.TOPIC_COMM_USER.ordinal(), ioMsg, (byte)0, false, 0L, false, null, async);
            }
        }
    }

    public void addUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, ?> p) {
        if (p != null) {
            try {
                if (p instanceof PlatformMessageFilter) {
                    ((PlatformMessageFilter)p).initialize(this.ctx);
                } else {
                    this.ctx.resource().injectGeneric(p);
                }
                this.addMessageListener(GridTopic.TOPIC_COMM_USER, (GridMessageListener)new GridUserMessageListener(topic, p));
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }
    }

    public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
        try {
            this.removeMessageListener(GridTopic.TOPIC_COMM_USER, (GridMessageListener)new GridUserMessageListener(topic, p));
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    private void send(Collection<? extends ClusterNode> nodes, Object topic, int topicOrd, Message msg, byte plc, boolean ordered, long timeout, boolean skipOnTimeout) throws IgniteCheckedException {
        assert (nodes != null);
        assert (topic != null);
        assert (msg != null);
        if (!ordered) assert (F.find(nodes, null, F.localNode(this.locNodeId)) == null) : "Internal Ignite code should never call the method with local node in a node list.";
        try {
            if (!nodes.isEmpty()) {
                for (ClusterNode clusterNode : nodes) {
                    this.send(clusterNode, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
                }
            } else if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + msg + ", policy=" + plc + ']');
            }
        }
        catch (IgniteSpiException e) {
            throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or TCP connection cannot be established due to firewall issues) [nodes=" + nodes + ", topic=" + topic + ", msg=" + msg + ", policy=" + plc + ']', e);
        }
    }

    public void addMessageListener(GridTopic topic, GridMessageListener lsnr) {
        this.addMessageListener((Object)topic, lsnr);
    }

    public void addDisconnectListener(GridDisconnectListener lsnr) {
        this.disconnectLsnrs.add(lsnr);
    }

    public void removeDisconnectListener(GridDisconnectListener lsnr) {
        this.disconnectLsnrs.remove(lsnr);
    }

    public void addMessageListener(Object topic, GridMessageListener lsnr) {
        Collection msgSets;
        GridMessageListener lsnrs;
        assert (lsnr != null);
        assert (topic != null);
        this.closedTopics.remove(topic);
        while (true) {
            if ((lsnrs = this.listenerPutIfAbsent0(topic, lsnr)) == null) {
                lsnrs = lsnr;
                break;
            }
            assert (lsnrs != null);
            if (!(lsnrs instanceof ArrayListener)) {
                ArrayListener arrLsnr = new ArrayListener(lsnrs, lsnr);
                if (!this.listenerReplace0(topic, lsnrs, arrLsnr)) continue;
                lsnrs = arrLsnr;
                break;
            }
            if (((ArrayListener)lsnrs).add(lsnr)) break;
            this.listenerRemove0(topic, lsnrs);
        }
        Map map = (Map)this.msgSetMap.get(topic);
        Collection collection = msgSets = map != null ? map.values() : null;
        if (msgSets != null) {
            final GridMessageListener lsnrs0 = lsnrs;
            try {
                for (final GridCommunicationMessageSet msgSet : msgSets) {
                    this.pools.poolForPolicy(msgSet.policy()).execute(new Runnable(){

                        @Override
                        public void run() {
                            GridIoManager.this.unwindMessageSet(msgSet, lsnrs0);
                        }
                    });
                }
            }
            catch (RejectedExecutionException e) {
                U.error(this.log, "Failed to process delayed message due to execution rejection. Increase the upper bound on executor service provided in 'IgniteConfiguration.getPublicThreadPoolSize()'). Will attempt to process message in the listener thread instead.", e);
                for (GridCommunicationMessageSet msgSet : msgSets) {
                    this.unwindMessageSet(msgSet, lsnr);
                }
            }
            catch (IgniteCheckedException ice) {
                throw new IgniteException(ice);
            }
        }
    }

    public boolean removeMessageListener(GridTopic topic) {
        return this.removeMessageListener((Object)topic);
    }

    public boolean removeMessageListener(Object topic) {
        return this.removeMessageListener(topic, null);
    }

    public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) {
        return this.removeMessageListener((Object)topic, lsnr);
    }

    public boolean removeMessageListener(Object topic, @Nullable GridMessageListener lsnr) {
        Collection msgSets;
        boolean rmv;
        block16: {
            boolean empty;
            block20: {
                GridMessageListener lsnrs;
                block17: {
                    block18: {
                        block19: {
                            assert (topic != null);
                            rmv = true;
                            msgSets = null;
                            if (lsnr != null) break block19;
                            this.closedTopics.add(topic);
                            lsnr = this.listenerRemove0(topic);
                            rmv = lsnr != null;
                            Map map = (Map)this.msgSetMap.remove(topic);
                            if (map != null) {
                                msgSets = map.values();
                            }
                            break block16;
                        }
                        do {
                            if ((lsnrs = this.listenerGet0(topic)) == null) {
                                this.closedTopics.add(topic);
                                Map map = (Map)this.msgSetMap.remove(topic);
                                if (map != null) {
                                    msgSets = map.values();
                                }
                                rmv = false;
                                break block16;
                            }
                            empty = false;
                            if (lsnrs instanceof ArrayListener) break block17;
                            if (!lsnrs.equals(lsnr)) break block18;
                        } while (!this.listenerRemove0(topic, lsnrs));
                        empty = true;
                        break block20;
                    }
                    rmv = false;
                    break block20;
                }
                ArrayListener arrLsnr = (ArrayListener)lsnrs;
                if (arrLsnr.remove(lsnr)) {
                    empty = arrLsnr.isEmpty();
                } else {
                    rmv = false;
                }
                if (empty) {
                    this.listenerRemove0(topic, lsnrs);
                }
            }
            if (empty) {
                this.closedTopics.add(topic);
                Map map = (Map)this.msgSetMap.remove(topic);
                if (map != null) {
                    msgSets = map.values();
                }
            }
        }
        if (msgSets != null) {
            for (GridCommunicationMessageSet msgSet : msgSets) {
                this.ctx.timeout().removeTimeoutObject(msgSet);
            }
        }
        if (rmv && this.log.isDebugEnabled()) {
            this.log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
        }
        if (lsnr instanceof ArrayListener) {
            for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) {
                this.closeListener(childLsnr);
            }
        } else {
            this.closeListener(lsnr);
        }
        return rmv;
    }

    private void closeListener(GridMessageListener lsnr) {
        GridUserMessageListener userLsnr;
        if (lsnr instanceof GridUserMessageListener && (userLsnr = (GridUserMessageListener)lsnr).predLsnr instanceof PlatformMessageFilter) {
            ((PlatformMessageFilter)userLsnr.predLsnr).onClose();
        }
    }

    public int getSentMessagesCount() {
        return ((CommunicationSpi)this.getSpi()).getSentMessagesCount();
    }

    public long getSentBytesCount() {
        return ((CommunicationSpi)this.getSpi()).getSentBytesCount();
    }

    public int getReceivedMessagesCount() {
        return ((CommunicationSpi)this.getSpi()).getReceivedMessagesCount();
    }

    public long getReceivedBytesCount() {
        return ((CommunicationSpi)this.getSpi()).getReceivedBytesCount();
    }

    public int getOutboundMessagesQueueSize() {
        return ((CommunicationSpi)this.getSpi()).getOutboundMessagesQueueSize();
    }

    public void dumpStats() {
        CommunicationSpi spi = (CommunicationSpi)this.getSpi();
        if (spi instanceof TcpCommunicationSpi) {
            ((TcpCommunicationSpi)spi).dumpStats();
        }
    }

    @Override
    public void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> IO manager memory stats [grid=" + this.ctx.gridName() + ']', new Object[0]);
        X.println(">>>  lsnrMapSize: " + this.lsnrMap.size(), new Object[0]);
        X.println(">>>  msgSetMapSize: " + this.msgSetMap.size(), new Object[0]);
        X.println(">>>  closedTopicsSize: " + this.closedTopics.sizex(), new Object[0]);
        X.println(">>>  discoWaitMapSize: " + this.waitMap.size(), new Object[0]);
    }

    private class IoTestFuture
    extends GridFutureAdapter<Object> {
        private final long id;
        private int cntr;

        IoTestFuture(long id, int cntr) {
            assert (cntr > 0) : cntr;
            this.id = id;
            this.cntr = cntr;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void onResponse() {
            boolean complete;
            IoTestFuture ioTestFuture = this;
            synchronized (ioTestFuture) {
                complete = --this.cntr == 0;
            }
            if (complete) {
                this.onDone();
            }
        }

        @Override
        public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
            if (super.onDone(res, err)) {
                GridIoManager.this.ioTestMap().remove(this.id);
                return true;
            }
            return false;
        }

        @Override
        public String toString() {
            return S.toString(IoTestFuture.class, this);
        }
    }

    private static class DelayedMessage {
        private final UUID nodeId;
        private final GridIoMessage msg;
        private final IgniteRunnable msgC;

        private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
            this.nodeId = nodeId;
            this.msg = msg;
            this.msgC = msgC;
        }

        public IgniteRunnable callback() {
            return this.msgC;
        }

        public GridIoMessage message() {
            return this.msg;
        }

        public UUID nodeId() {
            return this.nodeId;
        }

        public String toString() {
            return S.toString(DelayedMessage.class, this, super.toString());
        }
    }

    private static class ConcurrentHashMap0<K, V>
    extends ConcurrentHashMap8<K, V> {
        private static final long serialVersionUID = 0L;
        private int hash;

        private ConcurrentHashMap0() {
        }

        @Override
        public boolean equals(Object o) {
            return o == this;
        }

        @Override
        public int hashCode() {
            if (this.hash == 0) {
                int hash0 = System.identityHashCode(this);
                this.hash = hash0 != 0 ? hash0 : -1;
            }
            return this.hash;
        }
    }

    private class GridCommunicationMessageSet
    implements GridTimeoutObject {
        private final UUID nodeId;
        private long endTime;
        private final IgniteUuid timeoutId;
        @GridToStringInclude
        private final Object topic;
        private final byte plc;
        @GridToStringInclude
        private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<GridTuple3<GridIoMessage, Long, IgniteRunnable>>();
        private final AtomicBoolean reserved = new AtomicBoolean();
        private final long timeout;
        private final boolean skipOnTimeout;
        private long lastTs;

        GridCommunicationMessageSet(byte plc, Object topic, UUID nodeId, long timeout, boolean skipOnTimeout, @Nullable GridIoMessage msg, IgniteRunnable msgC) {
            assert (nodeId != null);
            assert (topic != null);
            assert (msg != null);
            this.plc = plc;
            this.nodeId = nodeId;
            this.topic = topic;
            this.timeout = timeout == 0L ? GridIoManager.this.ctx.config().getNetworkTimeout() : timeout;
            this.skipOnTimeout = skipOnTimeout;
            this.endTime = this.endTime(timeout);
            this.timeoutId = IgniteUuid.randomUuid();
            this.lastTs = U.currentTimeMillis();
            this.msgs.add(F.t(msg, this.lastTs, msgC));
        }

        @Override
        public IgniteUuid timeoutId() {
            return this.timeoutId;
        }

        @Override
        public long endTime() {
            return this.endTime;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onTimeout() {
            ConcurrentMap map;
            GridMessageListener lsnr = GridIoManager.this.listenerGet0(this.topic);
            if (lsnr != null) {
                long delta = 0L;
                if (this.skipOnTimeout) {
                    while (true) {
                        delta = 0L;
                        boolean unwind = false;
                        GridCommunicationMessageSet gridCommunicationMessageSet = this;
                        synchronized (gridCommunicationMessageSet) {
                            if (!this.msgs.isEmpty() && (delta = U.currentTimeMillis() - this.lastTs) >= this.timeout) {
                                unwind = true;
                            }
                        }
                        if (!unwind) break;
                        GridIoManager.this.unwindMessageSet(this, lsnr);
                    }
                }
                this.endTime = this.endTime(this.timeout - delta);
                GridIoManager.this.ctx.timeout().addTimeoutObject(this);
                return;
            }
            if (GridIoManager.this.log.isDebugEnabled()) {
                GridIoManager.this.log.debug("Removing message set due to timeout: " + this);
            }
            if ((map = (ConcurrentMap)GridIoManager.this.msgSetMap.get(this.topic)) != null) {
                boolean rmv;
                ConcurrentMap concurrentMap = map;
                synchronized (concurrentMap) {
                    rmv = map.remove(this.nodeId, this) && map.isEmpty();
                }
                if (rmv) {
                    GridIoManager.this.msgSetMap.remove(this.topic, map);
                }
            }
        }

        UUID nodeId() {
            return this.nodeId;
        }

        byte policy() {
            return this.plc;
        }

        Object topic() {
            return this.topic;
        }

        boolean reserve() {
            return this.reserved.compareAndSet(false, true);
        }

        boolean reserved() {
            return this.reserved.get();
        }

        void release() {
            assert (this.reserved.get()) : "Message set was not reserved: " + this;
            this.reserved.set(false);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void unwind(GridMessageListener lsnr) {
            assert (this.reserved.get());
            GridTuple3<GridIoMessage, Long, IgniteRunnable> t = this.msgs.poll();
            while (t != null) {
                try {
                    GridIoManager.this.invokeListener(this.plc, lsnr, this.nodeId, t.get1().message());
                }
                finally {
                    if (t.get3() != null) {
                        t.get3().run();
                    }
                }
                t = this.msgs.poll();
            }
        }

        void add(GridIoMessage msg, @Nullable IgniteRunnable msgC) {
            this.msgs.add(F.t(msg, U.currentTimeMillis(), msgC));
        }

        boolean changed() {
            return !this.msgs.isEmpty();
        }

        private long endTime(long timeout) {
            long endTime = U.currentTimeMillis() + timeout;
            if (endTime < 0L) {
                endTime = Long.MAX_VALUE;
            }
            return endTime;
        }

        public String toString() {
            return S.toString(GridCommunicationMessageSet.class, this);
        }
    }

    private class GridUserMessageListener
    implements GridMessageListener {
        private final IgniteBiPredicate<UUID, Object> predLsnr;
        private final Object topic;

        GridUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, Object> predLsnr) throws IgniteCheckedException {
            this.topic = topic;
            this.predLsnr = predLsnr;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onMessage(UUID nodeId, Object msg) {
            if (!(msg instanceof GridIoUserMessage)) {
                U.error(GridIoManager.this.log, "Received unknown message (potentially fatal problem): " + msg);
                return;
            }
            GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
            ClusterNode node = GridIoManager.this.ctx.discovery().node(nodeId);
            if (node == null) {
                U.warn(GridIoManager.this.log, "Failed to resolve sender node (did the node left grid?): " + nodeId);
                return;
            }
            GridIoManager.this.busyLock.readLock();
            try {
                GridDeployment dep;
                Object msgBody;
                block19: {
                    if (GridIoManager.this.stopping) {
                        if (GridIoManager.this.log.isDebugEnabled()) {
                            GridIoManager.this.log.debug("Received user message while stopping (will ignore) [nodeId=" + nodeId + ", msg=" + msg + ']');
                        }
                        return;
                    }
                    msgBody = ioMsg.body();
                    assert (msgBody != null || ioMsg.bodyBytes() != null);
                    byte[] msgTopicBytes = ioMsg.topicBytes();
                    Object msgTopic = ioMsg.topic();
                    dep = ioMsg.deployment();
                    if (dep == null && GridIoManager.this.ctx.config().isPeerClassLoadingEnabled() && ioMsg.deploymentClassName() != null) {
                        dep = GridIoManager.this.ctx.deploy().getGlobalDeployment(ioMsg.deploymentMode(), ioMsg.deploymentClassName(), ioMsg.deploymentClassName(), ioMsg.userVersion(), nodeId, ioMsg.classLoaderId(), ioMsg.loaderParticipants(), null);
                        if (dep == null) {
                            throw new IgniteDeploymentCheckedException("Failed to obtain deployment information for user message. If you are using custom message or topic class, try implementing GridPeerDeployAware interface. [msg=" + ioMsg + ']');
                        }
                        ioMsg.deployment(dep);
                    }
                    if (msgTopic == null && msgTopicBytes != null) {
                        msgTopic = U.unmarshal(GridIoManager.this.marsh, msgTopicBytes, U.resolveClassLoader(dep != null ? dep.classLoader() : null, GridIoManager.this.ctx.config()));
                        ioMsg.topic(msgTopic);
                    }
                    if (F.eq(this.topic, msgTopic)) break block19;
                    return;
                }
                try {
                    if (msgBody == null) {
                        msgBody = U.unmarshal(GridIoManager.this.marsh, ioMsg.bodyBytes(), U.resolveClassLoader(dep != null ? dep.classLoader() : null, GridIoManager.this.ctx.config()));
                        ioMsg.body(msgBody);
                    }
                    if (dep != null) {
                        GridIoManager.this.ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName(), new String[0]), msgBody);
                    }
                }
                catch (IgniteCheckedException e) {
                    U.error(GridIoManager.this.log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + msg + ']', e);
                }
                if (msgBody != null && this.predLsnr != null && !this.predLsnr.apply(nodeId, msgBody)) {
                    GridIoManager.this.removeMessageListener(GridTopic.TOPIC_COMM_USER, (GridMessageListener)this);
                }
            }
            finally {
                GridIoManager.this.busyLock.readUnlock();
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            GridUserMessageListener l = (GridUserMessageListener)o;
            return F.eq(this.predLsnr, l.predLsnr) && F.eq(this.topic, l.topic);
        }

        public int hashCode() {
            int res = this.predLsnr != null ? this.predLsnr.hashCode() : 0;
            res = 31 * res + (this.topic != null ? this.topic.hashCode() : 0);
            return res;
        }

        public String toString() {
            return S.toString(GridUserMessageListener.class, this);
        }
    }

    private static class ArrayListener
    implements GridMessageListener {
        private volatile GridMessageListener[] arr;

        ArrayListener(GridMessageListener ... arr) {
            this.arr = arr;
        }

        @Override
        public void onMessage(UUID nodeId, Object msg) {
            GridMessageListener[] arr0 = this.arr;
            if (arr0 == null) {
                return;
            }
            for (GridMessageListener l : arr0) {
                l.onMessage(nodeId, msg);
            }
        }

        boolean isEmpty() {
            return this.arr == null;
        }

        synchronized boolean remove(GridMessageListener l) {
            GridMessageListener[] arr0 = this.arr;
            if (arr0 == null) {
                return false;
            }
            if (arr0.length == 1) {
                if (!arr0[0].equals(l)) {
                    return false;
                }
                this.arr = null;
                return true;
            }
            for (int i = 0; i < arr0.length; ++i) {
                if (!arr0[i].equals(l)) continue;
                int newLen = arr0.length - 1;
                if (i == newLen) {
                    this.arr = Arrays.copyOf(arr0, newLen);
                } else {
                    GridMessageListener[] arr1 = new GridMessageListener[newLen];
                    if (i != 0) {
                        System.arraycopy(arr0, 0, arr1, 0, i);
                    }
                    System.arraycopy(arr0, i + 1, arr1, i, newLen - i);
                    this.arr = arr1;
                }
                return true;
            }
            return false;
        }

        synchronized boolean add(GridMessageListener l) {
            GridMessageListener[] arr0 = this.arr;
            if (arr0 == null) {
                return false;
            }
            int oldLen = arr0.length;
            arr0 = Arrays.copyOf(arr0, oldLen + 1);
            arr0[oldLen] = l;
            this.arr = arr0;
            return true;
        }
    }
}

