/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.communication.tcp;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.LongAdder8;

@IgniteSpiMultipleInstancesSupport(value=true)
@IgniteSpiConsistencyChecked(optional=false)
public class TcpCommunicationSpi
extends IgniteSpiAdapter
implements CommunicationSpi<Message>,
TcpCommunicationSpiMBean {
    // Product version parsed from "1.8.2".
    // NOTE(review): presumably the first version supporting multiple connections per node - confirm at usage sites.
    private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = IgniteProductVersion.fromString("1.8.2");
    // Message text for a failed shared memory allocation (not referenced in this part of the file).
    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment (switching to TCP, may be slower).";
    // String keys. NOTE(review): presumably node attribute names published by this SPI - confirm where they are written.
    public static final String ATTR_ADDRS = "comm.tcp.addrs";
    public static final String ATTR_HOST_NAMES = "comm.tcp.host.names";
    public static final String ATTR_PORT = "comm.tcp.port";
    public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
    public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
    public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
    // Default values. The instance fields declared further down are initialised with the same literals
    // (the constants appear inlined there, e.g. locPort = 47100, idleConnTimeout = 30000L).
    public static final int DFLT_PORT = 47100;
    public static final int DFLT_SHMEM_PORT = -1;
    public static final long DFLT_IDLE_CONN_TIMEOUT = 30000L;
    public static final int DFLT_SOCK_BUF_SIZE = 32768;
    public static final long DFLT_CONN_TIMEOUT = 5000L;
    public static final long DFLT_MAX_CONN_TIMEOUT = 600000L;
    public static final int DFLT_RECONNECT_CNT = 10;
    public static final int DFLT_MSG_QUEUE_LIMIT = 0;
    // At least 4, otherwise half of the processors reported by the runtime at class initialisation.
    public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
    // Session meta key under which the server listener stores the session's ConnectionKey.
    private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
    // Session meta key under which the server listener stores the session's GridNioMessageTracker.
    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
    public static final int DFLT_PORT_RANGE = 100;
    public static final boolean DFLT_TCP_NODELAY = true;
    public static final int DFLT_ACK_SND_THRESHOLD = 32;
    public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000L;
    public static final int DFLT_CONN_PER_NODE = 1;
    // Runnable that does nothing; passed to notifyListener(...) when msgQueueLimit is not positive.
    private static final IgniteRunnable NOOP = new IgniteRunnable(){

        @Override
        public void run() {
        }
    };
    // Byte codes. NOTE(review): presumably the wire type IDs of NodeIdMessage, RecoveryLastReceivedMessage
    // and HandshakeMessage respectively - confirm against those message classes.
    public static final byte NODE_ID_MSG_TYPE = -1;
    public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
    public static final byte HANDSHAKE_MSG_TYPE = -3;
    // Gateway entered (tryEnter/leave) by the server listener around first-message processing.
    private ConnectGateway connectGate;
    // Not referenced in this part of the file.
    private ConnectionPolicy connPlc;
    private final GridNioServerListener<Message> srvLsnr = new GridNioServerListenerAdapter<Message>(){

        /**
         * Handles a session write timeout: emits a warning, optionally a debug record,
         * and then closes the session.
         *
         * @param ses Session whose write timed out.
         */
        @Override
        public void onSessionWriteTimeout(GridNioSession ses) {
            String warnMsg = "Communication SPI session write timed out (consider increasing 'socketWriteTimeout' configuration property) [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + TcpCommunicationSpi.this.sockWriteTimeout + ']';
            LT.warn(TcpCommunicationSpi.this.log, warnMsg);

            boolean debugOn = TcpCommunicationSpi.this.log.isDebugEnabled();
            if (debugOn) {
                String dbgMsg = "Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + TcpCommunicationSpi.this.sockWriteTimeout + ']';
                TcpCommunicationSpi.this.log.debug(dbgMsg);
            }

            ses.close();
        }

        /**
         * For accepted sessions, sends the local node ID message to the peer.
         * Sessions that were not accepted are left untouched.
         *
         * @param ses Newly connected session.
         */
        @Override
        public void onConnected(GridNioSession ses) {
            // Nothing to do unless the session was accepted by the server.
            if (!ses.accepted()) {
                return;
            }

            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Sending local node ID to newly accepted session: " + ses);
            }

            try {
                ses.sendNoFuture(TcpCommunicationSpi.this.nodeIdMessage());
            }
            catch (IgniteCheckedException sendErr) {
                // A failed send is logged only; the session is not closed here.
                U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + sendErr, sendErr);
            }
        }

        /**
         * Cleans up after a closed session: closes and unregisters the TCP clients bound to it,
         * schedules a reconnect when unacknowledged messages remain, and notifies the
         * communication listener. Sessions without a stored connection key are ignored.
         *
         * @param ses Closed session.
         * @param e Optional exception associated with the disconnect (not used here).
         */
        @Override
        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
            if (key == null) {
                return;
            }

            UUID nodeId = key.nodeId();

            // Drop every TCP client of that node which is backed by exactly this session.
            GridCommunicationClient[] registered = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(nodeId);
            if (registered != null) {
                for (GridCommunicationClient candidate : registered) {
                    boolean boundToSes = candidate instanceof GridTcpNioCommunicationClient && ((GridTcpNioCommunicationClient)candidate).session() == ses;
                    if (boundToSes) {
                        candidate.close();
                        TcpCommunicationSpi.this.removeNodeClient(nodeId, candidate);
                    }
                }
            }

            // The recovery descriptor is consulted only while the SPI is not stopping.
            if (!TcpCommunicationSpi.this.stopping) {
                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
                if (outDesc != null) {
                    boolean alive = outDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(nodeId));
                    if (!alive) {
                        outDesc.onNodeLeft();
                    } else if (!outDesc.messagesRequests().isEmpty()) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Session was closed but there are unacknowledged messages, will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
                        }
                        DisconnectedSessionInfo info = new DisconnectedSessionInfo(outDesc, key.connectionIndex());
                        TcpCommunicationSpi.this.commWorker.addProcessDisconnectRequest(info);
                    }
                }
            }

            // Read the volatile listener once before the null check.
            CommunicationListener lsnr0 = TcpCommunicationSpi.this.lsnr;
            if (lsnr0 != null) {
                lsnr0.onDisconnected(nodeId);
            }
        }

        /**
         * Processes the first message received on an accepted session (called from
         * {@code onMessage} while the connect gateway is entered).
         * <p>
         * The sender ID and a {@code ConnectionKey} are derived from the message, the key is stored
         * in the session meta, and then either the paired-connection or the single-connection
         * handshake path is taken. Along the way the incoming connection may be rejected by sending
         * {@code RecoveryLastReceivedMessage(-1)} or by closing the session.
         * <p>
         * Decompiler note: CFR reported "Removed try catching itself - possible behaviour change"
         * for this method.
         *
         * @param ses Accepted session the message arrived on.
         * @param msg First message; a {@code NodeIdMessage} or a {@code HandshakeMessage}.
         */
        private void onFirstMessage(GridNioSession ses, Message msg) {
            ClusterNode rmtNode;
            ConnectionKey connKey;
            UUID sndId;
            if (msg instanceof NodeIdMessage) {
                // Node ID message carries only the ID: connection index 0 and connect count -1 are used.
                sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
                connKey = new ConnectionKey(sndId, 0, -1L);
            } else {
                assert (msg instanceof HandshakeMessage) : msg;
                HandshakeMessage msg0 = (HandshakeMessage)msg;
                sndId = ((HandshakeMessage)msg).nodeId();
                connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
            }
            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Remote node ID received: " + sndId);
            }
            // Sender is not known to the SPI context: drop the connection.
            if ((rmtNode = TcpCommunicationSpi.this.getSpiContext().node(sndId)) == null) {
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Close incoming connection, unknown node: " + sndId);
                }
                ses.close();
                return;
            }
            // From now on onMessage() treats this session as initialised (meta key is present).
            ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
            assert (old == null);
            ClusterNode locNode = TcpCommunicationSpi.this.getSpiContext().localNode();
            // NOTE(review): a null remote address presumably identifies a non-TCP (shared memory)
            // session that needs no recovery handshake - confirm.
            if (ses.remoteAddress() == null) {
                return;
            }
            // Past this point only a handshake message is expected.
            assert (msg instanceof HandshakeMessage) : msg;
            HandshakeMessage msg0 = (HandshakeMessage)msg;
            if (TcpCommunicationSpi.this.usePairedConnections(rmtNode)) {
                // Paired connections: only the incoming recovery descriptor is reserved for this session.
                GridNioRecoveryDescriptor recoveryDesc = TcpCommunicationSpi.this.inRecoveryDescriptor(rmtNode, connKey);
                ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
                boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
                if (reserve) {
                    this.connectedNew(recoveryDesc, ses, true);
                } else if (c.failed) {
                    // Reservation was refused: reject this session and close accepted sessions of the
                    // same node and connection index that carry a smaller connect count.
                    ses.send(new RecoveryLastReceivedMessage(-1L));
                    for (GridNioSession ses0 : TcpCommunicationSpi.this.nioSrvr.sessions()) {
                        ConnectionKey key0 = (ConnectionKey)ses0.meta(CONN_IDX_META);
                        if (!ses0.accepted() || key0 == null || !key0.nodeId().equals(connKey.nodeId()) || key0.connectionIndex() != connKey.connectionIndex() || key0.connectCount() >= connKey.connectCount()) continue;
                        ses0.close();
                    }
                }
            } else {
                assert (connKey.connectionIndex() >= 0) : connKey;
                // First look-up of an already registered client for this connection index.
                GridCommunicationClient[] curClients = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(sndId);
                GridCommunicationClient oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? curClients[connKey.connectionIndex()] : null;
                boolean hasShmemClient = false;
                if (oldClient != null) {
                    if (oldClient instanceof GridTcpNioCommunicationClient) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                        }
                        // A TCP client already exists: reject the new connection.
                        ses.send(new RecoveryLastReceivedMessage(-1L));
                        return;
                    }
                    assert (oldClient instanceof GridShmemCommunicationClient);
                    // With a shared memory client in place no new client object is created below.
                    hasShmemClient = true;
                }
                // Try to register a future for this connection key; a non-null result means one is already registered.
                GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<GridCommunicationClient>();
                GridFutureAdapter oldFut = TcpCommunicationSpi.this.clientFuts.putIfAbsent(connKey, fut);
                GridNioRecoveryDescriptor recoveryDesc = TcpCommunicationSpi.this.inRecoveryDescriptor(rmtNode, connKey);
                if (oldFut == null) {
                    // Our future was registered: repeat the client look-up, since a client may have
                    // been added in the meantime.
                    curClients = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(sndId);
                    // The extra local below is a decompiler artifact; only oldClient is read afterwards.
                    GridCommunicationClient gridCommunicationClient = oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? curClients[connKey.connectionIndex()] : null;
                    if (oldClient != null) {
                        if (oldClient instanceof GridTcpNioCommunicationClient) {
                            assert (oldClient.connectionIndex() == connKey.connectionIndex()) : oldClient;
                            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                TcpCommunicationSpi.this.log.debug("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                            }
                            ses.send(new RecoveryLastReceivedMessage(-1L));
                            // Complete the future with the existing client. Note that this path does
                            // not remove the future from clientFuts.
                            fut.onDone(oldClient);
                            return;
                        }
                        assert (oldClient instanceof GridShmemCommunicationClient);
                        hasShmemClient = true;
                    }
                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Received incoming connection from remote node [rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
                    }
                    if (reserved) {
                        try {
                            GridTcpNioCommunicationClient client = this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                            fut.onDone(client);
                        }
                        finally {
                            // The future is removed whether or not connected(...) succeeded.
                            TcpCommunicationSpi.this.clientFuts.remove(connKey, fut);
                        }
                    }
                    // When not reserved, the ConnectClosure passed above completes and removes the future.
                } else if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
                    // A ConnectFuture is already registered for this key (per the log text below, the
                    // local node is connecting to the same remote node). The node with the smaller
                    // order rejects the incoming connection.
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Received incoming connection from remote node while connecting to this node, rejecting [locNode=" + locNode.id() + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + ", rmtNodeOrder=" + rmtNode.order() + ']');
                    }
                    ses.send(new RecoveryLastReceivedMessage(-1L));
                } else {
                    // Another future is registered but this side does not reject: reserve and connect.
                    // The local 'fut' handed to the closure here was not registered in clientFuts.
                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
                    if (reserved) {
                        this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                    }
                }
            }
        }

        /**
         * Entry point for every message received on a session.
         * <p>
         * A session without a stored connection key is handled as a first message inside the connect
         * gateway. Otherwise the received counter is incremented, recovery acknowledgements are
         * processed or sent, and the message is passed on to {@code notifyListener(...)}.
         * <p>
         * Decompiler note: CFR reported "Removed try catching itself - possible behaviour change"
         * for this method.
         *
         * @param ses Session the message arrived on.
         * @param msg Received message.
         */
        @Override
        public void onMessage(GridNioSession ses, Message msg) {
            ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);

            if (key == null) {
                // No key yet: this must be the first message on an accepted session.
                assert (ses.accepted()) : ses;
                if (TcpCommunicationSpi.this.connectGate.tryEnter()) {
                    try {
                        this.onFirstMessage(ses, msg);
                    }
                    finally {
                        TcpCommunicationSpi.this.connectGate.leave();
                    }
                    return;
                }
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Close incoming connection, failed to enter gateway.");
                }
                ses.close();
                return;
            }

            TcpCommunicationSpi.this.rcvdMsgsCnt.increment();

            if (msg instanceof RecoveryLastReceivedMessage) {
                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
                if (outDesc != null) {
                    RecoveryLastReceivedMessage ack = (RecoveryLastReceivedMessage)msg;
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Received recovery acknowledgement [rmtNode=" + key.nodeId() + ", connIdx=" + key.connectionIndex() + ", rcvCnt=" + ack.received() + ']');
                    }
                    outDesc.ackReceived(ack.received());
                    // Acknowledgements consumed by the descriptor are not delivered to the listener.
                    return;
                }
                // Without an outgoing descriptor the acknowledgement falls through to the listener.
            } else {
                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
                if (inDesc != null) {
                    long rcvCnt = inDesc.onReceived();
                    // Acknowledge whenever the received count is a multiple of the threshold.
                    if (rcvCnt % (long)TcpCommunicationSpi.this.ackSndThreshold == 0L) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Send recovery acknowledgement [rmtNode=" + key.nodeId() + ", connIdx=" + key.connectionIndex() + ", rcvCnt=" + rcvCnt + ']');
                        }
                        ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
                        inDesc.lastAcknowledged(rcvCnt);
                    }
                }
            }

            IgniteRunnable msgC = NOOP;
            if (TcpCommunicationSpi.this.msgQueueLimit > 0) {
                // Create the per-session tracker on first use and register this message with it.
                GridNioMessageTracker tracker = (GridNioMessageTracker)ses.meta(TRACKER_META);
                if (tracker == null) {
                    tracker = new GridNioMessageTracker(ses, TcpCommunicationSpi.this.msgQueueLimit);
                    GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker);
                    assert (old == null);
                }
                tracker.onMessageReceived();
                msgC = tracker;
            }

            TcpCommunicationSpi.this.notifyListener(key.nodeId(), msg, msgC);
        }

        /**
         * Completes the handshake for a non-paired connection: binds the recovery descriptor to the
         * session in both directions, triggers a resend, optionally sends the received count to the
         * peer and optionally registers a new TCP client for the node.
         *
         * @param recovery Recovery descriptor to attach to the session.
         * @param ses Session.
         * @param node Remote node.
         * @param rcvCnt Count passed to {@code recovery.onHandshake(...)}.
         * @param sndRes Whether to send {@code RecoveryLastReceivedMessage} with the received count.
         * @param createClient Whether to create and register a client for the session.
         * @return Created client, or {@code null} when {@code createClient} is {@code false}.
         */
        private GridTcpNioCommunicationClient connected(GridNioRecoveryDescriptor recovery, GridNioSession ses, ClusterNode node, long rcvCnt, boolean sndRes, boolean createClient) {
            ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
            assert (key != null && key.connectionIndex() >= 0) : key;
            assert (!TcpCommunicationSpi.this.usePairedConnections(node));

            recovery.onHandshake(rcvCnt);
            ses.inRecoveryDescriptor(recovery);
            ses.outRecoveryDescriptor(recovery);
            TcpCommunicationSpi.this.nioSrvr.resend(ses);

            if (sndRes) {
                try {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
                }
                catch (IgniteCheckedException sendErr) {
                    // Logged only; the connection is still marked as connected below.
                    U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + sendErr, sendErr);
                }
            }

            recovery.onConnected();

            if (!createClient) {
                return null;
            }
            GridTcpNioCommunicationClient created = new GridTcpNioCommunicationClient(key.connectionIndex(), ses, TcpCommunicationSpi.this.log);
            TcpCommunicationSpi.this.addNodeClient(node, key.connectionIndex(), created);
            return created;
        }

        /**
         * Completes the handshake for a paired connection: attaches the incoming recovery descriptor
         * to the session, optionally sends the received count, and marks the descriptor connected.
         * If sending fails, the error is logged and the descriptor is not marked connected.
         *
         * @param recovery Incoming recovery descriptor.
         * @param ses Session.
         * @param sndRes Whether to send {@code RecoveryLastReceivedMessage} with the received count.
         */
        private void connectedNew(GridNioRecoveryDescriptor recovery, GridNioSession ses, boolean sndRes) {
            ses.inRecoveryDescriptor(recovery);

            if (sndRes) {
                try {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
                }
                catch (IgniteCheckedException sendErr) {
                    U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + sendErr, sendErr);
                    // Skip onConnected() when the response could not be sent.
                    return;
                }
            }

            recovery.onConnected();
        }

        /**
         * Reservation callback for non-paired connections; instances are passed to
         * {@code GridNioRecoveryDescriptor.tryReserve(...)} from {@code onFirstMessage}.
         * NOTE(review): presumably invoked later, when the descriptor could not be reserved
         * immediately - confirm in {@code GridNioRecoveryDescriptor}.
         * <p>
         * On success it sends the received count to the peer and, once that send completes,
         * finishes the handshake through the enclosing listener's {@code connected(...)}.
         * On failure it completes the client future without a result. In both asynchronous
         * outcomes the future is removed from {@code clientFuts}.
         */
        class ConnectClosure
        implements IgniteInClosure<Boolean> {
            private static final long serialVersionUID = 0L;
            private final GridNioSession ses;
            private final GridNioRecoveryDescriptor recoveryDesc;
            private final ClusterNode rmtNode;
            private final HandshakeMessage msg;
            private final GridFutureAdapter<GridCommunicationClient> fut;
            private final boolean createClient;
            private final ConnectionKey connKey;

            /**
             * @param ses Accepted session.
             * @param recoveryDesc Recovery descriptor being reserved.
             * @param rmtNode Remote node.
             * @param connKey Connection key used to remove the future from {@code clientFuts}.
             * @param msg Handshake message received from the remote node.
             * @param createClient Whether a client should be created once connected.
             * @param fut Future to complete with the created client (or without a result).
             */
            ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, ConnectionKey connKey, HandshakeMessage msg, boolean createClient, GridFutureAdapter<GridCommunicationClient> fut) {
                this.ses = ses;
                this.recoveryDesc = recoveryDesc;
                this.rmtNode = rmtNode;
                this.connKey = connKey;
                this.msg = msg;
                this.createClient = createClient;
                this.fut = fut;
            }

            /**
             * @param success Whether the reservation succeeded.
             */
            @Override
            public void apply(Boolean success) {
                if (success) {
                    try {
                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> msgFut) {
                                try {
                                    // Throws if the recovery response could not be sent.
                                    msgFut.get();
                                    // Unqualified on purpose: connected(...) is declared on the enclosing
                                    // server listener, not on this closure, so a "this." prefix would not
                                    // resolve here.
                                    GridTcpNioCommunicationClient client = connected(ConnectClosure.this.recoveryDesc, ConnectClosure.this.ses, ConnectClosure.this.rmtNode, ConnectClosure.this.msg.received(), false, ConnectClosure.this.createClient);
                                    ConnectClosure.this.fut.onDone(client);
                                }
                                catch (IgniteCheckedException e) {
                                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                        TcpCommunicationSpi.this.log.debug("Failed to send recovery handshake [rmtNode=" + ConnectClosure.this.rmtNode.id() + ", err=" + e + ']');
                                    }
                                    // Give the reservation back and complete the future without a client.
                                    ConnectClosure.this.recoveryDesc.release();
                                    ConnectClosure.this.fut.onDone();
                                }
                                finally {
                                    TcpCommunicationSpi.this.clientFuts.remove(ConnectClosure.this.connKey, ConnectClosure.this.fut);
                                }
                            }
                        };
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(this.recoveryDesc.received()), lsnr);
                    }
                    catch (IgniteCheckedException e) {
                        // NOTE(review): a synchronous failure is only logged; the future is neither
                        // completed nor removed on this path. Left as is to preserve existing behaviour.
                        U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                    }
                } else {
                    try {
                        this.fut.onDone();
                    }
                    finally {
                        TcpCommunicationSpi.this.clientFuts.remove(this.connKey, this.fut);
                    }
                }
            }
        }

        /**
         * Reservation callback for paired connections; instances are passed to
         * {@code GridNioRecoveryDescriptor.tryReserve(...)} from {@code onFirstMessage}.
         * NOTE(review): presumably invoked later, when the descriptor could not be reserved
         * immediately - confirm in {@code GridNioRecoveryDescriptor}.
         * <p>
         * On success it sends the received count and, once that send completes, finishes the
         * handshake through the enclosing listener's {@code connectedNew(...)}. On failure it
         * sends {@code RecoveryLastReceivedMessage(-1)} and records the failure in {@link #failed}.
         */
        class ConnectClosureNew
        implements IgniteInClosure<Boolean> {
            private static final long serialVersionUID = 0L;
            private final GridNioSession ses;
            private final GridNioRecoveryDescriptor recoveryDesc;
            private final ClusterNode rmtNode;
            /** Set to {@code true} when the closure was applied with a failed reservation; read by {@code onFirstMessage}. */
            private boolean failed;

            /**
             * @param ses Accepted session.
             * @param recoveryDesc Incoming recovery descriptor being reserved.
             * @param rmtNode Remote node.
             */
            ConnectClosureNew(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode) {
                this.ses = ses;
                this.recoveryDesc = recoveryDesc;
                this.rmtNode = rmtNode;
            }

            /**
             * @param success Whether the reservation succeeded.
             */
            @Override
            public void apply(Boolean success) {
                try {
                    this.failed = !success;
                    if (success) {
                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> msgFut) {
                                try {
                                    // Throws if the recovery response could not be sent.
                                    msgFut.get();
                                    // Unqualified on purpose: connectedNew(...) is declared on the enclosing
                                    // server listener, not on this closure, so a "this." prefix would not
                                    // resolve here.
                                    connectedNew(ConnectClosureNew.this.recoveryDesc, ConnectClosureNew.this.ses, false);
                                }
                                catch (IgniteCheckedException e) {
                                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                        TcpCommunicationSpi.this.log.debug("Failed to send recovery handshake [rmtNode=" + ConnectClosureNew.this.rmtNode.id() + ", err=" + e + ']');
                                    }
                                    // Give the reservation back when the response did not reach the peer.
                                    ConnectClosureNew.this.recoveryDesc.release();
                                }
                            }
                        };
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(this.recoveryDesc.received()), lsnr);
                    } else {
                        // Tell the peer that the connection was rejected.
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(-1L));
                    }
                }
                catch (IgniteCheckedException e) {
                    U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                }
            }
        }
    };
    // Logger, annotated for injection via @LoggerResource.
    @LoggerResource
    private IgniteLogger log;
    private String locAddr;
    private volatile InetAddress locHost;
    // The initial values below equal the DFLT_* constants declared at the top of the class
    // (DFLT_PORT, DFLT_PORT_RANGE, DFLT_SHMEM_PORT, ...); the literals appear inlined here.
    private int locPort = 47100;
    private int locPortRange = 100;
    private int shmemPort = -1;
    private boolean directBuf = true;
    private boolean directSndBuf;
    private long idleConnTimeout = 30000L;
    private long connTimeout = 5000L;
    private long maxConnTimeout = 600000L;
    private int reconCnt = 10;
    private int sockSndBuf = 32768;
    private int sockRcvBuf = 32768;
    // When positive, the server listener tracks received messages per session with a GridNioMessageTracker.
    private int msgQueueLimit = 0;
    private int slowClientQueueLimit;
    // NIO server used by the server listener to send system messages, resend and enumerate sessions.
    private GridNioServer<Message> nioSrvr;
    private IpcSharedMemoryServerEndpoint shmemSrv;
    private boolean usePairedConnections;
    private int connectionsPerNode = 1;
    private boolean tcpNoDelay = true;
    // The server listener sends a recovery acknowledgement whenever the received count is a multiple of this value.
    private int ackSndThreshold = 32;
    private int unackedMsgsBufSize;
    // Reported in the write-timeout warning logged by the server listener.
    private long sockWriteTimeout = 2000L;
    // Receives disconnect requests from the server listener when unacknowledged messages remain.
    private CommunicationWorker commWorker;
    private ShmemAcceptWorker shmemAcceptWorker;
    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<ShmemWorker>();
    // Node ID -> clients of that node; the server listener indexes the array by connection index.
    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
    // Communication listener; notified of disconnects by the server listener when non-null.
    private volatile CommunicationListener<Message> lsnr;
    private int boundTcpPort = -1;
    private int boundTcpShmemPort = -1;
    private int selectorsCnt = DFLT_SELECTORS_CNT;
    // Read from the "IGNITE_SELECTOR_SPINS" system property, 0 when the property is absent.
    private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
    // Assigned by setAddressResolver(...) only while still null.
    private AddressResolver addrRslvr;
    // Incremented by the server listener for every message received on an initialised session.
    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
    private final LongAdder8 sentMsgsCnt = new LongAdder8();
    // Byte counters updated by metricsLsnr.
    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
    private final LongAdder8 sentBytesCnt = new LongAdder8();
    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
    // While true, the server listener skips recovery handling for closed sessions.
    private volatile boolean stopping;
    /** NIO metrics listener that feeds the byte counters of this SPI. */
    private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener(){

        /** Adds the reported amount to the sent-bytes counter. */
        @Override
        public void onBytesSent(int bytesCnt) {
            sentBytesCnt.add(bytesCnt);
        }

        /** Adds the reported amount to the received-bytes counter. */
        @Override
        public void onBytesReceived(int bytesCnt) {
            rcvdBytesCnt.add(bytesCnt);
        }
    };
    /** Client futures per connection key; all of them are failed in {@code onClientDisconnected}. */
    private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts = GridConcurrentFactory.newMap();
    /** Recovery descriptors per connection key; dumped with sent, acked and received counters. */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
    /** Recovery descriptors dumped with sent/acked counters (presumably the outgoing side of paired connections). */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
    /** Recovery descriptors dumped with received counters (presumably the incoming side of paired connections). */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
    /**
     * Discovery listener that closes clients of a node reported by the event.
     * It is registered for event types 11 and 12 in {@code onContextInitialized0}
     * (presumably node-left and node-failed; the constants were inlined, so confirm).
     */
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener(){

        @Override
        public void onEvent(Event evt) {
            assert (evt instanceof DiscoveryEvent) : evt;
            assert (evt.type() == 11 || evt.type() == 12);
            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
            UUID leftNodeId = discoEvt.eventNode().id();
            TcpCommunicationSpi.this.onNodeLeft(leftNodeId);
        }
    };
    /** Per-thread connection index holder (typed construction instead of a raw {@code ThreadLocal}). */
    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
    /** Shared connection index counter. */
    private final AtomicInteger connIdx = new AtomicInteger();

    /**
     * Tells whether SSL is configured for this node.
     *
     * @return {@code true} if the Ignite configuration has an SSL context factory.
     */
    private boolean isSslEnabled() {
        Object sslCtxFactory = ignite.configuration().getSslContextFactory();
        return sslCtxFactory != null;
    }

    /**
     * Sets the address resolver. A resolver that was already set is never replaced,
     * so the first non-null value wins.
     *
     * @param addrRslvr Address resolver.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setAddressResolver(AddressResolver addrRslvr) {
        if (this.addrRslvr != null) {
            return;
        }
        this.addrRslvr = addrRslvr;
    }

    /**
     * Injects the Ignite instance and, when it is present, takes the address resolver
     * and local host from its configuration (values set explicitly earlier are kept).
     *
     * @param ignite Ignite instance, possibly {@code null}.
     */
    @Override
    @IgniteInstanceResource
    protected void injectResources(Ignite ignite) {
        super.injectResources(ignite);
        if (ignite == null) {
            return;
        }
        setAddressResolver(ignite.configuration().getAddressResolver());
        setLocalAddress(ignite.configuration().getLocalHost());
    }

    /**
     * Sets the local address. An address that was already set is never replaced,
     * so the first non-null value wins.
     *
     * @param locAddr Local address.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setLocalAddress(String locAddr) {
        if (this.locAddr != null) {
            return;
        }
        this.locAddr = locAddr;
    }

    /**
     * @return Configured local address, or {@code null} if none was set.
     */
    @Override
    public String getLocalAddress() {
        return locAddr;
    }

    /**
     * Sets the first local port to try when binding the TCP server.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param locPort Local port.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setLocalPort(int locPort) {
        this.locPort = locPort;
    }

    /**
     * @return Configured local port.
     */
    @Override
    public int getLocalPort() {
        return locPort;
    }

    /**
     * Sets the size of the port range scanned when binding.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param locPortRange Port range size.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setLocalPortRange(int locPortRange) {
        this.locPortRange = locPortRange;
    }

    /**
     * @return Configured port range size.
     */
    @Override
    public int getLocalPortRange() {
        return locPortRange;
    }

    /**
     * @return Paired connections flag.
     */
    @Override
    public boolean isUsePairedConnections() {
        return usePairedConnections;
    }

    /**
     * Sets the paired connections flag.
     *
     * @param usePairedConnections Paired connections flag.
     */
    public void setUsePairedConnections(boolean usePairedConnections) {
        this.usePairedConnections = usePairedConnections;
    }

    /**
     * Sets the number of connections per node.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param maxConnectionsPerNode Number of connections per node.
     */
    public void setConnectionsPerNode(int maxConnectionsPerNode) {
        this.connectionsPerNode = maxConnectionsPerNode;
    }

    /**
     * @return Configured number of connections per node.
     */
    @Override
    public int getConnectionsPerNode() {
        return connectionsPerNode;
    }

    /**
     * Sets the shared memory port; {@code -1} keeps the shared memory server from being created.
     *
     * @param shmemPort Shared memory port.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setSharedMemoryPort(int shmemPort) {
        this.shmemPort = shmemPort;
    }

    /**
     * @return Configured shared memory port.
     */
    @Override
    public int getSharedMemoryPort() {
        return shmemPort;
    }

    /**
     * Sets the idle connection timeout.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param idleConnTimeout Idle connection timeout.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setIdleConnectionTimeout(long idleConnTimeout) {
        this.idleConnTimeout = idleConnTimeout;
    }

    /**
     * @return Configured idle connection timeout.
     */
    @Override
    public long getIdleConnectionTimeout() {
        return idleConnTimeout;
    }

    /**
     * @return Configured socket write timeout.
     */
    @Override
    public long getSocketWriteTimeout() {
        return sockWriteTimeout;
    }

    /**
     * Sets the socket write timeout.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param sockWriteTimeout Socket write timeout.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setSocketWriteTimeout(long sockWriteTimeout) {
        this.sockWriteTimeout = sockWriteTimeout;
    }

    /**
     * @return Configured acknowledgement send threshold.
     */
    @Override
    public int getAckSendThreshold() {
        return ackSndThreshold;
    }

    /**
     * Sets the acknowledgement send threshold.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param ackSndThreshold Acknowledgement send threshold.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setAckSendThreshold(int ackSndThreshold) {
        this.ackSndThreshold = ackSndThreshold;
    }

    /**
     * @return Configured unacknowledged messages buffer size.
     */
    @Override
    public int getUnacknowledgedMessagesBufferSize() {
        return unackedMsgsBufSize;
    }

    /**
     * Sets the unacknowledged messages buffer size.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param unackedMsgsBufSize Unacknowledged messages buffer size.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
        this.unackedMsgsBufSize = unackedMsgsBufSize;
    }

    /**
     * Does nothing; the argument is ignored.
     *
     * @param connBufSize Ignored.
     * @deprecated The value is not stored or used.
     */
    @Deprecated
    @IgniteSpiConfiguration(optional=true)
    public void setConnectionBufferSize(int connBufSize) {
    }

    /**
     * @return Always {@code 0}.
     * @deprecated No value is stored for this setting.
     */
    @Override
    @Deprecated
    public int getConnectionBufferSize() {
        return 0;
    }

    /**
     * Does nothing; the argument is ignored.
     *
     * @param connBufFlushFreq Ignored.
     * @deprecated The value is not stored or used.
     */
    @Override
    @Deprecated
    @IgniteSpiConfiguration(optional=true)
    public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
    }

    /**
     * @return Always {@code 0}.
     * @deprecated No value is stored for this setting.
     */
    @Override
    @Deprecated
    public long getConnectionBufferFlushFrequency() {
        return 0L;
    }

    /**
     * Sets the connect timeout and switches the failure detection timeout off,
     * since an explicit timeout was requested.
     *
     * @param connTimeout Connect timeout.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setConnectTimeout(long connTimeout) {
        this.connTimeout = connTimeout;
        failureDetectionTimeoutEnabled(false);
    }

    /**
     * @return Configured connect timeout.
     */
    @Override
    public long getConnectTimeout() {
        return connTimeout;
    }

    /**
     * Sets the maximum connect timeout and switches the failure detection timeout off,
     * since an explicit timeout was requested.
     *
     * @param maxConnTimeout Maximum connect timeout.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setMaxConnectTimeout(long maxConnTimeout) {
        this.maxConnTimeout = maxConnTimeout;
        failureDetectionTimeoutEnabled(false);
    }

    /**
     * @return Configured maximum connect timeout.
     */
    @Override
    public long getMaxConnectTimeout() {
        return maxConnTimeout;
    }

    /**
     * Sets the reconnect attempts count and switches the failure detection timeout off,
     * since an explicit retry policy was requested.
     *
     * @param reconCnt Reconnect attempts count.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setReconnectCount(int reconCnt) {
        this.reconCnt = reconCnt;
        failureDetectionTimeoutEnabled(false);
    }

    /**
     * @return Configured reconnect attempts count.
     */
    @Override
    public int getReconnectCount() {
        return reconCnt;
    }

    /**
     * Sets the direct buffer flag that is later handed to the NIO server builder.
     *
     * @param directBuf Direct buffer flag.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setDirectBuffer(boolean directBuf) {
        this.directBuf = directBuf;
    }

    /**
     * @return Direct buffer flag.
     */
    @Override
    public boolean isDirectBuffer() {
        return directBuf;
    }

    /**
     * @return Direct send buffer flag.
     */
    @Override
    public boolean isDirectSendBuffer() {
        return directSndBuf;
    }

    /**
     * Sets the direct send buffer flag.
     *
     * @param directSndBuf Direct send buffer flag.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setDirectSendBuffer(boolean directSndBuf) {
        this.directSndBuf = directSndBuf;
    }

    /**
     * Sets the number of NIO selectors.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param selectorsCnt Selectors count.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setSelectorsCount(int selectorsCnt) {
        this.selectorsCnt = selectorsCnt;
    }

    /**
     * @return Configured selectors count.
     */
    @Override
    public int getSelectorsCount() {
        return selectorsCnt;
    }

    /**
     * @return Configured selector spins value.
     */
    @Override
    public long getSelectorSpins() {
        return selectorSpins;
    }

    /**
     * Sets the selector spins value that is later handed to the NIO server builder.
     *
     * @param selectorSpins Selector spins.
     */
    public void setSelectorSpins(long selectorSpins) {
        this.selectorSpins = selectorSpins;
    }

    /**
     * Sets the TCP_NODELAY flag; a warning is logged at start when it is {@code false}.
     *
     * @param tcpNoDelay TCP_NODELAY flag.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setTcpNoDelay(boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @return TCP_NODELAY flag.
     */
    @Override
    public boolean isTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Sets the socket receive buffer size.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param sockRcvBuf Socket receive buffer size.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setSocketReceiveBuffer(int sockRcvBuf) {
        this.sockRcvBuf = sockRcvBuf;
    }

    /**
     * @return Configured socket receive buffer size.
     */
    @Override
    public int getSocketReceiveBuffer() {
        return sockRcvBuf;
    }

    /**
     * Sets the socket send buffer size.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param sockSndBuf Socket send buffer size.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setSocketSendBuffer(int sockSndBuf) {
        this.sockSndBuf = sockSndBuf;
    }

    /**
     * @return Configured socket send buffer size.
     */
    @Override
    public int getSocketSendBuffer() {
        return sockSndBuf;
    }

    /**
     * Sets the message queue limit; a value of 0 triggers a warning at start.
     * The value is validated later, in {@code getNodeAttributes()}.
     *
     * @param msgQueueLimit Message queue limit.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setMessageQueueLimit(int msgQueueLimit) {
        this.msgQueueLimit = msgQueueLimit;
    }

    /**
     * @return Configured message queue limit.
     */
    @Override
    public int getMessageQueueLimit() {
        return msgQueueLimit;
    }

    /**
     * @return Configured slow client queue limit.
     */
    @Override
    public int getSlowClientQueueLimit() {
        return slowClientQueueLimit;
    }

    /**
     * Sets the slow client queue limit. At start a warning is logged when both this
     * limit and the message queue limit are positive and this one is not smaller.
     *
     * @param slowClientQueueLimit Slow client queue limit.
     */
    public void setSlowClientQueueLimit(int slowClientQueueLimit) {
        this.slowClientQueueLimit = slowClientQueueLimit;
    }

    /**
     * Does nothing; the argument is ignored.
     *
     * @param minBufferedMsgCnt Ignored.
     * @deprecated The value is not stored or used.
     */
    @IgniteSpiConfiguration(optional=true)
    @Deprecated
    public void setMinimumBufferedMessageCount(int minBufferedMsgCnt) {
    }

    /**
     * @return Always {@code 0}.
     * @deprecated No value is stored for this setting.
     */
    @Override
    @Deprecated
    public int getMinimumBufferedMessageCount() {
        return 0;
    }

    /**
     * Sets the communication listener (stored in a volatile field).
     *
     * @param lsnr Communication listener.
     */
    @Override
    public void setListener(CommunicationListener<Message> lsnr) {
        this.lsnr = lsnr;
    }

    /**
     * @return Currently set communication listener, or {@code null} if none was set.
     */
    public CommunicationListener getListener() {
        return lsnr;
    }

    /**
     * @return Sent messages counter narrowed to {@code int}.
     */
    @Override
    public int getSentMessagesCount() {
        return sentMsgsCnt.intValue();
    }

    /**
     * @return Sent bytes counter.
     */
    @Override
    public long getSentBytesCount() {
        return sentBytesCnt.longValue();
    }

    /**
     * @return Received messages counter narrowed to {@code int}.
     */
    @Override
    public int getReceivedMessagesCount() {
        return rcvdMsgsCnt.intValue();
    }

    /**
     * @return Received bytes counter.
     */
    @Override
    public long getReceivedBytesCount() {
        return rcvdBytesCnt.longValue();
    }

    /**
     * @return Outbound messages queue size reported by the NIO server,
     *      or {@code 0} when there is no server.
     */
    @Override
    public int getOutboundMessagesQueueSize() {
        // Read the field once: it is nulled out on stop.
        GridNioServer<Message> srv = nioSrvr;
        if (srv == null) {
            return 0;
        }
        return srv.outboundMessagesQueueSize();
    }

    /**
     * Brings the four message/byte counters back towards zero by subtracting
     * the sum each of them currently reports.
     */
    @Override
    public void resetMetrics() {
        long sentMsgs = sentMsgsCnt.sum();
        sentMsgsCnt.add(-sentMsgs);

        long rcvdMsgs = rcvdMsgsCnt.sum();
        rcvdMsgsCnt.add(-rcvdMsgs);

        long sentBytes = sentBytesCnt.sum();
        sentBytesCnt.add(-sentBytes);

        long rcvdBytes = rcvdBytesCnt.sum();
        rcvdBytesCnt.add(-rcvdBytes);
    }

    /**
     * Writes a warning-level dump of all recovery descriptors and communication clients
     * (when a logger is available) and then asks the NIO server, if any, to dump its own stats.
     */
    @Override
    public void dumpStats() {
        IgniteLogger logger = this.log;
        if (logger != null) {
            StringBuilder buf = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
                GridNioRecoveryDescriptor rec = e.getValue();
                buf.append("    [key=").append(e.getKey());
                buf.append(", msgsSent=").append(rec.sent());
                buf.append(", msgsAckedByRmt=").append(rec.acked());
                buf.append(", msgsRcvd=").append(rec.received());
                buf.append(", lastAcked=").append(rec.lastAcknowledged());
                buf.append(", reserveCnt=").append(rec.reserveCount());
                buf.append(", descIdHash=").append(System.identityHashCode(rec));
                buf.append(']').append(U.nl());
            }
            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : outRecDescs.entrySet()) {
                GridNioRecoveryDescriptor rec = e.getValue();
                buf.append("    [key=").append(e.getKey());
                buf.append(", msgsSent=").append(rec.sent());
                buf.append(", msgsAckedByRmt=").append(rec.acked());
                buf.append(", reserveCnt=").append(rec.reserveCount());
                buf.append(", connected=").append(rec.connected());
                buf.append(", reserved=").append(rec.reserved());
                buf.append(", descIdHash=").append(System.identityHashCode(rec));
                buf.append(']').append(U.nl());
            }
            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : inRecDescs.entrySet()) {
                GridNioRecoveryDescriptor rec = e.getValue();
                buf.append("    [key=").append(e.getKey());
                buf.append(", msgsRcvd=").append(rec.received());
                buf.append(", lastAcked=").append(rec.lastAcknowledged());
                buf.append(", reserveCnt=").append(rec.reserveCount());
                buf.append(", connected=").append(rec.connected());
                buf.append(", reserved=").append(rec.reserved());
                buf.append(", handshakeIdx=").append(rec.handshakeIndex());
                buf.append(", descIdHash=").append(System.identityHashCode(rec));
                buf.append(']').append(U.nl());
            }
            buf.append("Communication SPI clients: ").append(U.nl());
            for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
                UUID nodeId = e.getKey();
                for (GridCommunicationClient client : e.getValue()) {
                    // Unused connection slots are left null.
                    if (client != null) {
                        buf.append("    [node=").append(nodeId).append(", client=").append(client).append(']').append(U.nl());
                    }
                }
            }
            U.warn(logger, buf.toString());
        }
        // Read the field once: it is nulled out on stop.
        GridNioServer<Message> srv = this.nioSrvr;
        if (srv != null) {
            srv.dumpStats();
        }
    }

    /**
     * Validates the SPI configuration, selects the connection policy, resolves the local host,
     * creates the shared memory and NIO servers and builds the node attributes of this SPI.
     * <p>
     * A failure to start the shared memory server is only logged as a warning; a failure to
     * create the NIO server or to resolve addresses is fatal.
     *
     * @return Attributes: addresses, host names, bound TCP port, bound shared memory port
     *      ({@code null} when not bound), external addresses ({@code null} without an address
     *      resolver) and the paired connections flag.
     * @throws IgniteSpiException If the local address cannot be resolved, the TCP server cannot
     *      be created, or local addresses cannot be resolved.
     */
    @Override
    public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
        initFailureDetectionTimeout();
        assertParameter(locPort > 1023, "locPort > 1023");
        // The message now matches the condition: port 65535 itself is accepted.
        assertParameter(locPort <= 65535, "locPort <= 0xffff");
        assertParameter(locPortRange >= 0, "locPortRange >= 0");
        assertParameter(idleConnTimeout > 0L, "idleConnTimeout > 0");
        assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
        assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
        assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
        assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
        assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
        assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
        assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
        // Explicit timeouts and retry counts only matter when failure detection timeout is off.
        if (!failureDetectionTimeoutEnabled()) {
            assertParameter(reconCnt > 0, "reconnectCnt > 0");
            assertParameter(connTimeout >= 0L, "connTimeout >= 0");
            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
        }
        assertParameter(sockWriteTimeout >= 0L, "sockWriteTimeout >= 0");
        assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
        assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
        if (unackedMsgsBufSize > 0) {
            assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5, "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
            assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5, "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
        }
        // With several connections per node the index is derived from the calling thread ID.
        this.connPlc = connectionsPerNode > 1 ? new ConnectionPolicy(){

            @Override
            public int connectionIndex() {
                return (int)(U.safeAbs(Thread.currentThread().getId()) % (long)TcpCommunicationSpi.this.connectionsPerNode);
            }
        } : new ConnectionPolicy(){

            @Override
            public int connectionIndex() {
                return 0;
            }
        };
        try {
            this.locHost = U.resolveLocalHost(locAddr);
        }
        catch (IOException e) {
            throw new IgniteSpiException("Failed to initialize local address: " + locAddr, e);
        }
        try {
            this.shmemSrv = resetShmemServer();
        }
        catch (IgniteCheckedException e) {
            // Shared memory is optional: continue with TCP only.
            U.warn(log, "Failed to start shared memory communication server.", e);
        }
        try {
            this.nioSrvr = resetNioServer();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e);
        }
        try {
            IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost);
            Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort);
            HashMap<String, Object> res = new HashMap<String, Object>(5);
            res.put(createSpiAttributeName(ATTR_ADDRS), addrs.get1());
            res.put(createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2());
            res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
            res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? Integer.valueOf(boundTcpShmemPort) : null);
            res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
            res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
            return res;
        }
        catch (IOException | IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
        }
    }

    /**
     * Starts the SPI: logs the effective configuration (debug level), emits configuration
     * warnings, registers the MBean, creates the connect gateway and starts the shared memory
     * accept worker (if a shared memory server exists), the NIO server and the communication worker.
     * <p>
     * Expects {@code getNodeAttributes()} to have run already, since it resolves the local host
     * and creates the servers used here.
     *
     * @param gridName Grid name used for MBean registration and the communication worker.
     * @throws IgniteSpiException If start fails.
     */
    @Override
    public void spiStart(String gridName) throws IgniteSpiException {
        assert (locHost != null);
        startStopwatch();
        if (log.isDebugEnabled()) {
            log.debug(configInfo("locAddr", locAddr));
            log.debug(configInfo("locPort", locPort));
            log.debug(configInfo("locPortRange", locPortRange));
            log.debug(configInfo("idleConnTimeout", idleConnTimeout));
            log.debug(configInfo("directBuf", directBuf));
            log.debug(configInfo("directSendBuf", directSndBuf));
            log.debug(configInfo("selectorsCnt", selectorsCnt));
            log.debug(configInfo("tcpNoDelay", tcpNoDelay));
            log.debug(configInfo("sockSndBuf", sockSndBuf));
            log.debug(configInfo("sockRcvBuf", sockRcvBuf));
            log.debug(configInfo("shmemPort", shmemPort));
            log.debug(configInfo("msgQueueLimit", msgQueueLimit));
            log.debug(configInfo("connectionsPerNode", connectionsPerNode));
            // Explicit timeouts/retries are only in effect when failure detection timeout is
            // disabled (the same rule getNodeAttributes() uses for validation); otherwise the
            // failure detection timeout itself is the relevant value.
            if (!failureDetectionTimeoutEnabled()) {
                log.debug(configInfo("connTimeout", connTimeout));
                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
                log.debug(configInfo("reconCnt", reconCnt));
            } else {
                log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
            }
            log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
            log.debug(configInfo("ackSndThreshold", ackSndThreshold));
            log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
        }
        if (!tcpNoDelay) {
            U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution since may produce significant delays with some scenarios.");
        }
        if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
            U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit (slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit + ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
        }
        if (msgQueueLimit == 0) {
            U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.");
        }
        registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
        this.connectGate = new ConnectGateway();
        // The shared memory server is optional; it may be absent if disabled or if it failed to start.
        if (shmemSrv != null) {
            this.shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
            new IgniteThread(shmemAcceptWorker).start();
        }
        nioSrvr.start();
        this.commWorker = new CommunicationWorker(gridName);
        commWorker.start();
        if (log.isDebugEnabled()) {
            log.debug(startInfo());
        }
    }

    /**
     * Registers the bound ports and the discovery listener with the SPI context and then
     * releases threads waiting in {@code getSpiContext()}.
     *
     * @param spiCtx SPI context.
     * @throws IgniteSpiException If initialization fails.
     */
    @Override
    public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);
        boolean shmemBound = boundTcpShmemPort > 0;
        if (shmemBound) {
            spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
        }
        // Event types 11 and 12 are the ones the discovery listener asserts on.
        spiCtx.addLocalEventListener(discoLsnr, 11, 12);
        ctxInitLatch.countDown();
    }

    /**
     * Returns the SPI context, first waiting for the context-initialization latch if it has
     * not been released yet. An interruption during the wait is logged as a warning and the
     * context is returned anyway.
     *
     * @return SPI context as reported by the superclass.
     */
    @Override
    public IgniteSpiContext getSpiContext() {
        boolean needWait = ctxInitLatch.getCount() > 0L;
        if (needWait) {
            if (log.isDebugEnabled()) {
                log.debug("Waiting for context initialization.");
            }
            try {
                U.await(ctxInitLatch);
                if (log.isDebugEnabled()) {
                    log.debug("Context has been initialized.");
                }
            }
            catch (IgniteInterruptedCheckedException e) {
                U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
            }
        }
        return super.getSpiContext();
    }

    /**
     * Builds the TCP NIO server, trying ports from {@code locPort} up to the end of the
     * configured range (a zero range means only {@code locPort}) and remembering the port
     * that worked in {@code boundTcpPort}.
     *
     * @return Created (not yet started) NIO server.
     * @throws IgniteCheckedException If a server was already created, or no port in the range
     *      could be used; the last failure is attached as the cause.
     */
    private GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
        if (boundTcpPort >= 0) {
            throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
        }
        IgniteCheckedException lastErr = null;
        int endPort = locPort;
        if (locPortRange != 0) {
            endPort = locPort + locPortRange - 1;
        }
        for (int port = locPort; port <= endPort; ++port) {
            try {
                // Factories resolve their delegates lazily, through getSpiContext(), on first use.
                MessageFactory msgFactory = new MessageFactory(){
                    private MessageFactory impl;

                    @Override
                    @Nullable
                    public Message create(byte type) {
                        if (this.impl == null) {
                            this.impl = TcpCommunicationSpi.this.getSpiContext().messageFactory();
                        }
                        assert (this.impl != null);
                        return this.impl.create(type);
                    }
                };
                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
                        if (key == null) {
                            return null;
                        }
                        return this.formatter.reader(key.nodeId(), msgFactory);
                    }
                };
                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
                        if (key == null) {
                            return null;
                        }
                        return this.formatter.writer(key.nodeId());
                    }
                };
                GridDirectParser parser = new GridDirectParser(log.getLogger(GridDirectParser.class), msgFactory, readerFactory);
                IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>(){

                    @Override
                    public boolean apply(Message msg) {
                        return msg instanceof RecoveryLastReceivedMessage;
                    }
                };
                boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());
                // Queue size monitoring is only installed on non-client nodes with a positive limit.
                CI2<GridNioSession, Integer> queueSizeMonitor = !clientMode && slowClientQueueLimit > 0 ? new CI2<GridNioSession, Integer>(){

                    @Override
                    public void apply(GridNioSession ses, Integer qSize) {
                        TcpCommunicationSpi.this.checkClientQueueSize(ses, qSize);
                    }
                } : null;
                GridNioFilter[] filters;
                if (isSslEnabled()) {
                    GridNioSslFilter sslFilter = new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(), true, ByteOrder.nativeOrder(), log);
                    sslFilter.directMode(true);
                    sslFilter.wantClientAuth(true);
                    sslFilter.needClientAuth(true);
                    filters = new GridNioFilter[]{new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log), sslFilter};
                } else {
                    filters = new GridNioFilter[]{new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)};
                }
                GridNioServer<Message> srvr = GridNioServer.builder()
                    .address(locHost)
                    .port(port)
                    .listener(srvLsnr)
                    .logger(log)
                    .selectorCount(selectorsCnt)
                    .gridName(gridName)
                    .serverName("tcp-comm")
                    .tcpNoDelay(tcpNoDelay)
                    .directBuffer(directBuf)
                    .byteOrder(ByteOrder.nativeOrder())
                    .socketSendBufferSize(sockSndBuf)
                    .socketReceiveBufferSize(sockRcvBuf)
                    .sendQueueLimit(msgQueueLimit)
                    .directMode(true)
                    .metricsListener(metricsLsnr)
                    .writeTimeout(sockWriteTimeout)
                    .selectorSpins(selectorSpins)
                    .filters(filters)
                    .writerFactory(writerFactory)
                    .skipRecoveryPredicate(skipRecoveryPred)
                    .messageQueueSizeListener((IgniteBiInClosure<GridNioSession, Integer>)queueSizeMonitor)
                    .readWriteSelectorsAssign(usePairedConnections)
                    .build();
                boundTcpPort = port;
                if (log.isInfoEnabled()) {
                    log.info("Successfully bound communication NIO server to TCP port [port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt + ", selectorSpins=" + srvr.selectorSpins() + ", pairedConn=" + usePairedConnections + ']');
                }
                srvr.idleTimeout(idleConnTimeout);
                return srvr;
            }
            catch (IgniteCheckedException e) {
                // An SSL problem will not go away on another port, so fail right away.
                if (X.hasCause(e, SSLException.class)) {
                    throw new IgniteSpiException("Failed to create SSL context. SSL factory: " + ignite.configuration().getSslContextFactory() + '.', e);
                }
                lastErr = e;
                if (log.isDebugEnabled()) {
                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + locHost + ']');
                }
                onException("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + locHost + ']', e);
            }
        }
        throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastErr);
    }

    /**
     * Creates and starts the shared memory server endpoint, trying ports from {@code shmemPort}
     * up to the end of the configured port range and remembering the port that worked in
     * {@code boundTcpShmemPort}.
     * <p>
     * A zero {@code locPortRange} means only {@code shmemPort} itself is tried, which is the
     * same rule {@code resetNioServer()} applies to {@code locPort}.
     *
     * @return Started endpoint, or {@code null} if shared memory is disabled
     *      ({@code shmemPort == -1}) or the platform is Windows.
     * @throws IgniteCheckedException If an endpoint was already created, or no port in the range
     *      could be used; the last failure is attached as the cause.
     */
    @Nullable
    private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
        if (boundTcpShmemPort >= 0) {
            throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
        }
        if (shmemPort == -1 || U.isWindows()) {
            return null;
        }
        IgniteCheckedException lastEx = null;
        // With a zero range the configured port must still be tried exactly once.
        int lastPort = locPortRange == 0 ? shmemPort : shmemPort + locPortRange - 1;
        for (int port = shmemPort; port <= lastPort; ++port) {
            try {
                IgniteConfiguration cfg = ignite.configuration();
                IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(log, cfg.getNodeId(), gridName, cfg.getWorkDirectory());
                srv.setPort(port);
                srv.omitOutOfResourcesWarning(true);
                srv.start();
                boundTcpShmemPort = port;
                if (log.isInfoEnabled()) {
                    log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort + ", locHost=" + locHost + ']');
                }
                return srv;
            }
            catch (IgniteCheckedException e) {
                lastEx = e;
                if (log.isDebugEnabled()) {
                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + locHost + ']');
                }
            }
        }
        // Report the port the scan actually started from (shmemPort, not the TCP locPort).
        throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
    }

    /**
     * Stops the SPI: unregisters the MBean, stops the NIO server, stops and joins the
     * communication and shared memory workers, force-closes every remaining client and
     * clears the server, worker and bound TCP port state.
     *
     * @throws IgniteSpiException If stop fails.
     */
    @Override
    public void spiStop() throws IgniteSpiException {
        assert (stopping);
        unregisterMBean();
        if (nioSrvr != null) {
            nioSrvr.stop();
        }
        U.interrupt(commWorker);
        U.join(commWorker, log);
        U.cancel(shmemAcceptWorker);
        U.join(shmemAcceptWorker, log);
        U.cancel(shmemWorkers);
        U.join(shmemWorkers, log);
        shmemWorkers.clear();
        for (GridCommunicationClient[] nodeClients : clients.values()) {
            for (GridCommunicationClient client : nodeClients) {
                // Unused connection slots are left null.
                if (client != null) {
                    client.forceClose();
                }
            }
        }
        nioSrvr = null;
        commWorker = null;
        boundTcpPort = -1;
        if (log.isDebugEnabled()) {
            log.debug(stopInfo());
        }
    }

    /**
     * Marks the SPI as stopping, releases threads waiting for context initialization,
     * notifies the connect gateway (if created), force-closes all clients and removes
     * the registered ports and the discovery listener from the SPI context.
     */
    @Override
    protected void onContextDestroyed0() {
        stopping = true;
        // Release waiters in getSpiContext() if the context never got initialized.
        if (ctxInitLatch.getCount() > 0L) {
            ctxInitLatch.countDown();
        }
        if (connectGate != null) {
            connectGate.stopped();
        }
        for (GridCommunicationClient[] nodeClients : clients.values()) {
            for (GridCommunicationClient client : nodeClients) {
                if (client != null) {
                    client.forceClose();
                }
            }
        }
        getSpiContext().deregisterPorts();
        getSpiContext().removeLocalEventListener(discoLsnr);
    }

    /**
     * Handles a client disconnect: notifies the connect gateway, force-closes all clients,
     * fails every client future with a disconnect error and drops all recovery descriptors.
     *
     * @param reconnectFut Reconnect future passed to the gateway and to the error.
     */
    @Override
    public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
        connectGate.disconnected(reconnectFut);
        for (GridCommunicationClient[] nodeClients : clients.values()) {
            for (GridCommunicationClient client : nodeClients) {
                if (client != null) {
                    client.forceClose();
                }
            }
        }
        IgniteClientDisconnectedCheckedException disconnectErr = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to connect client node disconnected.");
        for (GridFutureAdapter<GridCommunicationClient> fut : clientFuts.values()) {
            fut.onDone(disconnectErr);
        }
        recoveryDescs.clear();
        inRecDescs.clear();
        outRecDescs.clear();
    }

    /**
     * Notifies the connect gateway that the client has reconnected.
     *
     * @param clusterRestarted Not used by this implementation.
     */
    @Override
    public void onClientReconnected(boolean clusterRestarted) {
        connectGate.reconnected();
    }

    /**
     * Removes the clients registered for the given node and force-closes each of them.
     *
     * @param nodeId ID of the node whose clients must be closed; must not be {@code null}.
     */
    void onNodeLeft(UUID nodeId) {
        assert (nodeId != null);
        GridCommunicationClient[] nodeClients = clients.remove(nodeId);
        if (nodeClients == null) {
            return;
        }
        for (GridCommunicationClient client : nodeClients) {
            // Unused connection slots are left null.
            if (client == null) {
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + ", client=" + client + ']');
            }
            client.forceClose();
        }
    }

    /**
     * Verifies that the given node carries the address, host-name and port attributes of this SPI.
     * A missing attribute is reported by {@link #checkAttributePresence(ClusterNode, String)}.
     *
     * @param spiCtx Not used by this implementation.
     * @param node Node whose attributes are inspected.
     * @param starting Not used by this implementation.
     */
    @Override
    protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting) throws IgniteSpiException {
        String[] requiredAttrs = {ATTR_ADDRS, ATTR_HOST_NAMES, ATTR_PORT};
        for (String attr : requiredAttrs) {
            this.checkAttributePresence(node, this.createSpiAttributeName(attr));
        }
    }

    /**
     * Logs a warning when the given node does not have the named attribute.
     *
     * @param node Node to inspect.
     * @param attrName Fully qualified attribute name to look up on the node.
     */
    private void checkAttributePresence(ClusterNode node, String attrName) {
        if (node.attribute(attrName) == null) {
            // The ", " before "spiCls=" keeps the node ID and the SPI class name apart in the log line.
            U.warn(this.log, "Remote node has inconsistent configuration (required attribute was not found) [attrName=" + attrName + ", nodeId=" + node.id() + ", spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']');
        }
    }

    /**
     * Sends a message to the given node without an acknowledgement closure; delegates to
     * {@code sendMessage0} with a {@code null} closure.
     *
     * @param node Destination node.
     * @param msg Message to send.
     * @throws IgniteSpiException Propagated from {@code sendMessage0}.
     */
    @Override
    public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
        this.sendMessage0(node, msg, null);
    }

    /**
     * Sends a message to the given node, passing the supplied closure on to {@code sendMessage0}.
     *
     * @param node Destination node.
     * @param msg Message to send.
     * @param ackC Closure forwarded unchanged to {@code sendMessage0}; may be {@code null}
     *      (the two-argument overload passes {@code null}).
     * @throws IgniteSpiException Propagated from {@code sendMessage0}.
     */
    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
        this.sendMessage0(node, msg, ackC);
    }

    /**
     * Delivers a message either locally (when the destination is the local node) or through a
     * reserved communication client.
     * <p>
     * For remote delivery the client is reserved, used and released in a loop: when
     * {@code client.sendMessage(...)} asks for a retry, the client is removed from the cache and the
     * send is repeated with a newly reserved client, unless the destination is no longer known to
     * the SPI context.
     *
     * @param node Destination node; must not be {@code null}.
     * @param msg Message to send; must not be {@code null}.
     * @param ackC Closure handed to the client's {@code sendMessage}; may be {@code null}.
     * @throws IgniteSpiException If the local node is not available or sending failed.
     */
    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
        assert (node != null);
        assert (msg != null);

        if (this.log.isTraceEnabled()) {
            this.log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
        }

        ClusterNode locNode = this.getLocalNode();
        if (locNode == null) {
            throw new IgniteSpiException("Local node has not been started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
        }

        // Local destination: hand the message straight to the listener.
        if (node.id().equals(locNode.id())) {
            this.notifyListener(node.id(), msg, NOOP);
            return;
        }

        int connIdx = this.useMultipleConnections(node) ? this.connPlc.connectionIndex() : 0;
        GridCommunicationClient client = null;
        try {
            while (true) {
                client = this.reserveClient(node, connIdx);

                // The node ID is passed only to clients that are not asynchronous.
                UUID targetId = client.async() ? null : node.id();

                boolean resend = client.sendMessage(targetId, msg, ackC);
                client.release();

                if (!resend) {
                    this.sentMsgsCnt.increment();
                    client = null;
                    break;
                }

                this.removeNodeClient(node.id(), client);
                if (this.getSpiContext().node(node.id()) == null) {
                    throw new IgniteCheckedException("Failed to send message to remote node (node has left the grid): " + node.id());
                }

                // Cleared so the finally block does not close a client that was handled normally.
                client = null;
            }
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
        }
        finally {
            // A non-null client here means the loop was left by an exception.
            if (client != null && this.removeNodeClient(node.id(), client)) {
                client.forceClose();
            }
        }
    }

    /**
     * Clears the slot occupied by the given client in the per-node client array.
     * <p>
     * The array stored in {@code clients} is never modified in place: a copy with the slot set to
     * {@code null} is swapped in via {@code replace}, and the attempt is repeated when the swap fails.
     *
     * @param nodeId Node whose client array is updated.
     * @param rmvClient Client to remove.
     * @return {@code true} if the client was found in its slot and removed, {@code false} if the node
     *      has no array, the client's index is outside the array, or the slot holds another client.
     */
    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
        for (;;) {
            GridCommunicationClient[] snapshot = (GridCommunicationClient[])this.clients.get(nodeId);
            if (snapshot == null) {
                return false;
            }
            if (rmvClient.connectionIndex() >= snapshot.length) {
                return false;
            }
            if (snapshot[rmvClient.connectionIndex()] != rmvClient) {
                return false;
            }

            GridCommunicationClient[] updated = Arrays.copyOf(snapshot, snapshot.length);
            updated[rmvClient.connectionIndex()] = null;

            if (this.clients.replace(nodeId, snapshot, updated)) {
                return true;
            }
        }
    }

    /**
     * Stores the given client in slot {@code connIdx} of the per-node client array.
     * <p>
     * Nothing is stored when {@code connIdx} is not below {@code connectionsPerNode}. Otherwise a new
     * array is installed with {@code putIfAbsent} (no array yet) or a modified copy is swapped in
     * with {@code replace}; the attempt is repeated until one of the two succeeds.
     *
     * @param node Node the client is connected to.
     * @param connIdx Connection index; must equal {@code addClient.connectionIndex()}.
     * @param addClient Client to register.
     */
    private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
        assert (this.connectionsPerNode > 0) : this.connectionsPerNode;
        assert (connIdx == addClient.connectionIndex()) : addClient;

        if (connIdx >= this.connectionsPerNode) {
            assert (!this.usePairedConnections(node));
            return;
        }

        boolean stored = false;
        while (!stored) {
            GridCommunicationClient[] existing = (GridCommunicationClient[])this.clients.get(node.id());
            assert (existing == null || existing[connIdx] == null) : "Client already created [node=" + node.id() + ", connIdx=" + connIdx + ", client=" + addClient + ", oldClient=" + existing[connIdx] + ']';

            GridCommunicationClient[] updated;
            if (existing == null) {
                // First client for this node: a single slot unless multiple connections are used.
                int slots = this.useMultipleConnections(node) ? this.connectionsPerNode : 1;
                updated = new GridCommunicationClient[slots];
                updated[connIdx] = addClient;
                stored = this.clients.putIfAbsent(node.id(), updated) == null;
            } else {
                updated = Arrays.copyOf(existing, existing.length);
                updated[connIdx] = addClient;
                stored = this.clients.replace(node.id(), existing, updated);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        if (!TcpCommunicationSpi.$assertionsDisabled && node == null) {
            throw new AssertionError();
        }
        if (!TcpCommunicationSpi.$assertionsDisabled && (connIdx < 0 || connIdx >= this.connectionsPerNode) && this.usePairedConnections(node)) {
            throw new AssertionError(connIdx);
        }
        nodeId = node.id();
        while (true) {
            v0 = client = (curClients = (GridCommunicationClient[])this.clients.get(nodeId)) != null && connIdx < curClients.length ? curClients[connIdx] : null;
            if (client == null) {
                if (this.stopping) {
                    throw new IgniteSpiException("Node is stopping.");
                }
                connKey = new ConnectionKey(nodeId, connIdx, -1L);
                fut = new ConnectFuture();
                oldFut = this.clientFuts.putIfAbsent(connKey, fut);
                if (oldFut == null) {
                    try {
                        curClients0 = (GridCommunicationClient[])this.clients.get(nodeId);
                        v1 = client0 = curClients0 != null && connIdx < curClients0.length ? curClients0[connIdx] : null;
                        if (client0 == null) {
                            client0 = this.createNioClient(node, connIdx);
                            if (client0 != null) {
                                this.addNodeClient(node, connIdx, client0);
                                if (client0 instanceof GridTcpNioCommunicationClient && (tcpClient = (GridTcpNioCommunicationClient)client0).session().closeTime() > 0L && this.removeNodeClient(nodeId, client0)) {
                                    if (this.log.isDebugEnabled()) {
                                        this.log.debug("Session was closed after client creation, will retry [node=" + node + ", client=" + client0 + ']');
                                    }
                                    client0 = null;
                                }
                            } else {
                                U.sleep(200L);
                            }
                        }
                        fut.onDone(client0);
                    }
                    catch (Throwable e) {
                        fut.onDone(e);
                        if (!(e instanceof Error)) ** GOTO lbl41
                        throw (Error)e;
                    }
                    finally {
                        this.clientFuts.remove(connKey, fut);
                    }
                } else {
                    fut = oldFut;
                }
lbl41:
                // 3 sources

                if ((client = (GridCommunicationClient)fut.get()) == null) continue;
                if (this.getSpiContext().node(nodeId) == null) {
                    if (this.removeNodeClient(nodeId, client)) {
                        client.forceClose();
                    }
                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
                }
            }
            if (!TcpCommunicationSpi.$assertionsDisabled && connIdx != client.connectionIndex()) {
                throw new AssertionError(client);
            }
            if (client.reserve()) {
                return client;
            }
            this.removeNodeClient(nodeId, client);
        }
    }

    /**
     * Creates a communication client to the given node.
     * <p>
     * A shared-memory client is tried first when the node advertises a shared-memory port and
     * {@code U.sameMacs(locNode, node)} holds. If that attempt fails with an
     * {@link IgniteCheckedException}, the failure is logged and a TCP client is created instead,
     * inside the {@code connectGate} enter/leave pair.
     *
     * @param node Destination node; must not be {@code null}.
     * @param connIdx Connection index.
     * @return Created client; may be {@code null}, since {@code createTcpClient} can return {@code null}.
     * @throws IgniteCheckedException If the local node is unavailable or TCP client creation failed.
     */
    @Nullable
    private GridCommunicationClient createNioClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        assert (node != null);

        Integer shmemPort = (Integer)node.attribute(this.createSpiAttributeName(ATTR_SHMEM_PORT));
        ClusterNode locNode = this.getSpiContext().localNode();
        if (locNode == null) {
            throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Creating NIO client to node: " + node);
        }

        if (shmemPort != null && U.sameMacs(locNode, node)) {
            try {
                GridCommunicationClient client = this.createShmemClient(node, connIdx, shmemPort);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Shmem client created: " + client);
                }
                return client;
            }
            catch (IgniteCheckedException e) {
                if (e.hasCause(IpcOutOfSystemResourcesException.class)) {
                    LT.warn(this.log, OUT_OF_RESOURCES_TCP_MSG);
                }
                if (this.getSpiContext().node(node.id()) != null) {
                    LT.warn(this.log, e.getMessage());
                }
                else if (this.log.isDebugEnabled()) {
                    // Logged only when the node is really gone, so the message stays truthful.
                    this.log.debug("Failed to establish shared memory connection with local node (node has left): " + node.id());
                }
                // Fall through to the TCP client below.
            }
        }

        this.connectGate.enter();
        try {
            GridCommunicationClient client = this.createTcpClient(node, connIdx);
            if (this.log.isDebugEnabled()) {
                this.log.debug("TCP client created: " + client);
            }
            return client;
        }
        finally {
            this.connectGate.leave();
        }
    }

    /**
     * Creates a shared-memory client to the given node and performs the handshake on it.
     * <p>
     * Client construction is retried once when it fails with a {@link ConnectException} cause and the
     * failure timeout has not been reached. A handshake timeout is either rethrown (failure
     * detection timeout enabled and reached, attempt limit {@code reconCnt} hit, or the timeout
     * already exceeds {@code maxConnTimeout}) or retried with a doubled handshake timeout.
     *
     * @param node Destination node.
     * @param connIdx Connection index passed to the client.
     * @param port Shared-memory port passed to the client.
     * @return Client on which the handshake completed.
     * @throws IgniteCheckedException If the client could not be created or the handshake failed.
     */
    @Nullable
    private GridCommunicationClient createShmemClient(ClusterNode node, int connIdx, Integer port) throws IgniteCheckedException {
        int handshakeAttempt = 1;
        int connectTries = 1;
        long handshakeTimeout = this.connTimeout;
        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);

        for (;;) {
            GridShmemCommunicationClient client;
            try {
                client = new GridShmemCommunicationClient(connIdx, this.metricsLsnr, port, timeoutHelper.nextTimeoutChunk(this.connTimeout), this.log, this.getSpiContext().messageFormatter());
            }
            catch (IgniteCheckedException e) {
                // Only a single extra attempt, and only for connect failures within the failure timeout.
                boolean retryConnect = !timeoutHelper.checkFailureTimeoutReached(e) && connectTries < 2 && X.hasCause(e, ConnectException.class);
                if (!retryConnect) {
                    throw e;
                }
                ++connectTries;
                continue;
            }

            try {
                this.safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(handshakeTimeout), null, null);
                return client;
            }
            catch (IgniteSpiOperationTimeoutException | HandshakeTimeoutException e) {
                client.forceClose();

                if (this.failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || timeoutHelper.checkFailureTimeoutReached(e))) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + this.failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
                    }
                    throw e;
                }

                assert (!this.failureDetectionTimeoutEnabled());

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + handshakeTimeout + ", err=" + e.getMessage() + ", client=" + client + ']');
                }

                boolean giveUp = handshakeAttempt == this.reconCnt || handshakeTimeout > this.maxConnTimeout;
                if (giveUp) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timedout (will stop attempts to perform the handshake) [timeout=" + handshakeTimeout + ", maxConnTimeout=" + this.maxConnTimeout + ", attempt=" + handshakeAttempt + ", reconCnt=" + this.reconCnt + ", err=" + e.getMessage() + ", client=" + client + ']');
                    }
                    throw e;
                }

                // Next round: one more attempt with twice the handshake timeout.
                ++handshakeAttempt;
                handshakeTimeout *= 2L;
            }
            catch (Error | RuntimeException | IgniteCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
                }
                client.forceClose();
                throw e;
            }
        }
    }

    /**
     * Fails a client node whose outbound message queue has grown beyond {@code slowClientQueueLimit}.
     * <p>
     * Nothing happens when the limit is not positive, the queue size is within the limit, the session
     * carries no connection key, the node is unknown to the SPI context, or the node is not a client.
     *
     * @param ses Session whose {@code CONN_IDX_META} metadata identifies the remote node.
     * @param msgQueueSize Current size of the session's outbound message queue.
     */
    private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
        if (this.slowClientQueueLimit <= 0 || msgQueueSize <= this.slowClientQueueLimit) {
            return;
        }

        ConnectionKey id = (ConnectionKey)ses.meta(CONN_IDX_META);
        if (id == null) {
            return;
        }

        ClusterNode node = this.getSpiContext().node(id.nodeId);
        if (node == null || !node.isClient()) {
            return;
        }

        String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, the client will be dropped (consider changing 'slowClientQueueLimit' configuration property) [srvNode=" + this.getSpiContext().localNode().id() + ", clientNode=" + node + ", slowClientQueueLimit=" + this.slowClientQueueLimit + ']';
        U.quietAndWarn(this.log, msg);
        this.getSpiContext().failNode(id.nodeId(), msg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Establishes a TCP connection to the given node and wraps it into a communication client.
     * <p>
     * Candidate addresses are built from the node's address/host-name/port attributes (sorted with
     * {@code U.inetAddressesComparator}) followed by its external addresses. Each address is tried in
     * turn: a blocking channel is opened and configured, the outgoing recovery descriptor is
     * reserved, the socket is connected, the handshake is performed and an NIO session is created.
     * Failures are collected as suppressed exceptions of a single {@code errs} exception.
     * <p>
     * When no address worked, {@code errs} is thrown; in some cases (see the tail of the method) the
     * remote node is additionally failed through the SPI context first.
     *
     * @param node Destination node.
     * @param connIdx Connection index.
     * @return Connected client, or {@code null} when the recovery descriptor could not be reserved or
     *      the handshake returned {@code -1}.
     * @throws IgniteCheckedException If the node has no usable addresses or no connection could be
     *      established.
     */
    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        LinkedHashSet<Object> addrs;
        boolean isExtAddrsExist;
        Collection rmtAddrs0 = (Collection)node.attribute(this.createSpiAttributeName(ATTR_ADDRS));
        Collection rmtHostNames0 = (Collection)node.attribute(this.createSpiAttributeName(ATTR_HOST_NAMES));
        Integer boundPort = (Integer)node.attribute(this.createSpiAttributeName(ATTR_PORT));
        Collection extAddrs = (Collection)node.attribute(this.createSpiAttributeName(ATTR_EXT_ADDRS));
        // Regular addresses are usable only together with a bound port.
        boolean isRmtAddrsExist = !F.isEmpty(rmtAddrs0) && boundPort != null;
        boolean bl = isExtAddrsExist = !F.isEmpty(extAddrs);
        if (!isRmtAddrsExist && !isExtAddrsExist) {
            throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any TCP communication addresses or mapped external addresses. Check configuration and make sure that you use the same communication SPI on all nodes. Remote node id: " + node.id());
        }
        if (isRmtAddrsExist) {
            ArrayList<InetSocketAddress> addrs0 = new ArrayList<InetSocketAddress>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
            boolean sameHost = U.sameMacs(this.getSpiContext().localNode(), node);
            Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
            // LinkedHashSet keeps the sorted order and drops duplicates.
            addrs = new LinkedHashSet<InetSocketAddress>(addrs0);
        } else {
            addrs = new LinkedHashSet();
        }
        if (isExtAddrsExist) {
            // External addresses are appended after the sorted regular ones.
            addrs.addAll(extAddrs);
        }
        boolean conn = false;
        GridCommunicationClient client = null;
        // Created lazily on the first failure; per-address failures are attached as suppressed exceptions.
        Throwable errs = null;
        // Declared outside the address loop, so the connect-retry budget is shared by all addresses.
        int connectAttempts = 1;
        for (InetSocketAddress addr : addrs) {
            // Handshake timeout, attempt counter and timeout helper start fresh for every address.
            long connTimeout0 = this.connTimeout;
            int attempt = 1;
            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
            while (!conn) {
                try {
                    SocketChannel ch = SocketChannel.open();
                    ch.configureBlocking(true);
                    ch.socket().setTcpNoDelay(this.tcpNoDelay);
                    ch.socket().setKeepAlive(true);
                    if (this.sockRcvBuf > 0) {
                        ch.socket().setReceiveBufferSize(this.sockRcvBuf);
                    }
                    if (this.sockSndBuf > 0) {
                        ch.socket().setSendBufferSize(this.sockSndBuf);
                    }
                    // Do not connect to a node the SPI context no longer knows.
                    if (this.getSpiContext().node(node.id()) == null) {
                        U.closeQuiet(ch);
                        throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
                    }
                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1L);
                    GridNioRecoveryDescriptor recoveryDesc = this.outRecoveryDescriptor(node, connKey);
                    // Reservation failed: give up without an error and let the caller see null.
                    if (!recoveryDesc.reserve()) {
                        U.closeQuiet(ch);
                        return null;
                    }
                    // Stays -1 unless the handshake returns another value; -1 triggers the release in the finally below.
                    long rcvCnt = -1L;
                    HashMap<Integer, Object> meta = new HashMap<Integer, Object>();
                    GridSslMeta sslMeta = null;
                    try {
                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(this.connTimeout));
                        if (this.isSslEnabled()) {
                            sslMeta = new GridSslMeta();
                            meta.put(GridNioSessionMetaKey.SSL_META.ordinal(), sslMeta);
                            SSLEngine sslEngine = this.ignite.configuration().getSslContextFactory().create().createSSLEngine();
                            sslEngine.setUseClientMode(true);
                            sslMeta.sslEngine(sslEngine);
                        }
                        // The connection index is sent in the handshake only when multiple connections are used.
                        Integer handshakeConnIdx = this.useMultipleConnections(node) ? Integer.valueOf(connIdx) : null;
                        rcvCnt = this.safeHandshake(ch, recoveryDesc, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), sslMeta, handshakeConnIdx);
                        if (rcvCnt == -1L) {
                            GridCommunicationClient gridCommunicationClient = null;
                            return gridCommunicationClient;
                        }
                    }
                    finally {
                        // Release the reservation when the handshake threw or returned -1.
                        if (recoveryDesc != null && rcvCnt == -1L) {
                            recoveryDesc.release();
                        }
                    }
                    try {
                        meta.put(CONN_IDX_META, connKey);
                        if (recoveryDesc != null) {
                            recoveryDesc.onHandshake(rcvCnt);
                            meta.put(-1, recoveryDesc);
                        }
                        GridNioSession ses = (GridNioSession)this.nioSrvr.createSession(ch, meta).get();
                        client = new GridTcpNioCommunicationClient(connIdx, ses, this.log);
                        conn = true;
                    }
                    finally {
                        // NOTE(review): decompiler artifact. A `continue` inside `finally` would discard an
                        // in-flight exception; it is taken here only when conn is true or recoveryDesc is null.
                        if (conn || recoveryDesc == null) continue;
                        recoveryDesc.release();
                    }
                }
                catch (IgniteSpiOperationTimeoutException | HandshakeTimeoutException e) {
                    if (client != null) {
                        client.forceClose();
                        client = null;
                    }
                    // With failure detection enabled a timed-out handshake is not retried on this address.
                    if (this.failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || timeoutHelper.checkFailureTimeoutReached(e))) {
                        String msg = "Handshake timed out (failure detection timeout is reached) [failureDetectionTimeout=" + this.failureDetectionTimeout() + ", addr=" + addr + ']';
                        this.onException(msg, e);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(msg);
                        }
                        if (errs == null) {
                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']');
                        }
                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
                        break;
                    }
                    assert (!this.failureDetectionTimeoutEnabled());
                    this.onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ']', e);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ", err=" + e + ']');
                    }
                    // Stop retrying this address once the attempt limit or the maximum timeout is hit.
                    if (attempt == this.reconCnt || connTimeout0 > this.maxConnTimeout) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Handshake timedout (will stop attempts to perform the handshake) [timeout=" + connTimeout0 + ", maxConnTimeout=" + this.maxConnTimeout + ", attempt=" + attempt + ", reconCnt=" + this.reconCnt + ", err=" + e.getMessage() + ", addr=" + addr + ']');
                        }
                        if (errs == null) {
                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']');
                        }
                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
                        break;
                    }
                    // Retry the same address with a doubled handshake timeout.
                    ++attempt;
                    connTimeout0 *= 2L;
                }
                catch (Exception e) {
                    boolean failureDetThrReached;
                    if (client != null) {
                        client.forceClose();
                        client = null;
                    }
                    this.onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
                    }
                    if (failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e)) {
                        LT.warn(this.log, "Connect timed out (consider increasing 'failureDetectionTimeout' configuration property) [addr=" + addr + ", failureDetectionTimeout=" + this.failureDetectionTimeout() + ']');
                    } else if (X.hasCause(e, SocketTimeoutException.class)) {
                        LT.warn(this.log, "Connect timed out (consider increasing 'connTimeout' configuration property) [addr=" + addr + ", connTimeout=" + this.connTimeout + ']');
                    }
                    if (errs == null) {
                        errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']');
                    }
                    errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
                    // Retry this address only for a ConnectException, only while the shared budget allows it,
                    // and only if the failure timeout has not been reached; otherwise move to the next address.
                    if (failureDetThrReached || connectAttempts >= 2 || !(e instanceof ConnectException) && !X.hasCause(e, ConnectException.class)) break;
                    ++connectAttempts;
                }
            }
            // No effect as written: either branch proceeds to the next address, where `while (!conn)`
            // is skipped once a connection exists.
            if (!conn) continue;
        }
        if (client != null) return client;
        assert (errs != null);
        if (X.hasCause(errs, ConnectException.class)) {
            LT.warn(this.log, "Failed to connect to a remote node (make sure that destination node is alive and operating system firewall is disabled on local and remote hosts) [addrs=" + addrs + ']');
        }
        // The remote node is failed only if it is still in topology, the pair is not
        // (local = client, remote = non-client), and the error has a connect/timeout cause.
        if (this.getSpiContext().node(node.id()) == null) throw errs;
        if (!CU.clientNode(node)) {
            if (CU.clientNode(this.getLocalNode())) throw errs;
        }
        if (!X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, IgniteSpiOperationTimeoutException.class)) throw errs;
        LT.warn(this.log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from cluster [rmtNode=" + node + ", err=" + errs + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
        this.getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [rmtNode=" + node + ", errs=" + errs + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
        throw errs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> long safeHandshake(T client, @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, GridSslMeta sslMeta, @Nullable Integer handshakeConnIdx) throws IgniteCheckedException {
        long rcvCnt;
        block54: {
            HandshakeTimeoutObject obj = new HandshakeTimeoutObject(client, U.currentTimeMillis() + timeout);
            this.addTimeoutObject(obj);
            rcvCnt = 0L;
            try {
                if (client instanceof GridCommunicationClient) {
                    ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
                    break block54;
                }
                SocketChannel ch = (SocketChannel)client;
                boolean success = false;
                try {
                    int read;
                    ByteBuffer buf;
                    BlockingSslHandler sslHnd = null;
                    if (this.isSslEnabled()) {
                        assert (sslMeta != null);
                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, this.directBuf, ByteOrder.nativeOrder(), this.log);
                        if (!sslHnd.handshake()) {
                            throw new IgniteCheckedException("SSL handshake is not completed.");
                        }
                        ByteBuffer handBuff = sslHnd.applicationBuffer();
                        if (handBuff.remaining() < 17) {
                            buf = ByteBuffer.allocate(1000);
                            read = ch.read(buf);
                            if (read == -1) {
                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
                            }
                            buf.flip();
                            buf = sslHnd.decode(buf);
                        } else {
                            buf = handBuff;
                        }
                    } else {
                        buf = ByteBuffer.allocate(17);
                        for (int i = 0; i < 17; i += read) {
                            read = ch.read(buf);
                            if (read != -1) continue;
                            throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
                        }
                    }
                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
                    if (!rmtNodeId.equals(rmtNodeId0)) {
                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + ", rcvd=" + rmtNodeId0 + ']');
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Received remote node ID: " + rmtNodeId0);
                    }
                    if (this.isSslEnabled()) {
                        assert (sslHnd != null);
                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
                    } else {
                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
                    }
                    ClusterNode locNode = this.getLocalNode();
                    if (locNode == null) {
                        throw new IgniteCheckedException("Local node has not been started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
                    }
                    if (recovery != null) {
                        HandshakeMessage msg;
                        int msgSize = 33;
                        if (handshakeConnIdx != null) {
                            msg = new HandshakeMessage2(locNode.id(), recovery.incrementConnectCount(), recovery.received(), handshakeConnIdx);
                            msgSize += 4;
                        } else {
                            msg = new HandshakeMessage(locNode.id(), recovery.incrementConnectCount(), recovery.received());
                        }
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
                        }
                        buf = ByteBuffer.allocate(msgSize);
                        buf.order(ByteOrder.nativeOrder());
                        boolean written = msg.writeTo(buf, null);
                        assert (written);
                        buf.flip();
                        if (this.isSslEnabled()) {
                            assert (sslHnd != null);
                            ch.write(sslHnd.encrypt(buf));
                        } else {
                            ch.write(buf);
                        }
                    } else if (this.isSslEnabled()) {
                        assert (sslHnd != null);
                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(this.nodeIdMessage().nodeIdBytesWithType)));
                    } else {
                        ch.write(ByteBuffer.wrap(this.nodeIdMessage().nodeIdBytesWithType));
                    }
                    if (recovery != null) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
                        }
                        if (this.isSslEnabled()) {
                            ByteBuffer inBuf;
                            assert (sslHnd != null);
                            buf = ByteBuffer.allocate(1000);
                            ByteBuffer decode = null;
                            buf.order(ByteOrder.nativeOrder());
                            for (int i = 0; i < 9; i += decode.remaining()) {
                                int read2 = ch.read(buf);
                                if (read2 == -1) {
                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake (connection closed).");
                                }
                                buf.flip();
                                decode = sslHnd.decode(buf);
                                buf.clear();
                            }
                            rcvCnt = decode.getLong(1);
                            if (decode.limit() > 9) {
                                decode.position(9);
                                sslMeta.decodedBuffer(decode);
                            }
                            if ((inBuf = sslHnd.inputBuffer()).position() > 0) {
                                sslMeta.encodedBuffer(inBuf);
                            }
                        } else {
                            int read3;
                            buf = ByteBuffer.allocate(9);
                            buf.order(ByteOrder.nativeOrder());
                            for (int i = 0; i < 9; i += read3) {
                                read3 = ch.read(buf);
                                if (read3 != -1) continue;
                                throw new IgniteCheckedException("Failed to read remote node recovery handshake (connection closed).");
                            }
                            rcvCnt = buf.getLong(1);
                        }
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
                        }
                        if (rcvCnt == -1L) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
                            }
                        } else {
                            success = true;
                        }
                    } else {
                        success = true;
                    }
                }
                catch (IOException e) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to read from channel: " + e);
                    }
                    throw new IgniteCheckedException("Failed to read from channel.", e);
                }
                finally {
                    if (!success) {
                        U.closeQuiet(ch);
                    }
                }
            }
            finally {
                boolean cancelled = obj.cancel();
                if (cancelled) {
                    this.removeTimeoutObject(obj);
                }
                if (!cancelled) {
                    throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing 'connectionTimeout' configuration property).");
                }
            }
        }
        return rcvCnt;
    }

    /**
     * Hands a received message over to the registered communication listener.
     * <p>
     * The listener field is read once into a local so that the null check and the call use the
     * same reference. When no listener is registered the message is dropped and, if debug logging
     * is enabled, a note is logged.
     *
     * @param sndId ID of the node the message came from.
     * @param msg Received message.
     * @param msgC Closure passed through to the listener together with the message.
     */
    protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
        CommunicationListener<Message> target = this.lsnr;
        if (target == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received communication message without any registered listeners (will ignore, is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');
            }
            return;
        }
        target.onMessage(sndId, msg, msgC);
    }

    /**
     * Tears down the communication layer abruptly: stops the NIO server (when one exists),
     * interrupts and joins the communication worker, then force-closes every cached client.
     */
    public void simulateNodeFailure() {
        if (this.nioSrvr != null) {
            this.nioSrvr.stop();
        }
        U.interrupt(this.commWorker);
        U.join(this.commWorker, this.log);
        // Each map value is the array of clients kept for one node; slots may be empty.
        for (Object perNode : this.clients.values()) {
            for (GridCommunicationClient client : (GridCommunicationClient[])perNode) {
                if (client != null) {
                    client.forceClose();
                }
            }
        }
    }

    /**
     * Resolves the recovery descriptor used for the outgoing direction of a connection.
     * <p>
     * With paired connections the descriptor comes from {@code outRecDescs}; otherwise the shared
     * {@code recoveryDescs} map is used.
     *
     * @param node Remote node.
     * @param key Connection key.
     * @return Existing or newly registered recovery descriptor.
     */
    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        boolean paired = this.usePairedConnections(node);
        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> descs = paired ? this.outRecDescs : this.recoveryDescs;
        return this.recoveryDescriptor(descs, paired, node, key);
    }

    /**
     * Resolves the recovery descriptor used for the incoming direction of a connection.
     * <p>
     * With paired connections the descriptor comes from {@code inRecDescs}; otherwise the shared
     * {@code recoveryDescs} map is used.
     *
     * @param node Remote node.
     * @param key Connection key.
     * @return Existing or newly registered recovery descriptor.
     */
    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        boolean paired = this.usePairedConnections(node);
        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> descs = paired ? this.inRecDescs : this.recoveryDescs;
        return this.recoveryDescriptor(descs, paired, node, key);
    }

    /**
     * Checks whether the node's version is at least {@code MULTIPLE_CONN_SINCE_VER}
     * (the comparison ignores the version timestamp).
     *
     * @param node Node to check.
     * @return {@code true} if the node's version is not older than {@code MULTIPLE_CONN_SINCE_VER}.
     */
    private boolean useMultipleConnections(ClusterNode node) {
        int cmp = node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER);
        return cmp >= 0;
    }

    /**
     * Tells whether paired connections apply to the given node.
     * <p>
     * Both sides must agree: the local {@code usePairedConnections} flag has to be set and the
     * remote node must carry a {@code true} value in its {@code ATTR_PAIRED_CONN} attribute.
     *
     * @param node Remote node.
     * @return {@code true} only when the local flag is set and the node attribute is {@code true}.
     */
    private boolean usePairedConnections(ClusterNode node) {
        if (!this.usePairedConnections) {
            return false;
        }
        Boolean attr = (Boolean)node.attribute(this.createSpiAttributeName(ATTR_PAIRED_CONN));
        // A missing attribute counts as "not paired".
        return Boolean.TRUE.equals(attr);
    }

    /**
     * Returns the recovery descriptor registered under {@code key}, creating and registering one
     * if none exists yet.
     * <p>
     * The queue limit of a new descriptor is {@code unackedMsgsBufSize} when that is non-zero,
     * otherwise {@code 128 * max(msgQueueLimit, ackSndThreshold)}. If another thread registers a
     * descriptor first, that one is returned and the freshly created instance is discarded.
     *
     * @param recoveryDescs Map to look up and register the descriptor in.
     * @param pairedConnections Flag passed to a newly created descriptor.
     * @param node Remote node.
     * @param key Connection key.
     * @return Descriptor stored in the map for {@code key}.
     */
    private GridNioRecoveryDescriptor recoveryDescriptor(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs, boolean pairedConnections, ClusterNode node, ConnectionKey key) {
        GridNioRecoveryDescriptor existing = recoveryDescs.get(key);
        if (existing != null) {
            return existing;
        }
        int queueLimit = this.unackedMsgsBufSize;
        if (queueLimit == 0) {
            queueLimit = Math.max(this.msgQueueLimit, this.ackSndThreshold) * 128;
        }
        GridNioRecoveryDescriptor created = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, this.log);
        GridNioRecoveryDescriptor raced = recoveryDescs.putIfAbsent(key, created);
        return raced != null ? raced : created;
    }

    /**
     * Reports an exception to the registry returned by {@code getExceptionRegistry()}.
     *
     * @param msg Message describing the failure.
     * @param e Exception to record.
     */
    private void onException(String msg, Exception e) {
        this.getExceptionRegistry().onException(msg, e);
    }

    /**
     * Builds a {@link NodeIdMessage} carrying the local node ID.
     * <p>
     * When the local node is not available a warning is logged and an all-zero UUID is used
     * instead.
     *
     * @return Message with the local node ID, or with a zero UUID if the local node is unknown.
     */
    private NodeIdMessage nodeIdMessage() {
        ClusterNode locNode = this.getLocalNode();
        if (locNode != null) {
            return new NodeIdMessage(locNode.id());
        }
        U.warn(this.log, "Local node is not started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
        return new NodeIdMessage(new UUID(0L, 0L));
    }

    /**
     * @return String representation of this SPI produced by {@code S.toString}.
     */
    public String toString() {
        return S.toString(TcpCommunicationSpi.class, this);
    }

    /**
     * Supplies a connection index.
     */
    interface ConnectionPolicy {
        /**
         * @return Connection index.
         */
        int connectionIndex();
    }

    /**
     * Key identifying a connection by remote node ID and connection index.
     * <p>
     * The connect count travels with the key but does not take part in {@link #equals(Object)}
     * or {@link #hashCode()}.
     */
    private static class ConnectionKey {
        private final UUID nodeId;
        private final int idx;
        private final long connCnt;

        /**
         * @param nodeId Remote node ID.
         * @param idx Connection index.
         * @param connCnt Connect count.
         */
        ConnectionKey(UUID nodeId, int idx, long connCnt) {
            this.nodeId = nodeId;
            this.idx = idx;
            this.connCnt = connCnt;
        }

        /**
         * @return Connect count.
         */
        long connectCount() {
            return this.connCnt;
        }

        /**
         * @return Remote node ID.
         */
        UUID nodeId() {
            return this.nodeId;
        }

        /**
         * @return Connection index.
         */
        int connectionIndex() {
            return this.idx;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            ConnectionKey other = (ConnectionKey)o;
            if (this.idx != other.idx) {
                return false;
            }
            return this.nodeId.equals(other.nodeId);
        }

        @Override
        public int hashCode() {
            return 31 * this.nodeId.hashCode() + this.idx;
        }

        @Override
        public String toString() {
            return S.toString(ConnectionKey.class, this);
        }
    }

    /**
     * Holder pairing a recovery descriptor with a connection index; instances are queued for the
     * communication worker.
     */
    private static class DisconnectedSessionInfo {
        private final GridNioRecoveryDescriptor recoveryDesc;
        private int connIdx;

        /**
         * @param recoveryDesc Recovery descriptor, may be {@code null}.
         * @param connIdx Connection index.
         */
        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) {
            this.connIdx = connIdx;
            this.recoveryDesc = recoveryDesc;
        }

        @Override
        public String toString() {
            return S.toString(DisconnectedSessionInfo.class, this);
        }
    }

    /**
     * Gate that callers pass through with {@link #enter()} / {@link #tryEnter()} and
     * {@link #leave()}. While an error is recorded (client disconnected or node stopped) entry
     * is refused.
     * <p>
     * Entering takes the read side of a spin read-write lock; {@link #disconnected(IgniteFuture)}
     * and {@link #reconnected()} take the write side.
     */
    private class ConnectGateway {
        private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
        private IgniteException err;

        private ConnectGateway() {
        }

        /**
         * Enters the gateway. On success the read lock stays held until {@link #leave()}.
         *
         * @throws IgniteException The recorded error, if the gateway is currently closed.
         */
        void enter() {
            this.lock.readLock();
            // Read the field once: after the read lock is released a concurrent reconnected()
            // may clear it, and throwing a null reference would raise NullPointerException.
            IgniteException failure = this.err;
            if (failure != null) {
                this.lock.readUnlock();
                throw failure;
            }
        }

        /**
         * Tries to enter the gateway without throwing.
         *
         * @return {@code true} if entered (read lock is held and {@link #leave()} must be called),
         *      {@code false} if an error is recorded (read lock already released).
         */
        boolean tryEnter() {
            this.lock.readLock();
            boolean res = this.err == null;
            if (!res) {
                this.lock.readUnlock();
            }
            return res;
        }

        /**
         * Leaves the gateway, releasing the read lock taken by a successful enter.
         */
        void leave() {
            this.lock.readUnlock();
        }

        /**
         * Closes the gateway with a client-disconnected error.
         *
         * @param reconnectFut Reconnect future attached to the recorded exception.
         */
        void disconnected(IgniteFuture<?> reconnectFut) {
            this.lock.writeLock();
            try {
                this.err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
            }
            finally {
                this.lock.writeUnlock();
            }
        }

        /**
         * Re-opens the gateway after a reconnect. Only a client-disconnected error is cleared;
         * any other recorded error (such as the one set by {@link #stopped()}) is kept.
         */
        void reconnected() {
            this.lock.writeLock();
            try {
                if (this.err instanceof IgniteClientDisconnectedException) {
                    this.err = null;
                }
            }
            finally {
                this.lock.writeUnlock();
            }
        }

        /**
         * Closes the gateway with a node-stopped error.
         */
        void stopped() {
            // NOTE(review): the error is published under the READ lock here, unlike
            // disconnected()/reconnected(). Presumably this avoids waiting for threads that are
            // currently inside the gateway while the node stops - confirm before changing.
            this.lock.readLock();
            try {
                this.err = new IgniteException("Failed to connect, node stopped.");
            }
            finally {
                this.lock.readUnlock();
            }
        }
    }

    /**
     * Message carrying a node ID as 16 raw UUID bytes.
     * <p>
     * On the wire it is written as one type byte ({@code -1}) followed by the 16 ID bytes.
     */
    public static class NodeIdMessage
    implements Message {
        private static final long serialVersionUID = 0L;
        private byte[] nodeIdBytes;
        private byte[] nodeIdBytesWithType;

        /**
         * Empty constructor; the ID bytes are filled in by {@link #readFrom(ByteBuffer, MessageReader)}.
         */
        public NodeIdMessage() {
        }

        /**
         * Precomputes both the raw ID bytes and the same bytes prefixed with the type byte.
         *
         * @param nodeId Node ID.
         */
        private NodeIdMessage(UUID nodeId) {
            assert (nodeId != null);
            byte[] raw = U.uuidToBytes(nodeId);
            byte[] typed = new byte[raw.length + 1];
            typed[0] = -1;
            System.arraycopy(raw, 0, typed, 1, raw.length);
            this.nodeIdBytes = raw;
            this.nodeIdBytesWithType = typed;
        }

        @Override
        public void onAckReceived() {
        }

        /**
         * Writes the type byte and the ID bytes.
         *
         * @return {@code false} (nothing written) if fewer than 17 bytes remain in {@code buf}.
         */
        @Override
        public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
            assert (this.nodeIdBytes.length == 16);
            if (buf.remaining() >= 17) {
                buf.put((byte)-1).put(this.nodeIdBytes);
                return true;
            }
            return false;
        }

        /**
         * Reads the 16 ID bytes; the type byte is not consumed here.
         *
         * @return {@code false} (nothing read) if fewer than 16 bytes remain in {@code buf}.
         */
        @Override
        public boolean readFrom(ByteBuffer buf, MessageReader reader) {
            if (buf.remaining() < 16) {
                return false;
            }
            byte[] idBytes = new byte[16];
            buf.get(idBytes);
            this.nodeIdBytes = idBytes;
            return true;
        }

        @Override
        public byte directType() {
            return -1;
        }

        @Override
        public byte fieldsCount() {
            return 0;
        }

        @Override
        public String toString() {
            return S.toString(NodeIdMessage.class, this);
        }
    }

    /**
     * Message carrying a single received-messages counter.
     * <p>
     * On the wire it is one type byte ({@code -2}) followed by the 8-byte counter.
     */
    public static class RecoveryLastReceivedMessage
    implements Message {
        private static final long serialVersionUID = 0L;
        private long rcvCnt;

        /**
         * Empty constructor; the counter is filled in by {@link #readFrom(ByteBuffer, MessageReader)}.
         */
        public RecoveryLastReceivedMessage() {
        }

        /**
         * @param rcvCnt Received-messages counter.
         */
        public RecoveryLastReceivedMessage(long rcvCnt) {
            this.rcvCnt = rcvCnt;
        }

        /**
         * @return Received-messages counter.
         */
        public long received() {
            return this.rcvCnt;
        }

        @Override
        public void onAckReceived() {
        }

        /**
         * Writes the type byte and the counter.
         *
         * @return {@code false} (nothing written) if fewer than 9 bytes remain in {@code buf}.
         */
        @Override
        public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
            if (buf.remaining() >= 9) {
                buf.put((byte)-2).putLong(this.rcvCnt);
                return true;
            }
            return false;
        }

        /**
         * Reads the counter; the type byte is not consumed here.
         *
         * @return {@code false} (nothing read) if fewer than 8 bytes remain in {@code buf}.
         */
        @Override
        public boolean readFrom(ByteBuffer buf, MessageReader reader) {
            boolean enough = buf.remaining() >= 8;
            if (enough) {
                this.rcvCnt = buf.getLong();
            }
            return enough;
        }

        @Override
        public byte directType() {
            return -2;
        }

        @Override
        public byte fieldsCount() {
            return 0;
        }

        @Override
        public String toString() {
            return S.toString(RecoveryLastReceivedMessage.class, this);
        }
    }

    /**
     * Handshake message that appends a 4-byte connection index to the fields written by
     * {@link HandshakeMessage}. Its direct type is {@code -44}.
     */
    public static class HandshakeMessage2
    extends HandshakeMessage {
        private static final long serialVersionUID = 0L;
        private int connIdx;

        /**
         * Empty constructor; fields are filled in by {@link #readFrom(ByteBuffer, MessageReader)}.
         */
        public HandshakeMessage2() {
        }

        /**
         * @param nodeId Node ID.
         * @param connectCnt Connect count.
         * @param rcvCnt Number of received messages.
         * @param connIdx Connection index.
         */
        HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
            super(nodeId, connectCnt, rcvCnt);
            this.connIdx = connIdx;
        }

        @Override
        public byte directType() {
            return -44;
        }

        @Override
        public int connectionIndex() {
            return this.connIdx;
        }

        /**
         * Writes the base handshake fields followed by the connection index.
         *
         * @return {@code false} if the base message could not be written, or if fewer than 4 bytes
         *      remain afterwards (in which case the base fields have already been written).
         */
        @Override
        public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
            // Short-circuit keeps the order: base fields first, then the space check.
            if (!super.writeTo(buf, writer) || buf.remaining() < 4) {
                return false;
            }
            buf.putInt(this.connIdx);
            return true;
        }

        /**
         * Reads the base handshake fields followed by the connection index.
         *
         * @return {@code false} if the base message could not be read, or if fewer than 4 bytes
         *      remain afterwards (in which case the base fields have already been read).
         */
        @Override
        public boolean readFrom(ByteBuffer buf, MessageReader reader) {
            if (!super.readFrom(buf, reader) || buf.remaining() < 4) {
                return false;
            }
            this.connIdx = buf.getInt();
            return true;
        }

        @Override
        public String toString() {
            return S.toString(HandshakeMessage2.class, this);
        }
    }

    /**
     * Handshake message carrying the sender's node ID, its received-messages counter and its
     * connect count.
     * <p>
     * On the wire it is 33 bytes: type byte, 16 UUID bytes, 8-byte received counter, 8-byte
     * connect count - in that order.
     */
    public static class HandshakeMessage
    implements Message {
        private static final long serialVersionUID = 0L;
        private UUID nodeId;
        private long rcvCnt;
        private long connectCnt;

        /**
         * Empty constructor; fields are filled in by {@link #readFrom(ByteBuffer, MessageReader)}.
         */
        public HandshakeMessage() {
        }

        /**
         * @param nodeId Node ID, must not be {@code null}.
         * @param connectCnt Connect count.
         * @param rcvCnt Number of received messages, must not be negative.
         */
        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
            assert (nodeId != null);
            assert (rcvCnt >= 0L) : rcvCnt;
            this.nodeId = nodeId;
            this.connectCnt = connectCnt;
            this.rcvCnt = rcvCnt;
        }

        /**
         * @return Connection index; always {@code 0} for this message type.
         */
        public int connectionIndex() {
            return 0;
        }

        /**
         * @return Connect count.
         */
        public long connectCount() {
            return this.connectCnt;
        }

        /**
         * @return Number of received messages.
         */
        public long received() {
            return this.rcvCnt;
        }

        /**
         * @return Node ID.
         */
        public UUID nodeId() {
            return this.nodeId;
        }

        @Override
        public void onAckReceived() {
        }

        /**
         * Writes the type byte (taken from {@link #directType()}) and the three fields.
         *
         * @return {@code false} (nothing written) if fewer than 33 bytes remain in {@code buf}.
         */
        @Override
        public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
            if (buf.remaining() >= 33) {
                buf.put(this.directType());
                byte[] idBytes = U.uuidToBytes(this.nodeId);
                assert (idBytes.length == 16) : idBytes.length;
                buf.put(idBytes).putLong(this.rcvCnt).putLong(this.connectCnt);
                return true;
            }
            return false;
        }

        /**
         * Reads the three fields; the type byte is not consumed here.
         *
         * @return {@code false} (nothing read) if fewer than 32 bytes remain in {@code buf}.
         */
        @Override
        public boolean readFrom(ByteBuffer buf, MessageReader reader) {
            if (buf.remaining() >= 32) {
                byte[] idBytes = new byte[16];
                buf.get(idBytes);
                this.nodeId = U.bytesToUuid(idBytes, 0);
                this.rcvCnt = buf.getLong();
                this.connectCnt = buf.getLong();
                return true;
            }
            return false;
        }

        @Override
        public byte directType() {
            return -3;
        }

        /**
         * Not supported for this message.
         *
         * @throws UnsupportedOperationException Always.
         */
        @Override
        public byte fieldsCount() {
            throw new UnsupportedOperationException();
        }

        @Override
        public String toString() {
            return S.toString(HandshakeMessage.class, this);
        }
    }

    /**
     * Stream-based handshake: first reads and verifies the remote node ID, then sends the local
     * node ID back.
     */
    private class HandshakeClosure
    extends IgniteInClosure2X<InputStream, OutputStream> {
        private static final long serialVersionUID = 0L;
        private final UUID rmtNodeId;

        /**
         * @param rmtNodeId ID the remote side is expected to report.
         */
        private HandshakeClosure(UUID rmtNodeId) {
            this.rmtNodeId = rmtNodeId;
        }

        /**
         * Performs the handshake over the given streams.
         *
         * @param in Stream to read the remote node ID from.
         * @param out Stream to write the local node ID to.
         * @throws IgniteCheckedException If the stream ends early, the remote ID differs from the
         *      expected one, the read times out, or any other I/O error occurs.
         */
        @Override
        public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException {
            this.receiveRemoteNodeId(in);
            this.sendLocalNodeId(out);
        }

        /**
         * Reads 17 bytes (one leading byte plus 16 UUID bytes) and checks the UUID against the
         * expected remote node ID.
         */
        private void receiveRemoteNodeId(InputStream in) throws IgniteCheckedException {
            try {
                byte[] frame = new byte[17];
                int filled = 0;
                while (filled < frame.length) {
                    int got = in.read(frame, filled, frame.length - filled);
                    if (got < 0) {
                        throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)");
                    }
                    filled += got;
                }
                // The UUID starts at offset 1; the first byte is skipped.
                UUID rcvd = U.bytesToUuid(frame, 1);
                if (!this.rmtNodeId.equals(rcvd)) {
                    throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + this.rmtNodeId + ", rcvd=" + rcvd + ']');
                }
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Received remote node ID: " + rcvd);
                }
            }
            catch (SocketTimeoutException e) {
                throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing 'connectionTimeout' configuration property).", e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }
        }

        /**
         * Writes the Ignite header, the {@code -1} type byte and the local node ID bytes, then
         * flushes the stream.
         */
        private void sendLocalNodeId(OutputStream out) throws IgniteCheckedException {
            try {
                ClusterNode locNode = TcpCommunicationSpi.this.getLocalNode();
                if (locNode == null) {
                    throw new IgniteSpiException("Local node has not been started or fully initialized [isStopping=" + TcpCommunicationSpi.this.getSpiContext().isStopping() + ']');
                }
                UUID locId = locNode.id();
                NodeIdMessage idMsg = new NodeIdMessage(locId);
                out.write(U.IGNITE_HEADER);
                out.write(-1);
                out.write(idMsg.nodeIdBytes);
                out.flush();
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Sent local node ID [locNodeId=" + locId + ", rmtNodeId=" + this.rmtNodeId + ']');
                }
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }
        }
    }

    /**
     * Timeout object that closes a communication client or a channel when it fires.
     * <p>
     * {@link #cancel()} and {@link #onTimeout()} race on a single flag: whichever flips it first
     * wins, so the target is closed only if the timeout fires before cancellation.
     *
     * @param <T> Type of the guarded object: a {@code GridCommunicationClient} or a
     *      {@link SelectableChannel}.
     */
    private static class HandshakeTimeoutObject<T>
    implements IgniteSpiTimeoutObject {
        private final IgniteUuid id = IgniteUuid.randomUuid();
        private final T obj;
        private final long endTime;
        private final AtomicBoolean done = new AtomicBoolean();

        /**
         * @param obj Client or channel to close on timeout.
         * @param endTime End time reported by {@link #endTime()}; must be positive.
         */
        private HandshakeTimeoutObject(T obj, long endTime) {
            assert (obj != null);
            assert (obj instanceof GridCommunicationClient || obj instanceof SelectableChannel);
            assert (endTime > 0L);
            this.obj = obj;
            this.endTime = endTime;
        }

        /**
         * @return {@code true} if this call marked the object as done, {@code false} if it was
         *      already done (cancelled earlier or timed out).
         */
        boolean cancel() {
            return this.done.compareAndSet(false, true);
        }

        @Override
        public void onTimeout() {
            if (!this.done.compareAndSet(false, true)) {
                // Already cancelled or already fired.
                return;
            }
            if (this.obj instanceof GridCommunicationClient) {
                ((GridCommunicationClient)this.obj).forceClose();
                return;
            }
            U.closeQuiet((AbstractInterruptibleChannel)this.obj);
        }

        @Override
        public long endTime() {
            return this.endTime;
        }

        @Override
        public IgniteUuid id() {
            return this.id;
        }

        @Override
        public String toString() {
            return S.toString(HandshakeTimeoutObject.class, this);
        }
    }

    /**
     * Future whose result is a {@code GridCommunicationClient}; adds nothing to
     * {@code GridFutureAdapter} beyond the concrete type.
     */
    private static class ConnectFuture
    extends GridFutureAdapter<GridCommunicationClient> {
        private static final long serialVersionUID = 0L;

        private ConnectFuture() {
        }
    }

    /**
     * Background thread of the communication SPI.
     * <p>
     * It waits on a queue of disconnect requests. A request triggers a recovery reconnect; when
     * nothing arrives within {@code idleConnTimeout} milliseconds it runs idle processing:
     * recovery-descriptor cleanup, acknowledgements on timeout and closing of idle clients.
     */
    private class CommunicationWorker
    extends IgniteSpiThread {
        private final BlockingQueue<DisconnectedSessionInfo> q;

        /**
         * @param gridName Grid name passed to the thread base class.
         */
        private CommunicationWorker(String gridName) {
            super(gridName, "tcp-comm-worker", TcpCommunicationSpi.this.log);
            this.q = new LinkedBlockingQueue<DisconnectedSessionInfo>();
        }

        /**
         * Main loop: runs until the thread is interrupted.
         *
         * @throws InterruptedException If interrupted while waiting on the queue.
         */
        @Override
        protected void body() throws InterruptedException {
            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Tcp communication worker has been started.");
            }
            while (!this.isInterrupted()) {
                DisconnectedSessionInfo disconnectData = this.q.poll(TcpCommunicationSpi.this.idleConnTimeout, TimeUnit.MILLISECONDS);
                if (disconnectData != null) {
                    this.processDisconnect(disconnectData);
                    continue;
                }
                // Nothing queued within the idle timeout.
                this.processIdle();
            }
        }

        /**
         * Idle pass over all cached clients and NIO sessions.
         * <p>
         * For every client: force-closes it if its node is gone; otherwise either sends a pending
         * recovery acknowledgement (non-paired NIO clients) or closes the client once it has been
         * idle for at least {@code idleConnTimeout} and has no unacknowledged messages. Finally,
         * for sessions with an incoming recovery descriptor of a paired-connection node, sends an
         * acknowledgement on timeout.
         */
        private void processIdle() {
            this.cleanupRecovery();
            for (Map.Entry e : TcpCommunicationSpi.this.clients.entrySet()) {
                UUID nodeId = (UUID)e.getKey();
                for (GridCommunicationClient client : (GridCommunicationClient[])e.getValue()) {
                    if (client == null) continue;
                    ClusterNode node = TcpCommunicationSpi.this.getSpiContext().node(nodeId);
                    if (node == null) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Forcing close of non-existent node connection: " + nodeId);
                        }
                        client.forceClose();
                        TcpCommunicationSpi.this.removeNodeClient(nodeId, client);
                        continue;
                    }
                    GridNioRecoveryDescriptor recovery = null;
                    // Non-paired NIO client whose descriptor has received messages that were not
                    // acknowledged yet: send the acknowledgement and skip the idle check this round.
                    if (!TcpCommunicationSpi.this.usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient && (recovery = (GridNioRecoveryDescriptor)TcpCommunicationSpi.this.recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1L))) != null && recovery.lastAcknowledged() != recovery.received()) {
                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + ", rcvCnt=" + msg.received() + ']');
                        }
                        try {
                            TcpCommunicationSpi.this.nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
                            recovery.lastAcknowledged(msg.received());
                        }
                        catch (IgniteCheckedException err) {
                            U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + err, err);
                        }
                        continue;
                    }
                    long idleTime = client.getIdleTime();
                    if (idleTime < TcpCommunicationSpi.this.idleConnTimeout) continue;
                    // For paired connections the outgoing descriptor decides whether messages are pending.
                    if (recovery == null && TcpCommunicationSpi.this.usePairedConnections(node)) {
                        recovery = (GridNioRecoveryDescriptor)TcpCommunicationSpi.this.outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1L));
                    }
                    if (recovery != null && recovery.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(nodeId)) && !recovery.messagesRequests().isEmpty()) {
                        if (!TcpCommunicationSpi.this.log.isDebugEnabled()) continue;
                        TcpCommunicationSpi.this.log.debug("Node connection is idle, but there are unacknowledged messages, will wait: " + nodeId);
                        continue;
                    }
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Closing idle node connection: " + nodeId);
                    }
                    // Remove the client only if it was closed by this call or is already closed.
                    if (!client.close() && !client.closed()) continue;
                    TcpCommunicationSpi.this.removeNodeClient(nodeId, client);
                }
            }
            for (GridNioSession ses : TcpCommunicationSpi.this.nioSrvr.sessions()) {
                GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
                if (recovery == null || !TcpCommunicationSpi.this.usePairedConnections(recovery.node())) continue;
                assert (ses.accepted()) : ses;
                this.sendAckOnTimeout(recovery, ses);
            }
        }

        /**
         * Sends a {@link RecoveryLastReceivedMessage} over the session when the descriptor's
         * received counter differs from the last acknowledged one. A send failure is logged and
         * not rethrown.
         *
         * @param recovery Recovery descriptor, may be {@code null} (then nothing happens).
         * @param ses Session to send the acknowledgement through.
         */
        private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
            if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() + ", rcvCnt=" + msg.received() + ", lastAcked=" + recovery.lastAcknowledged() + ']');
                }
                try {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, msg);
                    recovery.lastAcknowledged(msg.received());
                }
                catch (IgniteCheckedException e) {
                    U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                }
            }
        }

        /**
         * Runs recovery-descriptor cleanup on the shared, incoming and outgoing descriptor maps.
         */
        private void cleanupRecovery() {
            this.cleanupRecovery(TcpCommunicationSpi.this.recoveryDescs);
            this.cleanupRecovery(TcpCommunicationSpi.this.inRecDescs);
            this.cleanupRecovery(TcpCommunicationSpi.this.outRecDescs);
        }

        /**
         * Removes descriptors whose node is no longer alive.
         * <p>
         * Keys are collected first; each is then removed only if the descriptor's
         * {@code onNodeLeft()} returns {@code true} and the map still holds that same descriptor.
         *
         * @param recoveryDescs Descriptor map to clean.
         */
        private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
            HashSet<ConnectionKey> left = null;
            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
                ConnectionKey key = e.getKey();
                if (left != null && left.contains(key)) continue;
                if (e.getValue().nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(key.nodeId()))) continue;
                if (left == null) {
                    left = new HashSet<ConnectionKey>();
                }
                left.add(key);
            }
            if (left != null) {
                assert (!left.isEmpty());
                for (ConnectionKey id : left) {
                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);
                    if (recoveryDesc == null || !recoveryDesc.onNodeLeft()) continue;
                    recoveryDescs.remove(id, recoveryDesc);
                }
            }
        }

        /**
         * Attempts a recovery reconnect for a disconnected session by reserving (and immediately
         * releasing) a client for the node.
         * <p>
         * If the reservation fails while the node is still alive and answers a ping, the request
         * is queued again; otherwise the failure is reported through {@code onException}.
         * A request without a recovery descriptor is skipped.
         *
         * @param sesInfo Disconnect request.
         */
        private void processDisconnect(DisconnectedSessionInfo sesInfo) {
            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
            // The descriptor is declared nullable; without it there is nothing to recover, and a
            // NullPointerException here would escape from the worker loop.
            if (recoveryDesc == null) {
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Recovery reconnect skipped, no recovery descriptor [connIdx=" + sesInfo.connIdx + ']');
                }
                return;
            }
            ClusterNode node = recoveryDesc.node();
            if (!recoveryDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(node.id()))) {
                return;
            }
            try {
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
                }
                GridCommunicationClient client = TcpCommunicationSpi.this.reserveClient(node, sesInfo.connIdx);
                client.release();
            }
            catch (IgniteCheckedException | IgniteException e) {
                try {
                    if (recoveryDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(node.id())) && TcpCommunicationSpi.this.getSpiContext().pingNode(node.id())) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Recovery reconnect failed, will retry [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
                        }
                        this.addProcessDisconnectRequest(sesInfo);
                    } else {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
                        }
                        TcpCommunicationSpi.this.onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", e);
                    }
                }
                catch (IgniteClientDisconnectedException ignored) {
                    // Ping is impossible while the client is disconnected; drop the request.
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Failed to ping node, client disconnected.");
                    }
                }
            }
        }

        /**
         * Queues a disconnect request for processing by this worker.
         *
         * @param sesInfo Disconnect request.
         */
        void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) {
            boolean add = this.q.add(sesInfo);
            assert (add);
        }
    }

    private class ShmemWorker
    extends GridWorker {
        private final IpcEndpoint endpoint;

        /**
         * Creates a worker named {@code "shmem-worker"} bound to the SPI's grid name and logger.
         *
         * @param endpoint IPC endpoint this worker serves.
         */
        private ShmemWorker(IpcEndpoint endpoint) {
            super(TcpCommunicationSpi.this.gridName, "shmem-worker", TcpCommunicationSpi.this.log);
            this.endpoint = endpoint;
        }

        /**
         * Serves the endpoint through an {@code IpcToNioAdapter}.
         * <p>
         * Three small factories are built first; each resolves its delegate from the SPI context
         * lazily on first use. When {@code serve()} returns or throws, the worker removes itself
         * from {@code shmemWorkers} and closes the endpoint.
         *
         * @throws InterruptedException Declared by the overridden method.
         */
        @Override
        protected void body() throws InterruptedException {
            try {
                // Message factory delegating to the SPI context's factory, resolved on first call.
                MessageFactory msgFactory = new MessageFactory(){
                    private MessageFactory impl;

                    @Override
                    @Nullable
                    public Message create(byte type) {
                        if (this.impl == null) {
                            this.impl = TcpCommunicationSpi.this.getSpiContext().messageFactory();
                        }
                        assert (this.impl != null);
                        return this.impl.create(type);
                    }
                };
                // Writer factory: yields a writer for the node in the session's connection key,
                // or null when the session has no CONN_IDX_META value.
                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return connKey != null ? this.formatter.writer(connKey.nodeId()) : null;
                    }
                };
                // Reader factory: same lookup as the writer factory, for readers.
                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return connKey != null ? this.formatter.reader(connKey.nodeId(), msgFactory) : null;
                    }
                };
                // The adapter is given the SPI's metrics listener and server listener plus two
                // filters: a codec filter over a direct parser and a connection-bytes verify filter.
                IpcToNioAdapter adapter = new IpcToNioAdapter(TcpCommunicationSpi.this.metricsLsnr, this.log, this.endpoint, TcpCommunicationSpi.this.srvLsnr, writerFactory, new GridNioCodecFilter(new GridDirectParser(this.log.getLogger(GridDirectParser.class), msgFactory, readerFactory), this.log, true), new GridConnectionBytesVerifyFilter(this.log));
                adapter.serve();
            }
            finally {
                TcpCommunicationSpi.this.shmemWorkers.remove(this);
                this.endpoint.close();
            }
        }

        @Override
        public void cancel() {
            super.cancel();
            this.endpoint.close();
        }

        @Override
        protected void cleanup() {
            super.cleanup();
            this.endpoint.close();
        }

        @Override
        public String toString() {
            return S.toString(ShmemWorker.class, this);
        }
    }

    private class ShmemAcceptWorker
    extends GridWorker {
        private final IpcSharedMemoryServerEndpoint srv;

        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
            super(TcpCommunicationSpi.this.gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
            this.srv = srv;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void body() throws InterruptedException {
            try {
                while (!Thread.interrupted()) {
                    ShmemWorker e = new ShmemWorker(this.srv.accept());
                    TcpCommunicationSpi.this.shmemWorkers.add(e);
                    new IgniteThread(e).start();
                }
            }
            catch (IgniteCheckedException e) {
                if (!this.isCancelled()) {
                    U.error(this.log, "Shmem server failed.", e);
                }
            }
            finally {
                this.srv.close();
            }
        }

        @Override
        public void cancel() {
            super.cancel();
            this.srv.close();
        }
    }

    /**
     * Checked exception type private to this SPI; judging by its name it marks a
     * handshake that timed out (the throw sites are outside this section).
     */
    private static class HandshakeTimeoutException
    extends IgniteCheckedException {
        /** Serialization version identifier. */
        private static final long serialVersionUID = 0L;

        /**
         * @param msg Error message passed to the superclass constructor.
         */
        HandshakeTimeoutException(String msg) {
            super(msg);
        }
    }
}

