/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.communication.tcp;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.GridManager;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.managers.tracing.GridTracingManager;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridNioTracerFilter;
import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationMetricsListener;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestFuture;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor;
import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
import org.apache.ignite.spi.communication.tcp.internal.NodeUnreachableException;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

@IgniteSpiMultipleInstancesSupport(value=true)
@IgniteSpiConsistencyChecked(optional=false)
public class TcpCommunicationSpi
extends IgniteSpiAdapter
implements CommunicationSpi<Message> {
    private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;
    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment (switching to TCP, may be slower).";
    public static final String ATTR_ADDRS = "comm.tcp.addrs";
    public static final String ATTR_HOST_NAMES = "comm.tcp.host.names";
    public static final String ATTR_PORT = "comm.tcp.port";
    public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
    public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
    public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
    public static final String ATTR_FORCE_CLIENT_SERVER_CONNECTIONS = "comm.force.client.srv.connections";
    public static final int DFLT_PORT = 47100;
    public static final int DFLT_SHMEM_PORT = -1;
    public static final long DFLT_IDLE_CONN_TIMEOUT = 600000L;
    public static final int DFLT_SOCK_BUF_SIZE = 32768;
    public static final long DFLT_CONN_TIMEOUT = 5000L;
    public static final long DFLT_MAX_CONN_TIMEOUT = 600000L;
    public static final int DFLT_RECONNECT_CNT = 10;
    public static final int DFLT_MSG_QUEUE_LIMIT = 0;
    public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
    private static final int DFLT_INITIAL_TIMEOUT = 500;
    private static final int DFLT_NEED_WAIT_DELAY = 200;
    private static final int DFLT_RECONNECT_DELAY = 50;
    private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
    public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
    public static final int CONSISTENT_ID_META = GridNioSessionMetaKey.nextUniqueKey();
    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
    private static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
    public static final int DFLT_PORT_RANGE = 100;
    public static final boolean DFLT_TCP_NODELAY = true;
    public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false;
    public static final int DFLT_ACK_SND_THRESHOLD = 32;
    public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000L;
    public static final int DFLT_CONN_PER_NODE = 1;
    public static final int MAX_CONN_PER_NODE = 1024;
    private static final IgniteRunnable NOOP = () -> {};
    public static final short NODE_ID_MSG_TYPE = -1;
    public static final short RECOVERY_LAST_ID_MSG_TYPE = -2;
    public static final short HANDSHAKE_MSG_TYPE = -3;
    public static final short HANDSHAKE_WAIT_MSG_TYPE = -28;
    public static final String COMMUNICATION_METRICS_GROUP_NAME = MetricUtils.metricName("communication", "tcp");
    public static final String SENT_MESSAGES_METRIC_NAME = "sentMessagesCount";
    public static final String SENT_MESSAGES_METRIC_DESC = "Total number of messages sent by current node";
    public static final String RECEIVED_MESSAGES_METRIC_NAME = "receivedMessagesCount";
    public static final String RECEIVED_MESSAGES_METRIC_DESC = "Total number of messages received by current node";
    public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = "sentMessagesByType";
    public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type sent by current node";
    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = "receivedMessagesByType";
    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type received by current node";
    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "sentMessagesToNode";
    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC = "Total number of messages sent by current node to the given node";
    public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "receivedMessagesFromNode";
    public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC = "Total number of messages received by current node from the given node";
    public static final Integer DISABLED_CLIENT_PORT = 0;
    private ConnectGateway connectGate;
    private ConnectionPolicy connPlc = new FirstConnectionPolicy();
    private ConnectionPolicy chConnPlc;
    private boolean enableForcibleNodeKill = IgniteSystemProperties.getBoolean("IGNITE_ENABLE_FORCIBLE_NODE_KILL");
    private boolean enableTroubleshootingLog = IgniteSystemProperties.getBoolean("IGNITE_TROUBLESHOOTING_LOGGER");
    private final GridNioServerListener<Message> srvLsnr = new GridNioServerListenerAdapter<Message>(){

        @Override
        public void onSessionWriteTimeout(GridNioSession ses) {
            LT.warn(TcpCommunicationSpi.this.log, "Communication SPI session write timed out (consider increasing 'socketWriteTimeout' configuration property) [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + TcpCommunicationSpi.this.sockWriteTimeout + ']');
            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + TcpCommunicationSpi.this.sockWriteTimeout + ']');
            }
            ses.close();
        }

        @Override
        public void onConnected(GridNioSession ses) {
            block9: {
                if (ses.accepted()) {
                    if (TcpCommunicationSpi.this.log.isInfoEnabled()) {
                        TcpCommunicationSpi.this.log.info("Accepted incoming communication connection [locAddr=" + ses.localAddress() + ", rmtAddr=" + ses.remoteAddress() + ']');
                    }
                    try {
                        boolean client = Boolean.TRUE.equals(TcpCommunicationSpi.this.ignite().configuration().isClientMode());
                        if (client || TcpCommunicationSpi.this.ctxInitLatch.getCount() == 0L || !TcpCommunicationSpi.this.isHandshakeWaitSupported()) {
                            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                TcpCommunicationSpi.this.log.debug("Sending local node ID to newly accepted session: " + ses);
                            }
                            ses.sendNoFuture(TcpCommunicationSpi.this.nodeIdMessage(), null);
                            break block9;
                        }
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Sending handshake wait message to newly accepted session: " + ses);
                        }
                        ses.sendNoFuture(new HandshakeWaitMessage(), null);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                    }
                } else if (TcpCommunicationSpi.this.log.isInfoEnabled()) {
                    TcpCommunicationSpi.this.log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + ", rmtAddr=" + ses.remoteAddress() + ']');
                }
            }
        }

        @Override
        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            ConnectionKey connId = (ConnectionKey)ses.meta(CONN_IDX_META);
            if (connId != null) {
                CommunicationListener lsnr0;
                GridNioRecoveryDescriptor outDesc;
                if (connId.dummy()) {
                    return;
                }
                UUID id = connId.nodeId();
                GridCommunicationClient[] nodeClients = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(id);
                if (nodeClients != null) {
                    for (GridCommunicationClient client : nodeClients) {
                        if (!(client instanceof GridTcpNioCommunicationClient) || ((GridTcpNioCommunicationClient)client).session() != ses) continue;
                        client.close();
                        TcpCommunicationSpi.this.removeNodeClient(id, client);
                    }
                }
                if (!TcpCommunicationSpi.this.stopping && (outDesc = ses.outRecoveryDescriptor()) != null) {
                    if (outDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(id))) {
                        if (!outDesc.messagesRequests().isEmpty()) {
                            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                TcpCommunicationSpi.this.log.debug("Session was closed but there are unacknowledged messages, will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
                            }
                            DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
                            TcpCommunicationSpi.this.commWorker.addProcessDisconnectRequest(disconnectData);
                        }
                    } else {
                        outDesc.onNodeLeft();
                    }
                }
                if ((lsnr0 = TcpCommunicationSpi.this.lsnr) != null) {
                    lsnr0.onDisconnected(id);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onFirstMessage(GridNioSession ses, Message msg) {
            ClusterNode rmtNode;
            ConnectionKey connKey;
            UUID sndId;
            if (msg instanceof NodeIdMessage) {
                sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
                connKey = new ConnectionKey(sndId, 0, -1L);
            } else {
                assert (msg instanceof HandshakeMessage) : msg;
                HandshakeMessage msg0 = (HandshakeMessage)msg;
                sndId = msg0.nodeId();
                connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
            }
            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Remote node ID received: " + sndId);
            }
            if ((rmtNode = TcpCommunicationSpi.this.getSpiContext().node(sndId)) == null) {
                DiscoverySpi discoverySpi = TcpCommunicationSpi.this.ignite().configuration().getDiscoverySpi();
                boolean unknownNode = true;
                if (discoverySpi instanceof TcpDiscoverySpi) {
                    TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi)discoverySpi;
                    ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
                    if (node0 != null) {
                        assert (node0.isClient()) : node0;
                        if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) {
                            unknownNode = false;
                        }
                    }
                } else if (discoverySpi instanceof IgniteDiscoverySpi) {
                    boolean bl = unknownNode = !((IgniteDiscoverySpi)discoverySpi).knownNode(sndId);
                }
                if (unknownNode) {
                    U.warn(TcpCommunicationSpi.this.log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
                    ses.send(new RecoveryLastReceivedMessage(-4L)).listen(fut -> ses.close());
                } else {
                    ses.send(new RecoveryLastReceivedMessage(-3L)).listen(fut -> ses.close());
                }
                return;
            }
            ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());
            ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
            assert (old == null);
            ClusterNode locNode = TcpCommunicationSpi.this.getSpiContext().localNode();
            if (ses.remoteAddress() == null) {
                return;
            }
            assert (msg instanceof HandshakeMessage) : msg;
            HandshakeMessage msg0 = (HandshakeMessage)msg;
            if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                TcpCommunicationSpi.this.log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId + ", msg=" + msg0 + ']');
            }
            if (TcpCommunicationSpi.this.isChannelConnIdx(msg0.connectionIndex())) {
                ses.send(new RecoveryLastReceivedMessage(0L));
            } else if (TcpCommunicationSpi.this.usePairedConnections(rmtNode)) {
                GridNioRecoveryDescriptor recoveryDesc = TcpCommunicationSpi.this.inRecoveryDescriptor(rmtNode, connKey);
                ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
                boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
                if (reserve) {
                    this.connectedNew(recoveryDesc, ses, true);
                } else if (c.failed) {
                    ses.send(new RecoveryLastReceivedMessage(-1L));
                    this.closeStaleConnections(connKey);
                }
            } else {
                assert (connKey.connectionIndex() >= 0) : connKey;
                GridCommunicationClient[] curClients = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(sndId);
                GridCommunicationClient oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? curClients[connKey.connectionIndex()] : null;
                boolean hasShmemClient = false;
                if (oldClient != null) {
                    if (oldClient instanceof GridTcpNioCommunicationClient) {
                        if (TcpCommunicationSpi.this.log.isInfoEnabled()) {
                            TcpCommunicationSpi.this.log.info("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                        }
                        ses.send(new RecoveryLastReceivedMessage(-1L));
                        this.closeStaleConnections(connKey);
                        return;
                    }
                    assert (oldClient instanceof GridShmemCommunicationClient);
                    hasShmemClient = true;
                }
                GridFutureAdapter<GridCommunicationClient> fut2 = new GridFutureAdapter<GridCommunicationClient>();
                GridFutureAdapter<GridTcpNioCommunicationClient> oldFut = TcpCommunicationSpi.this.clientFuts.putIfAbsent(connKey, fut2);
                GridNioRecoveryDescriptor recoveryDesc = TcpCommunicationSpi.this.inRecoveryDescriptor(rmtNode, connKey);
                if (oldFut == null) {
                    curClients = (GridCommunicationClient[])TcpCommunicationSpi.this.clients.get(sndId);
                    GridCommunicationClient gridCommunicationClient = oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? curClients[connKey.connectionIndex()] : null;
                    if (oldClient != null) {
                        if (oldClient instanceof GridTcpNioCommunicationClient) {
                            assert (oldClient.connectionIndex() == connKey.connectionIndex()) : oldClient;
                            if (TcpCommunicationSpi.this.log.isInfoEnabled()) {
                                TcpCommunicationSpi.this.log.info("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                            }
                            ses.send(new RecoveryLastReceivedMessage(-1L));
                            this.closeStaleConnections(connKey);
                            fut2.onDone(oldClient);
                            return;
                        }
                        assert (oldClient instanceof GridShmemCommunicationClient);
                        hasShmemClient = true;
                    }
                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut2));
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Received incoming connection from remote node [rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ", recovery=" + recoveryDesc + ']');
                    }
                    if (reserved) {
                        try {
                            GridTcpNioCommunicationClient client = this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                            fut2.onDone(client);
                        }
                        finally {
                            TcpCommunicationSpi.this.clientFuts.remove(connKey, fut2);
                        }
                    }
                } else if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
                    if (TcpCommunicationSpi.this.log.isInfoEnabled()) {
                        TcpCommunicationSpi.this.log.info("Received incoming connection from remote node while connecting to this node, rejecting [locNode=" + locNode.id() + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + ", rmtNodeOrder=" + rmtNode.order() + ']');
                    }
                    ses.send(new RecoveryLastReceivedMessage(-1L));
                } else {
                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut2));
                    GridTcpNioCommunicationClient client = null;
                    if (reserved) {
                        client = this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                    }
                    if (oldFut instanceof ConnectionRequestFuture && !oldFut.isDone()) {
                        oldFut.onDone(client);
                    }
                }
            }
        }

        private void closeStaleConnections(ConnectionKey connKey) {
            for (GridNioSession ses0 : TcpCommunicationSpi.this.nioSrvr.sessions()) {
                ConnectionKey key0 = (ConnectionKey)ses0.meta(CONN_IDX_META);
                if (!ses0.accepted() || key0 == null || !key0.nodeId().equals(connKey.nodeId()) || key0.connectionIndex() != connKey.connectionIndex() || key0.connectCount() >= connKey.connectCount()) continue;
                ses0.close();
            }
        }

        @Override
        public void onMessageSent(GridNioSession ses, Message msg) {
            Object consistentId = ses.meta(CONSISTENT_ID_META);
            if (consistentId != null) {
                TcpCommunicationSpi.this.metricsLsnr.onMessageSent(msg, consistentId);
            }
        }

        private void onChannelCreate(GridSelectorNioSessionImpl ses, ConnectionKey connKey, Message msg) {
            TcpCommunicationSpi.this.cleanupLocalNodeRecoveryDescriptor(connKey);
            ses.send(msg).listen(sendFut -> {
                if (sendFut.error() != null) {
                    U.error(TcpCommunicationSpi.this.log, "Fail to send channel creation response to the remote node. Session will be closed [nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']', sendFut.error());
                    ses.close();
                    return;
                }
                ses.closeSocketOnSessionClose(false);
                ses.close().listen(closeFut -> {
                    if (closeFut.error() != null) {
                        U.error(TcpCommunicationSpi.this.log, "Nio session has not been properly closed [nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']', closeFut.error());
                        U.closeQuiet(ses.key().channel());
                        return;
                    }
                    TcpCommunicationSpi.this.notifyChannelEvtListener(connKey.nodeId(), ses.key().channel(), msg);
                });
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onMessage(GridNioSession ses, Message msg) {
            Span span = MTC.span();
            span.addLog(() -> "Communication received");
            span.addTag("message", () -> TraceableMessagesTable.traceName(msg));
            ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
            if (connKey == null) {
                assert (ses.accepted()) : ses;
                if (!TcpCommunicationSpi.this.connectGate.tryEnter()) {
                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                        TcpCommunicationSpi.this.log.debug("Close incoming connection, failed to enter gateway.");
                    }
                    ses.send(new RecoveryLastReceivedMessage(-2L)).listen(fut -> ses.close());
                    return;
                }
                try {
                    this.onFirstMessage(ses, msg);
                }
                finally {
                    TcpCommunicationSpi.this.connectGate.leave();
                }
            } else {
                IgniteRunnable c;
                Object consistentId = ses.meta(CONSISTENT_ID_META);
                assert (consistentId != null);
                if (TcpCommunicationSpi.this.isChannelConnIdx(connKey.connectionIndex())) {
                    if (ses.meta(CHANNEL_FUT_META) == null) {
                        this.onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg);
                    } else {
                        GridFutureAdapter fut2 = (GridFutureAdapter)ses.meta(CHANNEL_FUT_META);
                        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
                        ses0.closeSocketOnSessionClose(false);
                        ses0.close().listen(f -> {
                            if (f.error() != null) {
                                fut2.onDone(f.error());
                                return;
                            }
                            fut2.onDone(ses0.key().channel());
                        });
                    }
                    return;
                }
                if (msg instanceof RecoveryLastReceivedMessage) {
                    TcpCommunicationSpi.this.metricsLsnr.onMessageReceived(msg, consistentId);
                    GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
                    if (recovery != null) {
                        RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']');
                        }
                        recovery.ackReceived(msg0.received());
                    }
                    return;
                }
                GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
                if (recovery != null) {
                    long rcvCnt = recovery.onReceived();
                    if (rcvCnt % (long)TcpCommunicationSpi.this.ackSndThreshold == 0L) {
                        if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                            TcpCommunicationSpi.this.log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + rcvCnt + ']');
                        }
                        ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
                        recovery.lastAcknowledged(rcvCnt);
                    }
                } else if (connKey.dummy()) {
                    assert (msg instanceof NodeIdMessage) : msg;
                    TcpCommunicationNodeConnectionCheckFuture fut3 = (TcpCommunicationNodeConnectionCheckFuture)ses.meta(TcpCommunicationConnectionCheckFuture.SES_FUT_META);
                    assert (fut3 != null) : msg;
                    fut3.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
                    TcpCommunicationSpi.this.nioSrvr.closeFromWorkerThread(ses);
                    return;
                }
                TcpCommunicationSpi.this.metricsLsnr.onMessageReceived(msg, consistentId);
                if (TcpCommunicationSpi.this.msgQueueLimit > 0) {
                    GridNioMessageTracker tracker = (GridNioMessageTracker)ses.meta(TRACKER_META);
                    if (tracker == null) {
                        tracker = new GridNioMessageTracker(ses, TcpCommunicationSpi.this.msgQueueLimit);
                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker);
                        assert (old == null);
                    }
                    tracker.onMessageReceived();
                    c = tracker;
                } else {
                    c = NOOP;
                }
                TcpCommunicationSpi.this.notifyListener(connKey.nodeId(), msg, c);
            }
        }

        @Override
        public void onFailure(FailureType failureType, Throwable failure) {
            if (TcpCommunicationSpi.this.ignite instanceof IgniteEx) {
                ((IgniteEx)TcpCommunicationSpi.this.ignite).context().failure().process(new FailureContext(failureType, failure));
            }
        }

        private GridTcpNioCommunicationClient connected(GridNioRecoveryDescriptor recovery, GridNioSession ses, ClusterNode node, long rcvCnt, boolean sndRes, boolean createClient) {
            ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
            assert (connKey != null && connKey.connectionIndex() >= 0) : connKey;
            assert (!TcpCommunicationSpi.this.usePairedConnections(node));
            recovery.onHandshake(rcvCnt);
            ses.inRecoveryDescriptor(recovery);
            ses.outRecoveryDescriptor(recovery);
            TcpCommunicationSpi.this.nioSrvr.resend(ses);
            try {
                if (sndRes) {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
                }
            }
            catch (IgniteCheckedException e) {
                U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
            }
            recovery.onConnected();
            GridTcpNioCommunicationClient client = null;
            if (createClient) {
                client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, TcpCommunicationSpi.this.log);
                TcpCommunicationSpi.this.addNodeClient(node, connKey.connectionIndex(), client);
            }
            return client;
        }

        private void connectedNew(GridNioRecoveryDescriptor recovery, GridNioSession ses, boolean sndRes) {
            try {
                ses.inRecoveryDescriptor(recovery);
                if (sndRes) {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
                }
                recovery.onConnected();
            }
            catch (IgniteCheckedException e) {
                U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
            }
        }

        class ConnectClosure
        implements IgniteInClosure<Boolean> {
            private static final long serialVersionUID = 0L;
            private final GridNioSession ses;
            private final GridNioRecoveryDescriptor recoveryDesc;
            private final ClusterNode rmtNode;
            private final HandshakeMessage msg;
            private final GridFutureAdapter<GridCommunicationClient> fut;
            private final boolean createClient;
            private final ConnectionKey connKey;

            ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, ConnectionKey connKey, HandshakeMessage msg, boolean createClient, GridFutureAdapter<GridCommunicationClient> fut) {
                this.ses = ses;
                this.recoveryDesc = recoveryDesc;
                this.rmtNode = rmtNode;
                this.connKey = connKey;
                this.msg = msg;
                this.createClient = createClient;
                this.fut = fut;
            }

            @Override
            public void apply(Boolean success) {
                if (success.booleanValue()) {
                    try {
                        IgniteInClosure lsnr = new IgniteInClosure<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> msgFut) {
                                try {
                                    msgFut.get();
                                    GridTcpNioCommunicationClient client = this.connected(ConnectClosure.this.recoveryDesc, ConnectClosure.this.ses, ConnectClosure.this.rmtNode, ConnectClosure.this.msg.received(), false, ConnectClosure.this.createClient);
                                    ConnectClosure.this.fut.onDone(client);
                                }
                                catch (IgniteCheckedException e) {
                                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                        TcpCommunicationSpi.this.log.debug("Failed to send recovery handshake [rmtNode=" + ConnectClosure.this.rmtNode.id() + ", err=" + e + ']');
                                    }
                                    ConnectClosure.this.recoveryDesc.release();
                                    ConnectClosure.this.fut.onDone();
                                }
                                finally {
                                    TcpCommunicationSpi.this.clientFuts.remove(ConnectClosure.this.connKey, ConnectClosure.this.fut);
                                }
                            }
                        };
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(this.recoveryDesc.received()), lsnr);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                    }
                } else {
                    try {
                        this.fut.onDone();
                    }
                    finally {
                        TcpCommunicationSpi.this.clientFuts.remove(this.connKey, this.fut);
                    }
                }
            }
        }

        class ConnectClosureNew
        implements IgniteInClosure<Boolean> {
            private static final long serialVersionUID = 0L;
            private final GridNioSession ses;
            private final GridNioRecoveryDescriptor recoveryDesc;
            private final ClusterNode rmtNode;
            private boolean failed;

            ConnectClosureNew(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode) {
                this.ses = ses;
                this.recoveryDesc = recoveryDesc;
                this.rmtNode = rmtNode;
            }

            @Override
            public void apply(Boolean success) {
                try {
                    boolean bl = this.failed = success == false;
                    if (success.booleanValue()) {
                        IgniteInClosure lsnr = new IgniteInClosure<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> msgFut) {
                                try {
                                    msgFut.get();
                                    this.connectedNew(ConnectClosureNew.this.recoveryDesc, ConnectClosureNew.this.ses, false);
                                }
                                catch (IgniteCheckedException e) {
                                    if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                                        TcpCommunicationSpi.this.log.debug("Failed to send recovery handshake [rmtNode=" + ConnectClosureNew.this.rmtNode.id() + ", err=" + e + ']');
                                    }
                                    ConnectClosureNew.this.recoveryDesc.release();
                                }
                            }
                        };
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(this.recoveryDesc.received()), lsnr);
                    } else {
                        TcpCommunicationSpi.this.nioSrvr.sendSystem(this.ses, new RecoveryLastReceivedMessage(-1L));
                    }
                }
                catch (IgniteCheckedException e) {
                    U.error(TcpCommunicationSpi.this.log, "Failed to send message: " + e, e);
                }
            }
        }
    };
    @LoggerResource
    private IgniteLogger log;
    @LoggerResource(categoryName="org.apache.ignite.internal.diagnostic")
    private IgniteLogger diagnosticLog;
    private String locAddr;
    private volatile InetAddress locHost;
    private int locPort = 47100;
    private int locPortRange = 100;
    private int shmemPort = -1;
    private boolean directBuf = true;
    private boolean directSndBuf;
    private long idleConnTimeout = 600000L;
    private long connTimeout = 5000L;
    private long maxConnTimeout = 600000L;
    private int reconCnt = 10;
    private int sockSndBuf = 32768;
    private int sockRcvBuf = 32768;
    private int msgQueueLimit = 0;
    private int slowClientQueueLimit;
    private GridNioServer<Message> nioSrvr;
    private IpcSharedMemoryServerEndpoint shmemSrv;
    private boolean usePairedConnections;
    private int connectionsPerNode = 1;
    private boolean tcpNoDelay = true;
    private boolean filterReachableAddresses = false;
    private int ackSndThreshold = 32;
    private int unackedMsgsBufSize;
    private long sockWriteTimeout = 2000L;
    private CommunicationWorker commWorker;
    private ShmemAcceptWorker shmemAcceptWorker;
    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque<ShmemWorker>();
    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
    private volatile CommunicationListener<Message> lsnr;
    private int boundTcpPort = -1;
    private int boundTcpShmemPort = -1;
    private int selectorsCnt = DFLT_SELECTORS_CNT;
    private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
    private boolean forceClientToSrvConnections;
    private AddressResolver addrRslvr;
    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
    private volatile boolean stopping;
    private TcpCommunicationMetricsListener metricsLsnr;
    private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts = GridConcurrentFactory.newMap();
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
    private final GridLocalEventListener discoLsnr = new DiscoveryListener();
    private ConnectionRequestor connectionRequestor;
    protected Tracing tracing;

    private boolean isSslEnabled() {
        return this.ignite.configuration().getSslContextFactory() != null;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr) {
        if (this.addrRslvr == null) {
            this.addrRslvr = addrRslvr;
        }
        return this;
    }

    public AddressResolver getAddressResolver() {
        return this.addrRslvr;
    }

    @Override
    @IgniteInstanceResource
    protected void injectResources(Ignite ignite) {
        super.injectResources(ignite);
        if (ignite != null) {
            this.setAddressResolver(ignite.configuration().getAddressResolver());
            this.setLocalAddress(ignite.configuration().getLocalHost());
            this.tracing = ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().tracing() : new NoopTracing();
        }
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setLocalAddress(String locAddr) {
        if (this.locAddr == null) {
            this.locAddr = locAddr;
        }
        return this;
    }

    public String getLocalAddress() {
        return this.locAddr;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setLocalPort(int locPort) {
        this.locPort = locPort;
        return this;
    }

    public int getLocalPort() {
        return this.locPort;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setLocalPortRange(int locPortRange) {
        this.locPortRange = locPortRange;
        return this;
    }

    public int getLocalPortRange() {
        return this.locPortRange;
    }

    public boolean isUsePairedConnections() {
        return this.usePairedConnections;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) {
        this.usePairedConnections = usePairedConnections;
        return this;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) {
        this.connectionsPerNode = maxConnectionsPerNode;
        return this;
    }

    public int getConnectionsPerNode() {
        return this.connectionsPerNode;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSharedMemoryPort(int shmemPort) {
        this.shmemPort = shmemPort;
        return this;
    }

    public int getSharedMemoryPort() {
        return this.shmemPort;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout) {
        this.idleConnTimeout = idleConnTimeout;
        return this;
    }

    public long getIdleConnectionTimeout() {
        return this.idleConnTimeout;
    }

    public long getSocketWriteTimeout() {
        return this.sockWriteTimeout;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout) {
        this.sockWriteTimeout = sockWriteTimeout;
        return this;
    }

    public int getAckSendThreshold() {
        return this.ackSndThreshold;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold) {
        this.ackSndThreshold = ackSndThreshold;
        return this;
    }

    public int getUnacknowledgedMessagesBufferSize() {
        return this.unackedMsgsBufSize;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
        this.unackedMsgsBufSize = unackedMsgsBufSize;
        return this;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setConnectTimeout(long connTimeout) {
        this.connTimeout = connTimeout;
        this.failureDetectionTimeoutEnabled(false);
        return this;
    }

    public long getConnectTimeout() {
        return this.connTimeout;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout) {
        this.maxConnTimeout = maxConnTimeout;
        this.failureDetectionTimeoutEnabled(false);
        return this;
    }

    public long getMaxConnectTimeout() {
        return this.maxConnTimeout;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setReconnectCount(int reconCnt) {
        this.reconCnt = reconCnt;
        this.failureDetectionTimeoutEnabled(false);
        return this;
    }

    public int getReconnectCount() {
        return this.reconCnt;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setDirectBuffer(boolean directBuf) {
        this.directBuf = directBuf;
        return this;
    }

    public boolean isDirectBuffer() {
        return this.directBuf;
    }

    public boolean isDirectSendBuffer() {
        return this.directSndBuf;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf) {
        this.directSndBuf = directSndBuf;
        return this;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSelectorsCount(int selectorsCnt) {
        this.selectorsCnt = selectorsCnt;
        return this;
    }

    public int getSelectorsCount() {
        return this.selectorsCnt;
    }

    public long getSelectorSpins() {
        return this.selectorSpins;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSelectorSpins(long selectorSpins) {
        this.selectorSpins = selectorSpins;
        return this;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
        return this;
    }

    public boolean isTcpNoDelay() {
        return this.tcpNoDelay;
    }

    public boolean isFilterReachableAddresses() {
        return this.filterReachableAddresses;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) {
        this.filterReachableAddresses = filterReachableAddresses;
        return this;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf) {
        this.sockRcvBuf = sockRcvBuf;
        return this;
    }

    public int getSocketReceiveBuffer() {
        return this.sockRcvBuf;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf) {
        this.sockSndBuf = sockSndBuf;
        return this;
    }

    public int getSocketSendBuffer() {
        return this.sockSndBuf;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit) {
        this.msgQueueLimit = msgQueueLimit;
        return this;
    }

    public int getMessageQueueLimit() {
        return this.msgQueueLimit;
    }

    public int getSlowClientQueueLimit() {
        return this.slowClientQueueLimit;
    }

    @IgniteSpiConfiguration(optional=true)
    public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) {
        this.slowClientQueueLimit = slowClientQueueLimit;
        return this;
    }

    @IgniteExperimental
    public boolean forceClientToServerConnections() {
        return this.forceClientToSrvConnections;
    }

    @IgniteSpiConfiguration(optional=true)
    @IgniteExperimental
    public TcpCommunicationSpi setForceClientToServerConnections(boolean forceClientToSrvConnections) {
        this.forceClientToSrvConnections = forceClientToSrvConnections;
        return this;
    }

    @Override
    public void setListener(CommunicationListener<Message> lsnr) {
        this.lsnr = lsnr;
    }

    public CommunicationListener getListener() {
        return this.lsnr;
    }

    @IgniteExperimental
    public void setConnectionRequestor(ConnectionRequestor connectionRequestor) {
        this.connectionRequestor = connectionRequestor;
    }

    @Override
    public int getSentMessagesCount() {
        if (this.metricsLsnr == null) {
            return 0;
        }
        return this.metricsLsnr.sentMessagesCount();
    }

    @Override
    public long getSentBytesCount() {
        if (this.metricsLsnr == null) {
            return 0L;
        }
        return this.metricsLsnr.sentBytesCount();
    }

    @Override
    public int getReceivedMessagesCount() {
        if (this.metricsLsnr == null) {
            return 0;
        }
        return this.metricsLsnr.receivedMessagesCount();
    }

    @Override
    public long getReceivedBytesCount() {
        if (this.metricsLsnr == null) {
            return 0L;
        }
        return this.metricsLsnr.receivedBytesCount();
    }

    public Map<String, Long> getReceivedMessagesByType() {
        return this.metricsLsnr.receivedMessagesByType();
    }

    public Map<UUID, Long> getReceivedMessagesByNode() {
        return this.metricsLsnr.receivedMessagesByNode();
    }

    public Map<String, Long> getSentMessagesByType() {
        return this.metricsLsnr.sentMessagesByType();
    }

    public Map<UUID, Long> getSentMessagesByNode() {
        return this.metricsLsnr.sentMessagesByNode();
    }

    @Override
    public int getOutboundMessagesQueueSize() {
        GridNioServer<Message> srv = this.nioSrvr;
        return srv != null ? srv.outboundMessagesQueueSize() : 0;
    }

    @Override
    public void resetMetrics() {
        this.metricsLsnr.resetMetrics();
    }

    public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
        StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());
        this.dumpInfo(sb, nodeId);
        GridNioServer<Message> nioSrvr = this.nioSrvr;
        if (nioSrvr != null) {
            sb.append("NIO sessions statistics:");
            IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>(){

                @Override
                public boolean apply(GridNioSession ses) {
                    ConnectionKey connId = (ConnectionKey)ses.meta(CONN_IDX_META);
                    return connId != null && nodeId.equals(connId.nodeId());
                }
            };
            return nioSrvr.dumpStats(sb.toString(), p);
        }
        sb.append(U.nl()).append("GridNioServer is null.");
        return new GridFinishedFuture<String>(sb.toString());
    }

    public void dumpStats() {
        final IgniteLogger log = this.diagnosticLog;
        if (log != null) {
            StringBuilder sb = new StringBuilder();
            this.dumpInfo(sb, null);
            U.warn(log, sb.toString());
            GridNioServer<Message> nioSrvr = this.nioSrvr;
            if (nioSrvr != null) {
                nioSrvr.dumpStats().listen((IgniteInClosure<IgniteInternalFuture<String>>)new CI1<IgniteInternalFuture<String>>(){

                    @Override
                    public void apply(IgniteInternalFuture<String> fut) {
                        try {
                            U.warn(log, fut.get());
                        }
                        catch (Exception e) {
                            U.error(log, "Failed to dump NIO server statistics: " + e, e);
                        }
                    }
                });
            }
        }
    }

    private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
        GridNioRecoveryDescriptor desc;
        sb.append("Communication SPI recovery descriptors: ").append(U.nl());
        for (Map.Entry entry : this.recoveryDescs.entrySet()) {
            desc = (GridNioRecoveryDescriptor)entry.getValue();
            if (dstNodeId != null && !dstNodeId.equals(((ConnectionKey)entry.getKey()).nodeId())) continue;
            sb.append("    [key=").append(entry.getKey()).append(", msgsSent=").append(desc.sent()).append(", msgsAckedByRmt=").append(desc.acked()).append(", msgsRcvd=").append(desc.received()).append(", lastAcked=").append(desc.lastAcknowledged()).append(", reserveCnt=").append(desc.reserveCount()).append(", descIdHash=").append(System.identityHashCode(desc)).append(']').append(U.nl());
        }
        for (Map.Entry entry : this.outRecDescs.entrySet()) {
            desc = (GridNioRecoveryDescriptor)entry.getValue();
            if (dstNodeId != null && !dstNodeId.equals(((ConnectionKey)entry.getKey()).nodeId())) continue;
            sb.append("    [key=").append(entry.getKey()).append(", msgsSent=").append(desc.sent()).append(", msgsAckedByRmt=").append(desc.acked()).append(", reserveCnt=").append(desc.reserveCount()).append(", connected=").append(desc.connected()).append(", reserved=").append(desc.reserved()).append(", descIdHash=").append(System.identityHashCode(desc)).append(']').append(U.nl());
        }
        for (Map.Entry entry : this.inRecDescs.entrySet()) {
            desc = (GridNioRecoveryDescriptor)entry.getValue();
            if (dstNodeId != null && !dstNodeId.equals(((ConnectionKey)entry.getKey()).nodeId())) continue;
            sb.append("    [key=").append(entry.getKey()).append(", msgsRcvd=").append(desc.received()).append(", lastAcked=").append(desc.lastAcknowledged()).append(", reserveCnt=").append(desc.reserveCount()).append(", connected=").append(desc.connected()).append(", reserved=").append(desc.reserved()).append(", handshakeIdx=").append(desc.handshakeIndex()).append(", descIdHash=").append(System.identityHashCode(desc)).append(']').append(U.nl());
        }
        sb.append("Communication SPI clients: ").append(U.nl());
        for (Map.Entry entry : this.clients.entrySet()) {
            GridCommunicationClient[] clients0;
            UUID clientNodeId = (UUID)entry.getKey();
            if (dstNodeId != null && !dstNodeId.equals(clientNodeId)) continue;
            for (GridCommunicationClient client : clients0 = (GridCommunicationClient[])entry.getValue()) {
                if (client == null) continue;
                sb.append("    [node=").append(clientNodeId).append(", client=").append(client).append(']').append(U.nl());
            }
        }
    }

    @Override
    public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
        boolean forceClientToSrvConnections;
        this.initFailureDetectionTimeout();
        if (Boolean.TRUE.equals(this.ignite.configuration().isClientMode())) {
            this.assertParameter(this.locPort > 1023 || this.locPort == -1, "locPort > 1023 || locPort == -1");
        } else {
            this.assertParameter(this.locPort > 1023, "locPort > 1023");
        }
        this.assertParameter(this.locPort <= 65535, "locPort < 0xffff");
        this.assertParameter(this.locPortRange >= 0, "locPortRange >= 0");
        this.assertParameter(this.idleConnTimeout > 0L, "idleConnTimeout > 0");
        this.assertParameter(this.sockRcvBuf >= 0, "sockRcvBuf >= 0");
        this.assertParameter(this.sockSndBuf >= 0, "sockSndBuf >= 0");
        this.assertParameter(this.msgQueueLimit >= 0, "msgQueueLimit >= 0");
        this.assertParameter(this.shmemPort > 0 || this.shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
        this.assertParameter(this.selectorsCnt > 0, "selectorsCnt > 0");
        this.assertParameter(this.connectionsPerNode > 0, "connectionsPerNode > 0");
        this.assertParameter(this.connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
        if (!this.failureDetectionTimeoutEnabled()) {
            this.assertParameter(this.reconCnt > 0, "reconnectCnt > 0");
            this.assertParameter(this.connTimeout >= 0L, "connTimeout >= 0");
            this.assertParameter(this.maxConnTimeout >= this.connTimeout, "maxConnTimeout >= connTimeout");
        }
        this.assertParameter(this.sockWriteTimeout >= 0L, "sockWriteTimeout >= 0");
        this.assertParameter(this.ackSndThreshold > 0, "ackSndThreshold > 0");
        this.assertParameter(this.unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
        if (this.unackedMsgsBufSize > 0) {
            this.assertParameter(this.unackedMsgsBufSize >= this.msgQueueLimit * 5, "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
            this.assertParameter(this.unackedMsgsBufSize >= this.ackSndThreshold * 5, "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
        }
        this.connPlc = this.connectionsPerNode > 1 ? new RoundRobinConnectionPolicy() : new FirstConnectionPolicy();
        this.chConnPlc = new ConnectionPolicy(){
            private final AtomicInteger chIdx = new AtomicInteger(1025);

            @Override
            public int connectionIndex() {
                return this.chIdx.incrementAndGet();
            }
        };
        try {
            this.locHost = U.resolveLocalHost(this.locAddr);
        }
        catch (IOException e) {
            throw new IgniteSpiException("Failed to initialize local address: " + this.locAddr, e);
        }
        try {
            this.shmemSrv = this.resetShmemServer();
        }
        catch (IgniteCheckedException e) {
            U.warn(this.log, "Failed to start shared memory communication server.", e);
        }
        try {
            this.nioSrvr = this.resetNioServer();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to initialize TCP server: " + this.locHost, e);
        }
        boolean bl = forceClientToSrvConnections = this.forceClientToServerConnections() || this.locPort == -1;
        if (this.usePairedConnections && forceClientToSrvConnections) {
            throw new IgniteSpiException("Node using paired connections is not allowed to start in forced client to server connections mode.");
        }
        try {
            IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(this.locHost);
            if (this.locPort != -1 && addrs.get1().isEmpty() && addrs.get2().isEmpty()) {
                throw new IgniteCheckedException("No network addresses found (is networking enabled?).");
            }
            Collection<InetSocketAddress> extAddrs = this.addrRslvr == null ? null : U.resolveAddresses(this.addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), this.boundTcpPort);
            HashMap<String, Object> res = new HashMap<String, Object>(5);
            boolean setEmptyHostNamesAttr = !IgniteSystemProperties.getBoolean("IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES", false) && !F.isEmpty(this.locAddr) && this.locHost.getHostAddress().equals(this.locAddr) && !this.locHost.isAnyLocalAddress() && !this.locHost.isLoopbackAddress();
            res.put(this.createSpiAttributeName(ATTR_ADDRS), addrs.get1());
            res.put(this.createSpiAttributeName(ATTR_HOST_NAMES), setEmptyHostNamesAttr ? Collections.emptyList() : addrs.get2());
            res.put(this.createSpiAttributeName(ATTR_PORT), this.boundTcpPort == -1 ? DISABLED_CLIENT_PORT : this.boundTcpPort);
            res.put(this.createSpiAttributeName(ATTR_SHMEM_PORT), this.boundTcpShmemPort >= 0 ? Integer.valueOf(this.boundTcpShmemPort) : null);
            res.put(this.createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
            res.put(this.createSpiAttributeName(ATTR_PAIRED_CONN), this.usePairedConnections);
            res.put(this.createSpiAttributeName(ATTR_FORCE_CLIENT_SERVER_CONNECTIONS), forceClientToSrvConnections);
            return res;
        }
        catch (IOException | IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to resolve local host to addresses: " + this.locHost, e);
        }
    }

    public int boundPort() {
        return this.boundTcpPort;
    }

    @Override
    public void spiStart(String igniteInstanceName) throws IgniteSpiException {
        assert (this.locHost != null);
        this.startStopwatch();
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.configInfo("locAddr", this.locAddr));
            this.log.debug(this.configInfo("locPort", this.locPort));
            this.log.debug(this.configInfo("locPortRange", this.locPortRange));
            this.log.debug(this.configInfo("idleConnTimeout", this.idleConnTimeout));
            this.log.debug(this.configInfo("directBuf", this.directBuf));
            this.log.debug(this.configInfo("directSendBuf", this.directSndBuf));
            this.log.debug(this.configInfo("selectorsCnt", this.selectorsCnt));
            this.log.debug(this.configInfo("tcpNoDelay", this.tcpNoDelay));
            this.log.debug(this.configInfo("sockSndBuf", this.sockSndBuf));
            this.log.debug(this.configInfo("sockRcvBuf", this.sockRcvBuf));
            this.log.debug(this.configInfo("shmemPort", this.shmemPort));
            this.log.debug(this.configInfo("msgQueueLimit", this.msgQueueLimit));
            this.log.debug(this.configInfo("connectionsPerNode", this.connectionsPerNode));
            if (this.failureDetectionTimeoutEnabled()) {
                this.log.debug(this.configInfo("connTimeout", this.connTimeout));
                this.log.debug(this.configInfo("maxConnTimeout", this.maxConnTimeout));
                this.log.debug(this.configInfo("reconCnt", this.reconCnt));
            } else {
                this.log.debug(this.configInfo("failureDetectionTimeout", this.failureDetectionTimeout()));
            }
            this.log.debug(this.configInfo("sockWriteTimeout", this.sockWriteTimeout));
            this.log.debug(this.configInfo("ackSndThreshold", this.ackSndThreshold));
            this.log.debug(this.configInfo("unackedMsgsBufSize", this.unackedMsgsBufSize));
        }
        if (!this.tcpNoDelay) {
            U.quietAndWarn(this.log, "'TCP_NO_DELAY' for communication is off, which should be used with caution since may produce significant delays with some scenarios.");
        }
        if (this.slowClientQueueLimit > 0 && this.msgQueueLimit > 0 && this.slowClientQueueLimit >= this.msgQueueLimit) {
            U.quietAndWarn(this.log, "Slow client queue limit is set to a value greater than or equal to message queue limit (slow client queue limit will have no effect) [msgQueueLimit=" + this.msgQueueLimit + ", slowClientQueueLimit=" + this.slowClientQueueLimit + ']');
        }
        if (this.msgQueueLimit == 0) {
            U.quietAndWarn(this.log, "Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.");
        }
        this.registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this), TcpCommunicationSpiMBean.class);
        this.connectGate = new ConnectGateway();
        if (this.shmemSrv != null) {
            this.shmemAcceptWorker = new ShmemAcceptWorker(this.shmemSrv);
            new IgniteThread(this.shmemAcceptWorker).start();
        }
        this.nioSrvr.start();
        this.commWorker = new CommunicationWorker(igniteInstanceName, this.log);
        new IgniteSpiThread(igniteInstanceName, this.commWorker.name(), this.log){

            @Override
            protected void body() {
                TcpCommunicationSpi.this.commWorker.run();
            }
        }.start();
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.startInfo());
        }
    }

    @Override
    public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        if (this.boundTcpPort > 0) {
            spiCtx.registerPort(this.boundTcpPort, IgnitePortProtocol.TCP);
        }
        if (this.boundTcpShmemPort > 0) {
            spiCtx.registerPort(this.boundTcpShmemPort, IgnitePortProtocol.TCP);
        }
        spiCtx.addLocalEventListener(this.discoLsnr, 11, 12);
        this.ctxInitLatch.countDown();
        this.metricsLsnr = new TcpCommunicationMetricsListener(this.ignite, spiCtx);
    }

    @Override
    public IgniteSpiContext getSpiContext() {
        if (this.ctxInitLatch.getCount() > 0L) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Waiting for context initialization.");
            }
            try {
                U.await(this.ctxInitLatch);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Context has been initialized.");
                }
            }
            catch (IgniteInterruptedCheckedException e) {
                U.warn(this.log, "Thread has been interrupted while waiting for SPI context initialization.", e);
            }
        }
        return super.getSpiContext();
    }

    private GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
        if (this.boundTcpPort >= 0) {
            throw new IgniteCheckedException("Tcp NIO server was already created on port " + this.boundTcpPort);
        }
        IgniteCheckedException lastEx = null;
        int lastPort = this.locPort == -1 ? -1 : (this.locPortRange == 0 ? this.locPort : this.locPort + this.locPortRange - 1);
        for (int port = this.locPort; port <= lastPort; ++port) {
            try {
                MessageFactory msgFactory = new MessageFactory(){
                    private MessageFactory impl;

                    @Override
                    @Nullable
                    public Message create(short type) {
                        if (this.impl == null) {
                            this.impl = TcpCommunicationSpi.this.getSpiContext().messageFactory();
                        }
                        assert (this.impl != null);
                        return this.impl.create(type);
                    }
                };
                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory(){
                    private IgniteSpiContext context;
                    private MessageFormatter formatter;

                    @Override
                    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException {
                        IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();
                        if (this.formatter == null || this.context != ctx) {
                            this.context = ctx;
                            this.formatter = this.context.messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return key != null ? this.formatter.reader(key.nodeId(), msgFactory) : null;
                    }
                };
                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory(){
                    private IgniteSpiContext context;
                    private MessageFormatter formatter;

                    @Override
                    public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();
                        if (this.formatter == null || this.context != ctx) {
                            this.context = ctx;
                            this.formatter = this.context.messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey key = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return key != null ? this.formatter.writer(key.nodeId()) : null;
                    }
                };
                GridDirectParser parser = new GridDirectParser(this.log.getLogger(GridDirectParser.class), msgFactory, readerFactory);
                IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>(){

                    @Override
                    public boolean apply(Message msg) {
                        return msg instanceof RecoveryLastReceivedMessage;
                    }
                };
                boolean clientMode = Boolean.TRUE.equals(this.ignite.configuration().isClientMode());
                IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor = !clientMode && this.slowClientQueueLimit > 0 ? this::checkClientQueueSize : null;
                ArrayList<GridNioFilterAdapter> filters = new ArrayList<GridNioFilterAdapter>();
                if (this.tracing instanceof GridTracingManager && ((GridManager)((Object)this.tracing)).enabled()) {
                    filters.add(new GridNioTracerFilter(this.log, this.tracing));
                }
                filters.add(new GridNioCodecFilter(parser, this.log, true));
                filters.add(new GridConnectionBytesVerifyFilter(this.log));
                if (this.isSslEnabled()) {
                    GridNioSslFilter sslFilter = new GridNioSslFilter(this.ignite.configuration().getSslContextFactory().create(), true, ByteOrder.LITTLE_ENDIAN, this.log);
                    sslFilter.directMode(true);
                    sslFilter.wantClientAuth(true);
                    sslFilter.needClientAuth(true);
                    filters.add(sslFilter);
                }
                GridNioServer.Builder<Message> builder = GridNioServer.builder().address(this.locHost).port(port).listener(this.srvLsnr).logger(this.log).selectorCount(this.selectorsCnt).igniteInstanceName(this.igniteInstanceName).serverName("tcp-comm").tcpNoDelay(this.tcpNoDelay).directBuffer(this.directBuf).byteOrder(ByteOrder.LITTLE_ENDIAN).socketSendBufferSize(this.sockSndBuf).socketReceiveBufferSize(this.sockRcvBuf).sendQueueLimit(this.msgQueueLimit).directMode(true).writeTimeout(this.sockWriteTimeout).selectorSpins(this.selectorSpins).filters(filters.toArray(new GridNioFilter[filters.size()])).writerFactory(writerFactory).skipRecoveryPredicate(skipRecoveryPred).messageQueueSizeListener(queueSizeMonitor).tracing(this.tracing).readWriteSelectorsAssign(this.usePairedConnections);
                if (this.ignite instanceof IgniteEx) {
                    IgniteEx igniteEx = (IgniteEx)this.ignite;
                    builder.workerListener(igniteEx.context().workersRegistry()).metricRegistry(igniteEx.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME));
                }
                GridNioServer<Message> srvr = builder.build();
                this.boundTcpPort = port;
                if (this.log.isInfoEnabled()) {
                    this.log.info("Successfully bound communication NIO server to TCP port [port=" + this.boundTcpPort + ", locHost=" + this.locHost + ", selectorsCnt=" + this.selectorsCnt + ", selectorSpins=" + srvr.selectorSpins() + ", pairedConn=" + this.usePairedConnections + ']');
                }
                srvr.idleTimeout(this.idleConnTimeout);
                return srvr;
            }
            catch (IgniteCheckedException e) {
                if (X.hasCause((Throwable)e, SSLException.class)) {
                    throw new IgniteSpiException("Failed to create SSL context. SSL factory: " + this.ignite.configuration().getSslContextFactory() + '.', e);
                }
                lastEx = e;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + this.locHost + ']');
                }
                this.onException("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + this.locHost + ']', e);
                continue;
            }
        }
        throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + this.locPort + ", portRange=" + this.locPortRange + ", locHost=" + this.locHost + ']', lastEx);
    }

    @Nullable
    private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
        if (this.boundTcpShmemPort >= 0) {
            throw new IgniteCheckedException("Shared memory server was already created on port " + this.boundTcpShmemPort);
        }
        if (this.shmemPort == -1 || U.isWindows()) {
            return null;
        }
        IgniteCheckedException lastEx = null;
        for (int port = this.shmemPort; port < this.shmemPort + this.locPortRange; ++port) {
            try {
                IgniteConfiguration cfg = this.ignite.configuration();
                IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(this.log, cfg.getNodeId(), this.igniteInstanceName, cfg.getWorkDirectory());
                srv.setPort(port);
                srv.omitOutOfResourcesWarning(true);
                srv.start();
                this.boundTcpShmemPort = port;
                if (this.log.isInfoEnabled()) {
                    this.log.info("Successfully bound shared memory communication to TCP port [port=" + this.boundTcpShmemPort + ", locHost=" + this.locHost + ']');
                }
                return srv;
            }
            catch (IgniteCheckedException e) {
                lastEx = e;
                if (!this.log.isDebugEnabled()) continue;
                this.log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + this.locHost + ']');
                continue;
            }
        }
        throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + this.locPort + ", portRange=" + this.locPortRange + ", locHost=" + this.locHost + ']', lastEx);
    }

    @Override
    public void spiStop() throws IgniteSpiException {
        assert (this.stopping);
        this.unregisterMBean();
        if (this.nioSrvr != null) {
            this.nioSrvr.stop();
        }
        U.cancel(this.shmemAcceptWorker);
        U.join(this.shmemAcceptWorker, this.log);
        U.cancel(this.shmemWorkers);
        U.join(this.shmemWorkers, this.log);
        this.shmemWorkers.clear();
        Iterator iterator = this.clients.values().iterator();
        while (iterator.hasNext()) {
            GridCommunicationClient[] clients0;
            for (GridCommunicationClient client : clients0 = (GridCommunicationClient[])iterator.next()) {
                if (client == null) continue;
                client.forceClose();
            }
        }
        for (GridFutureAdapter fut : this.clientFuts.values()) {
            if (!(fut instanceof ConnectionRequestFuture)) continue;
            fut.onDone(new IgniteSpiException("SPI is being stopped."));
        }
        if (this.commWorker != null) {
            U.cancel(this.commWorker);
            U.join(this.commWorker, this.log);
        }
        this.nioSrvr = null;
        this.commWorker = null;
        this.boundTcpPort = -1;
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.stopInfo());
        }
    }

    @Override
    protected void onContextDestroyed0() {
        this.stopping = true;
        if (this.ctxInitLatch.getCount() > 0L) {
            this.ctxInitLatch.countDown();
        }
        if (this.connectGate != null) {
            this.connectGate.stopped();
        }
        this.getSpiContext().deregisterPorts();
        this.getSpiContext().removeLocalEventListener(this.discoLsnr);
    }

    @Override
    public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
        this.connectGate.disconnected(reconnectFut);
        Iterator iterator = this.clients.values().iterator();
        while (iterator.hasNext()) {
            GridCommunicationClient[] clients0;
            for (GridCommunicationClient client : clients0 = (GridCommunicationClient[])iterator.next()) {
                if (client == null) continue;
                client.forceClose();
            }
        }
        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to connect client node disconnected.");
        for (GridFutureAdapter clientFut : this.clientFuts.values()) {
            clientFut.onDone(err);
        }
        this.recoveryDescs.clear();
        this.inRecDescs.clear();
        this.outRecDescs.clear();
    }

    @Override
    public void onClientReconnected(boolean clusterRestarted) {
        this.connectGate.reconnected();
    }

    void onNodeLeft(Object consistentId, UUID nodeId) {
        assert (nodeId != null);
        this.metricsLsnr.onNodeLeft(consistentId);
        GridCommunicationClient[] clients0 = (GridCommunicationClient[])this.clients.remove(nodeId);
        if (clients0 != null) {
            for (GridCommunicationClient client : clients0) {
                if (client == null) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + ", client=" + client + ']');
                }
                client.forceClose();
            }
        }
    }

    @Override
    protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting) throws IgniteSpiException {
        this.checkAttributePresence(node, this.createSpiAttributeName(ATTR_ADDRS));
        this.checkAttributePresence(node, this.createSpiAttributeName(ATTR_HOST_NAMES));
        this.checkAttributePresence(node, this.createSpiAttributeName(ATTR_PORT));
    }

    private void checkAttributePresence(ClusterNode node, String attrName) {
        if (node.attribute(attrName) == null) {
            U.warn(this.log, "Remote node has inconsistent configuration (required attribute was not found) [attrName=" + attrName + ", nodeId=" + node.id() + "spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']');
        }
    }

    @Override
    public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
        this.sendMessage0(node, msg, null);
    }

    public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
        long timeout;
        TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture(this, this.log.getLogger(TcpCommunicationConnectionCheckFuture.class), this.nioSrvr, nodes);
        long l = timeout = this.failureDetectionTimeoutEnabled() ? this.failureDetectionTimeout() : this.connTimeout;
        if (this.log.isInfoEnabled()) {
            this.log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']');
        }
        fut.init(timeout);
        return new IgniteFutureImpl<BitSet>(fut);
    }

    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
        this.sendMessage0(node, msg, ackC);
    }

    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
        assert (node != null);
        assert (msg != null);
        IgniteLogger log = this.log;
        if (log != null && log.isTraceEnabled()) {
            log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
        }
        if (this.isLocalNodeDisconnected()) {
            throw new IgniteSpiException("Failed to send a message to remote node because local node has been disconnected [rmtNodeId=" + node.id() + ']');
        }
        ClusterNode locNode = this.getLocalNode();
        if (locNode == null) {
            throw new IgniteSpiException("Local node has not been started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
        }
        if (node.id().equals(locNode.id())) {
            this.notifyListener(node.id(), msg, NOOP);
        } else {
            int msgConnIdx;
            Message connIdxMsg;
            GridCommunicationClient client = null;
            Message message = connIdxMsg = msg instanceof GridIoMessage ? ((GridIoMessage)msg).message() : msg;
            int connIdx = connIdxMsg instanceof TcpConnectionIndexAwareMessage ? ((msgConnIdx = ((TcpConnectionIndexAwareMessage)connIdxMsg).connectionIndex()) == -1 ? this.connPlc.connectionIndex() : msgConnIdx) : this.connPlc.connectionIndex();
            try {
                boolean retry;
                do {
                    client = this.reserveClient(node, connIdx);
                    UUID nodeId = null;
                    if (!client.async()) {
                        nodeId = node.id();
                    }
                    retry = client.sendMessage(nodeId, msg, ackC);
                    client.release();
                    if (retry) {
                        this.removeNodeClient(node.id(), client);
                        ClusterNode node0 = this.getSpiContext().node(node.id());
                        if (node0 == null) {
                            throw new ClusterTopologyCheckedException("Failed to send message to remote node (node has left the grid): " + node.id());
                        }
                    }
                    client = null;
                } while (retry);
            }
            catch (Throwable t) {
                if (this.stopping) {
                    throw new IgniteSpiException("Node is stopping.", t);
                }
                log.error("Failed to send message to remote node [node=" + node + ", msg=" + msg + ']', t);
                if (t instanceof Error) {
                    throw (Error)t;
                }
                if (t instanceof RuntimeException) {
                    throw (RuntimeException)t;
                }
                throw new IgniteSpiException("Failed to send message to remote node: " + node, t);
            }
            finally {
                if (client != null && this.removeNodeClient(node.id(), client)) {
                    client.forceClose();
                }
            }
        }
    }

    private boolean isLocalNodeDisconnected() {
        boolean disconnected = false;
        if (this.ignite instanceof IgniteKernal) {
            disconnected = ((IgniteKernal)this.ignite).context().clientDisconnected();
        }
        return disconnected;
    }

    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
        GridCommunicationClient[] newClients;
        GridCommunicationClient[] curClients;
        do {
            if ((curClients = (GridCommunicationClient[])this.clients.get(nodeId)) == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient) {
                return false;
            }
            newClients = Arrays.copyOf(curClients, curClients.length);
            newClients[rmvClient.connectionIndex()] = null;
        } while (!this.clients.replace(nodeId, curClients, newClients));
        return true;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
        assert (this.connectionsPerNode > 0) : this.connectionsPerNode;
        assert (connIdx == addClient.connectionIndex()) : addClient;
        if (connIdx >= this.connectionsPerNode) {
            assert (!this.usePairedConnections(node));
            return;
        }
        while (true) {
            GridCommunicationClient[] newClients;
            GridCommunicationClient[] curClients = (GridCommunicationClient[])this.clients.get(node.id());
            assert (curClients == null || curClients[connIdx] == null) : "Client already created [node=" + node.id() + ", connIdx=" + connIdx + ", client=" + addClient + ", oldClient=" + curClients[connIdx] + ']';
            if (curClients == null) {
                newClients = new GridCommunicationClient[this.connectionsPerNode];
                newClients[connIdx] = addClient;
                if (this.clients.putIfAbsent(node.id(), newClients) != null) continue;
                return;
            }
            newClients = (GridCommunicationClient[])curClients.clone();
            newClients[connIdx] = addClient;
            if (this.clients.replace(node.id(), curClients, newClients)) return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        if (!TcpCommunicationSpi.$assertionsDisabled && node == null) {
            throw new AssertionError();
        }
        if (!TcpCommunicationSpi.$assertionsDisabled && (connIdx < 0 || connIdx >= this.connectionsPerNode) && this.usePairedConnections(node)) {
            throw new AssertionError(connIdx);
        }
        if (this.getLocalNode().isClient() && node.isClient() && TcpCommunicationSpi.DISABLED_CLIENT_PORT.equals(node.attribute(this.createSpiAttributeName("comm.tcp.port")))) {
            throw new IgniteSpiException("Cannot send message to the client node with no server socket opened.");
        }
        nodeId = node.id();
        while (true) {
            v0 = client = (curClients = (GridCommunicationClient[])this.clients.get(nodeId)) != null && connIdx < curClients.length ? curClients[connIdx] : null;
            if (client == null) {
                if (this.stopping) {
                    throw new IgniteSpiException("Node is stopping.");
                }
                connKey = new ConnectionKey(nodeId, connIdx, -1L);
                fut = new ConnectFuture();
                oldFut = this.clientFuts.putIfAbsent(connKey, fut);
                if (oldFut == null) {
                    try {
                        curClients0 = (GridCommunicationClient[])this.clients.get(nodeId);
                        v1 = client0 = curClients0 != null && connIdx < curClients0.length ? curClients0[connIdx] : null;
                        if (client0 == null) {
                            client0 = this.createCommunicationClient(node, connIdx);
                            if (client0 != null) {
                                this.addNodeClient(node, connIdx, client0);
                                if (client0 instanceof GridTcpNioCommunicationClient && (tcpClient = (GridTcpNioCommunicationClient)client0).session().closeTime() > 0L && this.removeNodeClient(nodeId, client0)) {
                                    if (this.log.isDebugEnabled()) {
                                        this.log.debug("Session was closed after client creation, will retry [node=" + node + ", client=" + client0 + ']');
                                    }
                                    client0 = null;
                                }
                            } else {
                                U.sleep(200L);
                                if (this.getSpiContext().node(node.id()) == null) {
                                    throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
                                }
                            }
                        }
                        fut.onDone(client0);
                    }
                    catch (NodeUnreachableException e) {
                        this.log.warning(e.getMessage());
                        fut = this.handleUnreachableNodeException(node, connIdx, fut, e);
                    }
                    catch (Throwable e) {
                        fut.onDone(e);
                        if (e instanceof IgniteTooManyOpenFilesException) {
                            throw e;
                        }
                        if (!(e instanceof Error)) ** GOTO lbl51
                        throw (Error)e;
                    }
                    finally {
                        this.clientFuts.remove(connKey, fut);
                    }
                } else {
                    fut = oldFut;
                }
lbl51:
                // 4 sources

                if ((client = (GridCommunicationClient)fut.get()) == null) {
                    if (!this.isLocalNodeDisconnected()) continue;
                    throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
                }
                if (this.getSpiContext().node(nodeId) == null) {
                    if (this.removeNodeClient(nodeId, client)) {
                        client.forceClose();
                    }
                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
                }
            }
            if (!TcpCommunicationSpi.$assertionsDisabled && connIdx != client.connectionIndex()) {
                throw new AssertionError(client);
            }
            if (client.reserve()) {
                return client;
            }
            this.removeNodeClient(nodeId, client);
        }
    }

    private GridFutureAdapter<GridCommunicationClient> handleUnreachableNodeException(ClusterNode node, int connIdx, GridFutureAdapter<GridCommunicationClient> fut, NodeUnreachableException e) throws IgniteCheckedException {
        if (this.connectionRequestor != null) {
            ConnectFuture fut0 = (ConnectFuture)((Object)fut);
            ConnectionRequestFuture triggerFut = new ConnectionRequestFuture();
            triggerFut.listen(f -> {
                try {
                    fut0.onDone(f.get());
                }
                catch (Throwable t) {
                    fut0.onDone(t);
                }
            });
            this.clientFuts.put(new ConnectionKey(node.id(), connIdx, -1L), triggerFut);
            fut = triggerFut;
            try {
                this.connectionRequestor.request(node, connIdx);
                long failTimeout = this.failureDetectionTimeoutEnabled() ? this.failureDetectionTimeout() : this.getConnectTimeout();
                fut.get(failTimeout);
            }
            catch (IgniteCheckedException triggerException) {
                IgniteSpiException spiE = new IgniteSpiException(triggerException);
                spiE.addSuppressed(e);
                String msg = "Failed to wait for establishing inverse communication connection from node " + node;
                this.log.warning(msg, spiE);
                fut.onDone(spiE);
                throw spiE;
            }
        } else {
            fut.onDone(e);
            throw new IgniteCheckedException(e);
        }
        return fut;
    }

    @Nullable
    private GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        block13: {
            assert (node != null);
            Integer shmemPort = (Integer)node.attribute(this.createSpiAttributeName(ATTR_SHMEM_PORT));
            ClusterNode locNode = this.getSpiContext().localNode();
            if (locNode == null) {
                throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Creating NIO client to node: " + node);
            }
            if (shmemPort != null && U.sameMacs(locNode, node)) {
                try {
                    GridCommunicationClient client = this.createShmemClient(node, connIdx, shmemPort);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Shmem client created: " + client);
                    }
                    return client;
                }
                catch (IgniteCheckedException e) {
                    if (e.hasCause(IpcOutOfSystemResourcesException.class)) {
                        LT.warn(this.log, OUT_OF_RESOURCES_TCP_MSG);
                    }
                    if (this.getSpiContext().node(node.id()) != null) {
                        LT.warn(this.log, e.getMessage());
                    }
                    if (!this.log.isDebugEnabled()) break block13;
                    this.log.debug("Failed to establish shared memory connection with local node (node has left): " + node.id());
                }
            }
        }
        long start = System.currentTimeMillis();
        GridCommunicationClient client = this.createTcpClient(node, connIdx);
        long time = System.currentTimeMillis() - start;
        if (time > 100L) {
            if (this.log.isInfoEnabled()) {
                this.log.info("TCP client created [client=" + this.clientString(client, node) + ", duration=" + time + "ms]");
            }
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("TCP client created [client=" + this.clientString(client, node) + ", duration=" + time + "ms]");
        }
        return client;
    }

    private String clientString(GridCommunicationClient client, ClusterNode node) throws IgniteCheckedException {
        if (client == null) {
            assert (node != null);
            StringJoiner joiner = new StringJoiner(", ", "null, node addrs=[", "]");
            for (InetSocketAddress addr : this.nodeAddresses(node)) {
                joiner.add(addr.toString());
            }
            return joiner.toString();
        }
        return client.toString();
    }

    @Nullable
    private GridCommunicationClient createShmemClient(ClusterNode node, int connIdx, Integer port) throws IgniteCheckedException {
        GridShmemCommunicationClient client;
        int attempt = 1;
        int connectAttempts = 1;
        long connTimeout0 = this.connTimeout;
        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient());
        while (true) {
            try {
                client = new GridShmemCommunicationClient(connIdx, this.metricsLsnr.metricRegistry(), port, timeoutHelper.nextTimeoutChunk(this.connTimeout), this.log, this.getSpiContext().messageFormatter());
            }
            catch (IgniteCheckedException e) {
                if (timeoutHelper.checkFailureTimeoutReached(e)) {
                    throw e;
                }
                if (connectAttempts < 2 && X.hasCause((Throwable)e, ConnectException.class)) {
                    ++connectAttempts;
                    continue;
                }
                throw e;
            }
            try {
                this.safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
            }
            catch (IgniteSpiOperationTimeoutException e) {
                client.forceClose();
                if (this.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + this.failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
                    }
                    throw e;
                }
                assert (!this.failureDetectionTimeoutEnabled());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']');
                }
                if (attempt == this.reconCnt || connTimeout0 > this.maxConnTimeout) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timedout (will stop attempts to perform the handshake) [timeout=" + connTimeout0 + ", maxConnTimeout=" + this.maxConnTimeout + ", attempt=" + attempt + ", reconCnt=" + this.reconCnt + ", err=" + e.getMessage() + ", client=" + client + ']');
                    }
                    throw e;
                }
                ++attempt;
                connTimeout0 *= 2L;
                continue;
            }
            catch (Error | RuntimeException | IgniteCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
                }
                client.forceClose();
                throw e;
            }
            break;
        }
        return client;
    }

    private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
        ClusterNode node;
        ConnectionKey id;
        if (this.slowClientQueueLimit > 0 && msgQueueSize > this.slowClientQueueLimit && (id = (ConnectionKey)ses.meta(CONN_IDX_META)) != null && (node = this.getSpiContext().node(id.nodeId())) != null && node.isClient()) {
            String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, the client will be dropped (consider changing 'slowClientQueueLimit' configuration property) [srvNode=" + this.getSpiContext().localNode().id() + ", clientNode=" + node + ", slowClientQueueLimit=" + this.slowClientQueueLimit + ']';
            U.quietAndWarn(this.log, msg);
            this.getSpiContext().failNode(id.nodeId(), msg);
        }
    }

    private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
        return this.nodeAddresses(node, this.filterReachableAddresses);
    }

    public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses) throws IgniteCheckedException {
        LinkedHashSet<InetSocketAddress> addrs;
        boolean isExtAddrsExist;
        Collection rmtAddrs0 = (Collection)node.attribute(this.createSpiAttributeName(ATTR_ADDRS));
        Collection rmtHostNames0 = (Collection)node.attribute(this.createSpiAttributeName(ATTR_HOST_NAMES));
        Integer boundPort = (Integer)node.attribute(this.createSpiAttributeName(ATTR_PORT));
        Collection extAddrs = (Collection)node.attribute(this.createSpiAttributeName(ATTR_EXT_ADDRS));
        boolean isRmtAddrsExist = !F.isEmpty(rmtAddrs0) && boundPort != null;
        boolean bl = isExtAddrsExist = !F.isEmpty(extAddrs);
        if (!isRmtAddrsExist && !isExtAddrsExist) {
            throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any TCP communication addresses or mapped external addresses. Check configuration and make sure that you use the same communication SPI on all nodes. Remote node id: " + node.id());
        }
        if (isRmtAddrsExist) {
            ArrayList<InetSocketAddress> addrs0 = new ArrayList<InetSocketAddress>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
            boolean sameHost = U.sameMacs(this.getSpiContext().localNode(), node);
            addrs0.sort(U.inetAddressesComparator(sameHost));
            addrs = new LinkedHashSet<InetSocketAddress>(addrs0);
        } else {
            addrs = new LinkedHashSet<InetSocketAddress>();
        }
        if (isExtAddrsExist) {
            addrs.addAll(extAddrs);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Addresses resolved from attributes [rmtNode=" + node.id() + ", addrs=" + addrs + ", isRmtAddrsExist=" + isRmtAddrsExist + ']');
        }
        if (filterReachableAddresses) {
            HashSet<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
            for (InetSocketAddress addr : addrs) {
                if (addr.isUnresolved()) continue;
                allInetAddrs.add(addr.getAddress());
            }
            List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
            if (reachableInetAddrs.size() < allInetAddrs.size()) {
                LinkedHashSet addrs0 = U.newLinkedHashSet(addrs.size());
                ArrayList<InetSocketAddress> unreachableInetAddr = new ArrayList<InetSocketAddress>(allInetAddrs.size() - reachableInetAddrs.size());
                for (InetSocketAddress addr : addrs) {
                    if (reachableInetAddrs.contains(addr.getAddress())) {
                        addrs0.add(addr);
                        continue;
                    }
                    unreachableInetAddr.add(addr);
                }
                addrs0.addAll(unreachableInetAddr);
                addrs = addrs0;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs + ']');
            }
        }
        return addrs;
    }

    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        GridNioSession ses = this.createNioSession(node, connIdx);
        return ses == null ? null : new GridTcpNioCommunicationClient(connIdx, ses, this.log);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
        boolean locNodeIsSrv;
        boolean bl = locNodeIsSrv = !this.getLocalNode().isClient() && !this.getLocalNode().isDaemon();
        if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv && node.isClient() && this.forceClientToServerConnections(node)) {
            String msg = "Failed to connect to node " + node.id() + " because it is started in 'forceClientToServerConnections' mode; inverse connection will be requested.";
            throw new NodeUnreachableException(msg);
        }
        Collection<InetSocketAddress> addrs = this.nodeAddresses(node);
        GridNioSession ses = null;
        IgniteCheckedException errs = null;
        long totalTimeout = this.failureDetectionTimeoutEnabled() ? (node.isClient() ? this.clientFailureDetectionTimeout() : this.failureDetectionTimeout()) : ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(this.connTimeout, this.maxConnTimeout, this.reconCnt);
        HashSet<InetSocketAddress> failedAddrsSet = new HashSet<InetSocketAddress>();
        int skippedAddrs = 0;
        for (InetSocketAddress addr : addrs) {
            if (addr.isUnresolved()) {
                failedAddrsSet.add(addr);
                continue;
            }
            ExponentialBackoffTimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(totalTimeout, this.failureDetectionTimeoutEnabled() ? 500L : this.connTimeout, this.maxConnTimeout);
            while (ses == null) {
                CommunicationWorker commWorker0;
                block55: {
                    String msg;
                    if (this.stopping) {
                        throw new IgniteSpiException("Node is stopping.");
                    }
                    if (this.isLocalNodeAddress(addr)) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Skipping local address [addr=" + addr + ", locAddrs=" + node.attribute(this.createSpiAttributeName(ATTR_ADDRS)) + ", node=" + node + ']');
                        }
                        ++skippedAddrs;
                        break;
                    }
                    long timeout = 0L;
                    this.connectGate.enter();
                    try {
                        if (this.getSpiContext().node(node.id()) == null) {
                            throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
                        }
                        SocketChannel ch = this.openSocketChannel();
                        ch.configureBlocking(true);
                        ch.socket().setTcpNoDelay(this.tcpNoDelay);
                        ch.socket().setKeepAlive(true);
                        if (this.sockRcvBuf > 0) {
                            ch.socket().setReceiveBufferSize(this.sockRcvBuf);
                        }
                        if (this.sockSndBuf > 0) {
                            ch.socket().setSendBufferSize(this.sockSndBuf);
                        }
                        ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1L);
                        GridNioRecoveryDescriptor recoveryDesc = this.outRecoveryDescriptor(node, connKey);
                        assert (recoveryDesc != null) : "Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';
                        if (!recoveryDesc.reserve()) {
                            U.closeQuiet(ch);
                            GridNioSession sesFromRecovery = recoveryDesc.session();
                            if (sesFromRecovery != null) {
                                while (sesFromRecovery.closeTime() == 0L) {
                                    sesFromRecovery.close();
                                }
                            }
                            GridNioSession gridNioSession = null;
                            return gridNioSession;
                        }
                        HashMap<Integer, Object> meta = new HashMap<Integer, Object>();
                        GridSslMeta sslMeta = null;
                        try {
                            ClusterNode locNode;
                            timeout = connTimeoutStgy.nextTimeout();
                            ch.socket().connect(addr, (int)timeout);
                            if (this.getSpiContext().node(node.id()) == null) {
                                throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
                            }
                            if (this.isSslEnabled()) {
                                sslMeta = new GridSslMeta();
                                meta.put(GridNioSessionMetaKey.SSL_META.ordinal(), sslMeta);
                                SSLEngine sslEngine = this.ignite.configuration().getSslContextFactory().create().createSSLEngine();
                                sslEngine.setUseClientMode(true);
                                sslMeta.sslEngine(sslEngine);
                            }
                            if ((locNode = this.getLocalNode()) == null) {
                                throw new IgniteCheckedException("Local node has not been started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
                            }
                            timeout = connTimeoutStgy.nextTimeout(timeout);
                            long rcvCnt = this.safeTcpHandshake(ch, node.id(), timeout, sslMeta, new HandshakeMessage2(locNode.id(), recoveryDesc.incrementConnectCount(), recoveryDesc.received(), connIdx));
                            if (rcvCnt == -1L) {
                                GridNioSession gridNioSession = null;
                                return gridNioSession;
                            }
                            if (rcvCnt == -2L) {
                                throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                            }
                            if (rcvCnt == -4L) {
                                throw new ClusterTopologyCheckedException("Remote node does not observe current node in topology : " + node.id());
                            }
                            if (rcvCnt == -3L) {
                                if (connTimeoutStgy.checkTimeout(200L)) {
                                    U.warn(this.log, "Handshake NEED_WAIT timed out (will stop attempts to perform the handshake) [node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy + ", addr=" + addr + ", failureDetectionTimeoutEnabled=" + this.failureDetectionTimeoutEnabled() + ", timeout=" + timeout + ']');
                                    throw new ClusterTopologyCheckedException("Failed to connect to node (current or target node is out of topology on target node within timeout). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']');
                                }
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("NEED_WAIT received, handshake after delay [node = " + node + ", outOfTopologyDelay = " + 200 + "ms]");
                                }
                                U.sleep(200L);
                                continue;
                            }
                            if (rcvCnt < 0L) {
                                throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt + ", senderNode=" + node + ']');
                            }
                            recoveryDesc.onHandshake(rcvCnt);
                            meta.put(CONSISTENT_ID_META, node.consistentId());
                            meta.put(CONN_IDX_META, connKey);
                            meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
                            ses = (GridNioSession)this.nioSrvr.createSession(ch, meta, false, null).get();
                        }
                        finally {
                            if (ses != null) continue;
                            U.closeQuiet(ch);
                            if (recoveryDesc == null) continue;
                            recoveryDesc.release();
                            continue;
                        }
                    }
                    catch (IgniteSpiOperationTimeoutException e) {
                        if (ses != null) {
                            ses.close();
                            ses = null;
                        }
                        this.onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy + ", addr=" + addr + ']', e);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy + ", addr=" + addr + ", err=" + e + ']');
                        }
                        if (connTimeoutStgy.checkTimeout()) {
                            U.warn(this.log, "Handshake timed out (will stop attempts to perform the handshake) [node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy + ", err=" + e.getMessage() + ", addr=" + addr + ", failureDetectionTimeoutEnabled=" + this.failureDetectionTimeoutEnabled() + ", timeout=" + timeout + ']');
                            msg = "Failed to connect to node (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']';
                            if (errs == null) {
                                errs = new IgniteCheckedException(msg, e);
                                break;
                            }
                            errs.addSuppressed(new IgniteCheckedException(msg, e));
                            break;
                        }
                    }
                    catch (ClusterTopologyCheckedException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        if (ses != null) {
                            ses.close();
                            ses = null;
                        }
                        this.onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
                        }
                        if (X.hasCause((Throwable)e, "Too many open files", SocketException.class)) {
                            throw new IgniteTooManyOpenFilesException(e);
                        }
                        if (connTimeoutStgy.checkTimeout()) {
                            U.warn(this.log, "Connection timed out (will stop attempts to perform the connect) [node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy + ", failureDetectionTimeoutEnabled=" + this.failureDetectionTimeoutEnabled() + ", timeout=" + timeout + ", err=" + e.getMessage() + ", addr=" + addr + ']');
                            msg = "Failed to connect to node (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ']';
                            if (errs == null) {
                                errs = new IgniteCheckedException(msg, e);
                                break;
                            }
                            errs.addSuppressed(new IgniteCheckedException(msg, e));
                            break;
                        }
                        if (node.isClient() && this.isNodeUnreachableException(e)) {
                            failedAddrsSet.add(addr);
                        }
                        if (this.isRecoverableException(e)) {
                            U.sleep(50L);
                            break block55;
                        }
                        msg = "Failed to connect to node due to unrecoverable exception (is node still alive?). Make sure that each ComputeTask and cache Transaction has a timeout set in order to prevent parties from waiting forever in case of network issues [nodeId=" + node.id() + ", addrs=" + addrs + ", err= " + e + ']';
                        if (errs == null) {
                            errs = new IgniteCheckedException(msg, e);
                            break;
                        }
                        errs.addSuppressed(new IgniteCheckedException(msg, e));
                        break;
                    }
                    finally {
                        this.connectGate.leave();
                        continue;
                    }
                }
                if ((commWorker0 = this.commWorker) == null || commWorker0.runner() != Thread.currentThread()) continue;
                commWorker0.updateHeartbeat();
            }
            if (ses == null) continue;
            break;
        }
        if (ses == null) {
            if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv && node.isClient() && addrs.size() - skippedAddrs == failedAddrsSet.size()) {
                String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet + "; inverse connection will be requested.";
                throw new NodeUnreachableException(msg);
            }
            this.processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
        }
        return ses;
    }

    protected SocketChannel openSocketChannel() throws IOException {
        return SocketChannel.open();
    }

    void closeConnections(UUID nodeId) throws IgniteCheckedException {
        GridCommunicationClient[] clients = (GridCommunicationClient[])this.clients.remove(nodeId);
        if (Objects.nonNull(clients)) {
            for (GridCommunicationClient client : clients) {
                client.forceClose();
            }
        }
        for (ConnectionKey connKey : this.clientFuts.keySet()) {
            GridFutureAdapter fut;
            if (!nodeId.equals(connKey.nodeId()) || !Objects.nonNull(fut = (GridFutureAdapter)this.clientFuts.remove(connKey))) continue;
            ((GridCommunicationClient)fut.get()).forceClose();
        }
    }

    private boolean isLocalNodeAddress(InetSocketAddress addr) {
        return addr.getPort() == this.boundTcpPort && (this.locHost.equals(addr.getAddress()) || addr.getAddress().isAnyLocalAddress() || this.locHost.isAnyLocalAddress() && U.isLocalAddress(addr.getAddress()));
    }

    protected void processSessionCreationError(ClusterNode node, Collection<InetSocketAddress> addrs, IgniteCheckedException errs) throws IgniteCheckedException {
        assert (errs != null);
        boolean commErrResolve = false;
        IgniteSpiContext ctx = this.getSpiContext();
        if (this.isRecoverableException(errs) && ctx.communicationFailureResolveSupported()) {
            commErrResolve = true;
            ctx.resolveCommunicationFailure(node, errs);
        }
        if (!commErrResolve && this.enableForcibleNodeKill && ctx.node(node.id()) != null && node.isClient() && !this.getLocalNode().isClient() && this.isRecoverableException(errs)) {
            String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from cluster [rmtNode=" + node + ']';
            if (this.enableTroubleshootingLog) {
                U.error(this.log, msg, errs);
            } else {
                U.warn(this.log, msg);
            }
            ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [rmtNode=" + node + ", errs=" + errs + ", connectErrs=" + X.getSuppressedList(errs) + ']');
        }
        throw errs;
    }

    private boolean isRecoverableException(Exception errs) {
        return X.hasCause((Throwable)errs, IOException.class, HandshakeException.class, IgniteSpiOperationTimeoutException.class);
    }

    private boolean isNodeUnreachableException(Exception e) {
        return e instanceof SocketTimeoutException;
    }

    private IgniteSpiOperationTimeoutException handshakeTimeoutException() {
        return new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout (consider increasing 'connectionTimeout' configuration property).");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void safeShmemHandshake(GridCommunicationClient client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
        HandshakeTimeoutObject obj = new HandshakeTimeoutObject(client, U.currentTimeMillis() + timeout);
        this.addTimeoutObject(obj);
        try {
            client.doHandshake(new HandshakeClosure(rmtNodeId));
        }
        finally {
            if (!obj.cancel()) {
                throw this.handshakeTimeoutException();
            }
            this.removeTimeoutObject(obj);
        }
    }

    private long safeTcpHandshake(SocketChannel ch, UUID rmtNodeId, long timeout, GridSslMeta sslMeta, HandshakeMessage msg) throws IgniteCheckedException {
        long rcvCnt;
        HandshakeTimeoutObject obj = new HandshakeTimeoutObject(ch, U.currentTimeMillis() + timeout);
        this.addTimeoutObject(obj);
        try {
            short msgType;
            int read;
            ByteBuffer buf;
            BlockingSslHandler sslHnd = null;
            if (this.isSslEnabled()) {
                short msgType2;
                assert (sslMeta != null);
                sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, this.directBuf, ByteOrder.LITTLE_ENDIAN, this.log);
                if (!sslHnd.handshake()) {
                    throw new HandshakeException("SSL handshake is not completed.");
                }
                ByteBuffer handBuff = sslHnd.applicationBuffer();
                if (handBuff.remaining() >= 2 && (msgType2 = TcpCommunicationSpi.makeMessageType(handBuff.get(0), handBuff.get(1))) == -28) {
                    long l = -3L;
                    return l;
                }
                if (handBuff.remaining() < 18) {
                    buf = ByteBuffer.allocate(1000);
                    read = ch.read(buf);
                    if (read == -1) {
                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
                    }
                    buf.flip();
                    buf = sslHnd.decode(buf);
                    if (handBuff.remaining() >= 2 && (msgType = TcpCommunicationSpi.makeMessageType(handBuff.get(0), handBuff.get(1))) == -28) {
                        long l = -3L;
                        return l;
                    }
                } else {
                    buf = handBuff;
                }
            } else {
                buf = ByteBuffer.allocate(18);
                for (int i = 0; i < 18; i += read) {
                    read = ch.read(buf);
                    if (read == -1) {
                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
                    }
                    if (read < 2 || (msgType = TcpCommunicationSpi.makeMessageType(buf.get(0), buf.get(1))) != -28) continue;
                    long l = -3L;
                    return l;
                }
            }
            UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 2);
            if (!rmtNodeId.equals(rmtNodeId0)) {
                throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId + ", rcvd=" + rmtNodeId0 + ']');
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received remote node ID: " + rmtNodeId0);
            }
            if (this.isSslEnabled()) {
                assert (sslHnd != null);
                U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
            } else {
                U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
            }
            buf = ByteBuffer.allocate(msg.getMessageSize());
            buf.order(ByteOrder.LITTLE_ENDIAN);
            boolean written = msg.writeTo(buf, null);
            assert (written);
            buf.flip();
            if (this.isSslEnabled()) {
                assert (sslHnd != null);
                U.writeFully(ch, sslHnd.encrypt(buf));
            } else {
                U.writeFully(ch, buf);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
            }
            if (this.isSslEnabled()) {
                ByteBuffer inBuf;
                ByteBuffer decode0;
                assert (sslHnd != null);
                buf = ByteBuffer.allocate(1000);
                buf.order(ByteOrder.LITTLE_ENDIAN);
                ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
                decode.order(ByteOrder.LITTLE_ENDIAN);
                for (int i = 0; i < 10; i += decode0.remaining()) {
                    int read2 = ch.read(buf);
                    if (read2 == -1) {
                        throw new HandshakeException("Failed to read remote node recovery handshake (connection closed).");
                    }
                    buf.flip();
                    decode0 = sslHnd.decode(buf);
                    decode = this.appendAndResizeIfNeeded(decode, decode0);
                    buf.clear();
                }
                decode.flip();
                rcvCnt = decode.getLong(2);
                if (decode.limit() > 10) {
                    decode.position(10);
                    sslMeta.decodedBuffer(decode);
                }
                if ((inBuf = sslHnd.inputBuffer()).position() > 0) {
                    sslMeta.encodedBuffer(inBuf);
                }
            } else {
                int read3;
                buf = ByteBuffer.allocate(10);
                buf.order(ByteOrder.LITTLE_ENDIAN);
                for (int i = 0; i < 10; i += read3) {
                    read3 = ch.read(buf);
                    if (read3 != -1) continue;
                    throw new HandshakeException("Failed to read remote node recovery handshake (connection closed).");
                }
                rcvCnt = buf.getLong(2);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
            }
            if (rcvCnt == -1L && this.log.isDebugEnabled()) {
                this.log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
            }
        }
        catch (IOException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to read from channel: " + e);
            }
            throw new IgniteCheckedException("Failed to read from channel.", e);
        }
        finally {
            if (!obj.cancel()) {
                throw this.handshakeTimeoutException();
            }
            this.removeTimeoutObject(obj);
        }
        return rcvCnt;
    }

    protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
        CommunicationListener<Message> lsnr = this.lsnr;
        MTC.span().addLog(() -> "Communication listeners notified");
        if (lsnr != null) {
            lsnr.onMessage(sndId, msg, msgC);
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Received communication message without any registered listeners (will ignore, is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');
        }
    }

    private void notifyChannelEvtListener(UUID nodeId, Channel channel, Message initMsg) {
        CommunicationListener<Message> lsnr0;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Notify appropriate listeners due to a new channel opened: " + channel);
        }
        if ((lsnr0 = this.lsnr) instanceof CommunicationListenerEx) {
            ((CommunicationListenerEx)lsnr0).onChannelOpened(nodeId, initMsg, channel);
        }
    }

    private ByteBuffer appendAndResizeIfNeeded(ByteBuffer target, ByteBuffer src) {
        if (target.remaining() < src.remaining()) {
            int newSize = Math.max(target.capacity() * 2, target.capacity() + src.remaining());
            ByteBuffer tmp = ByteBuffer.allocate(newSize);
            tmp.order(target.order());
            target.flip();
            tmp.put(target);
            target = tmp;
        }
        target.put(src);
        return target;
    }

    public void simulateNodeFailure() {
        if (this.nioSrvr != null) {
            this.nioSrvr.stop();
        }
        if (this.commWorker != null) {
            U.interrupt(this.commWorker.runner());
        }
        U.join(this.commWorker, this.log);
        Iterator iterator = this.clients.values().iterator();
        while (iterator.hasNext()) {
            GridCommunicationClient[] clients0;
            for (GridCommunicationClient client : clients0 = (GridCommunicationClient[])iterator.next()) {
                if (client == null) continue;
                client.forceClose();
            }
        }
    }

    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        if (this.usePairedConnections(node)) {
            return this.recoveryDescriptor(this.outRecDescs, true, node, key);
        }
        return this.recoveryDescriptor(this.recoveryDescs, false, node, key);
    }

    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        if (this.usePairedConnections(node)) {
            return this.recoveryDescriptor(this.inRecDescs, true, node, key);
        }
        return this.recoveryDescriptor(this.recoveryDescs, false, node, key);
    }

    private boolean usePairedConnections(ClusterNode node) {
        if (this.usePairedConnections) {
            Boolean attr = (Boolean)node.attribute(this.createSpiAttributeName(ATTR_PAIRED_CONN));
            return attr != null && attr != false;
        }
        return false;
    }

    private void cleanupLocalNodeRecoveryDescriptor(ConnectionKey key) {
        ClusterNode node = this.getLocalNode();
        if (this.usePairedConnections(node)) {
            this.inRecDescs.remove(key);
            this.outRecDescs.remove(key);
        } else {
            this.recoveryDescs.remove(key);
        }
    }

    private boolean forceClientToServerConnections(ClusterNode node) {
        Boolean forceClientToSrvConnections = (Boolean)node.attribute(this.createSpiAttributeName(ATTR_FORCE_CLIENT_SERVER_CONNECTIONS));
        return Boolean.TRUE.equals(forceClientToSrvConnections);
    }

    private GridNioRecoveryDescriptor recoveryDescriptor(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs, boolean pairedConnections, ClusterNode node, ConnectionKey key) {
        GridNioRecoveryDescriptor recovery = (GridNioRecoveryDescriptor)recoveryDescs.get(key);
        if (recovery == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Missing recovery descriptor for the node (will create a new one) [locNodeId=" + this.getLocalNode().id() + ", key=" + key + ", rmtNode=" + node + ']');
            }
            int maxSize = Math.max(this.msgQueueLimit, this.ackSndThreshold);
            int queueLimit = this.unackedMsgsBufSize != 0 ? this.unackedMsgsBufSize : maxSize * 128;
            recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, this.log);
            GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key, recovery);
            if (old != null) {
                recovery = old;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Will use existing recovery descriptor: " + recovery);
                }
            } else if (this.log.isDebugEnabled()) {
                this.log.debug("Initialized recovery descriptor [desc=" + recovery + ", maxSize=" + maxSize + ", queueLimit=" + queueLimit + ']');
            }
        }
        return recovery;
    }

    private void onException(String msg, Exception e) {
        this.getExceptionRegistry().onException(msg, e);
    }

    private NodeIdMessage nodeIdMessage() {
        UUID locNodeId = this.ignite instanceof IgniteEx ? ((IgniteEx)this.ignite).context().localNodeId() : this.safeLocalNodeId();
        return new NodeIdMessage(locNodeId);
    }

    private UUID safeLocalNodeId() {
        UUID id;
        ClusterNode locNode = this.getLocalNode();
        if (locNode == null) {
            U.warn(this.log, "Local node is not started or fully initialized [isStopping=" + this.getSpiContext().isStopping() + ']');
            id = new UUID(0L, 0L);
        } else {
            id = locNode.id();
        }
        return id;
    }

    @Override
    public TcpCommunicationSpi setName(String name) {
        super.setName(name);
        return this;
    }

    private boolean isHandshakeWaitSupported() {
        DiscoverySpi discoSpi = this.ignite().configuration().getDiscoverySpi();
        if (discoSpi instanceof IgniteDiscoverySpi) {
            return ((IgniteDiscoverySpi)discoSpi).allNodesSupport(IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
        }
        Collection<ClusterNode> nodes = discoSpi.getRemoteNodes();
        return IgniteFeatures.allNodesSupports(nodes, IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
    }

    public String toString() {
        return S.toString(TcpCommunicationSpi.class, this);
    }

    private static void writeMessageType(OutputStream os, short type) throws IOException {
        os.write((byte)(type & 0xFF));
        os.write((byte)(type >> 8 & 0xFF));
    }

    public static void writeMessageType(ByteBuffer buf, short type) {
        buf.put((byte)(type & 0xFF));
        buf.put((byte)(type >> 8 & 0xFF));
    }

    public static short makeMessageType(byte b0, byte b1) {
        return (short)((b1 & 0xFF) << 8 | b0 & 0xFF);
    }

    private static WorkersRegistry getWorkersRegistry(Ignite ignite) {
        return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null;
    }

    public IgniteInternalFuture<Channel> openChannel(ClusterNode remote, Message initMsg) throws IgniteSpiException {
        assert (!remote.isLocal()) : remote;
        assert (initMsg != null);
        assert (this.chConnPlc != null);
        assert (IgniteFeatures.nodeSupports(remote, IgniteFeatures.CHANNEL_COMMUNICATION)) : "Node doesn't support direct connection over socket channel [nodeId=" + remote.id() + ']';
        ConnectionKey key = new ConnectionKey(remote.id(), this.chConnPlc.connectionIndex());
        GridFutureAdapter<Channel> chFut = new GridFutureAdapter<Channel>();
        this.connectGate.enter();
        try {
            final GridNioSession ses = this.createNioSession(remote, key.connectionIndex());
            assert (ses != null) : "Session must be established [remoteId=" + remote.id() + ", key=" + key + ']';
            this.cleanupLocalNodeRecoveryDescriptor(key);
            ses.addMeta(CHANNEL_FUT_META, chFut);
            ses.send(initMsg).listen(f -> {
                if (f.error() != null) {
                    GridFutureAdapter rq = (GridFutureAdapter)ses.meta(CHANNEL_FUT_META);
                    assert (rq != null);
                    rq.onDone(f.error());
                    ses.close();
                    return;
                }
                this.addTimeoutObject(new IgniteSpiTimeoutObject(){

                    @Override
                    public IgniteUuid id() {
                        return IgniteUuid.randomUuid();
                    }

                    @Override
                    public long endTime() {
                        return U.currentTimeMillis() + TcpCommunicationSpi.this.connTimeout;
                    }

                    @Override
                    public void onTimeout() {
                        GridFutureAdapter rq = (GridFutureAdapter)ses.meta(CHANNEL_FUT_META);
                        assert (rq != null);
                        if (rq.onDone(TcpCommunicationSpi.this.handshakeTimeoutException())) {
                            ses.close();
                        }
                    }
                });
            });
            GridFutureAdapter<Channel> gridFutureAdapter = chFut;
            return gridFutureAdapter;
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Unable to create new channel connection to the remote node: " + remote, e);
        }
        finally {
            this.connectGate.leave();
        }
    }

    private boolean isChannelConnIdx(int connIdx) {
        return connIdx > 1024;
    }

    private class TcpCommunicationSpiMBeanImpl
    extends IgniteSpiMBeanAdapter
    implements TcpCommunicationSpiMBean {
        TcpCommunicationSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
            super(spiAdapter);
        }

        @Override
        public String getLocalAddress() {
            return TcpCommunicationSpi.this.getLocalAddress();
        }

        @Override
        public int getLocalPort() {
            return TcpCommunicationSpi.this.getLocalPort();
        }

        @Override
        public int getLocalPortRange() {
            return TcpCommunicationSpi.this.getLocalPortRange();
        }

        @Override
        public boolean isUsePairedConnections() {
            return TcpCommunicationSpi.this.isUsePairedConnections();
        }

        @Override
        public int getConnectionsPerNode() {
            return TcpCommunicationSpi.this.getConnectionsPerNode();
        }

        @Override
        public int getSharedMemoryPort() {
            return TcpCommunicationSpi.this.getSharedMemoryPort();
        }

        @Override
        public long getIdleConnectionTimeout() {
            return TcpCommunicationSpi.this.getIdleConnectionTimeout();
        }

        @Override
        public long getSocketWriteTimeout() {
            return TcpCommunicationSpi.this.getSocketWriteTimeout();
        }

        @Override
        public int getAckSendThreshold() {
            return TcpCommunicationSpi.this.getAckSendThreshold();
        }

        @Override
        public int getUnacknowledgedMessagesBufferSize() {
            return TcpCommunicationSpi.this.getUnacknowledgedMessagesBufferSize();
        }

        @Override
        public long getConnectTimeout() {
            return TcpCommunicationSpi.this.getConnectTimeout();
        }

        @Override
        public long getMaxConnectTimeout() {
            return TcpCommunicationSpi.this.getMaxConnectTimeout();
        }

        @Override
        public int getReconnectCount() {
            return TcpCommunicationSpi.this.getReconnectCount();
        }

        @Override
        public boolean isDirectBuffer() {
            return TcpCommunicationSpi.this.isDirectBuffer();
        }

        @Override
        public boolean isDirectSendBuffer() {
            return TcpCommunicationSpi.this.isDirectSendBuffer();
        }

        @Override
        public int getSelectorsCount() {
            return TcpCommunicationSpi.this.getSelectorsCount();
        }

        @Override
        public long getSelectorSpins() {
            return TcpCommunicationSpi.this.getSelectorSpins();
        }

        @Override
        public boolean isTcpNoDelay() {
            return TcpCommunicationSpi.this.isTcpNoDelay();
        }

        @Override
        public int getSocketReceiveBuffer() {
            return TcpCommunicationSpi.this.getSocketReceiveBuffer();
        }

        @Override
        public int getSocketSendBuffer() {
            return TcpCommunicationSpi.this.getSocketSendBuffer();
        }

        @Override
        public int getMessageQueueLimit() {
            return TcpCommunicationSpi.this.getMessageQueueLimit();
        }

        @Override
        public int getSlowClientQueueLimit() {
            return TcpCommunicationSpi.this.getSlowClientQueueLimit();
        }

        @Override
        public void dumpStats() {
            TcpCommunicationSpi.this.dumpStats();
        }

        @Override
        public int getSentMessagesCount() {
            return TcpCommunicationSpi.this.getSentMessagesCount();
        }

        @Override
        public long getSentBytesCount() {
            return TcpCommunicationSpi.this.getSentBytesCount();
        }

        @Override
        public int getReceivedMessagesCount() {
            return TcpCommunicationSpi.this.getReceivedMessagesCount();
        }

        @Override
        public long getReceivedBytesCount() {
            return TcpCommunicationSpi.this.getReceivedBytesCount();
        }

        @Override
        public Map<String, Long> getReceivedMessagesByType() {
            return TcpCommunicationSpi.this.getReceivedMessagesByType();
        }

        @Override
        public Map<UUID, Long> getReceivedMessagesByNode() {
            return TcpCommunicationSpi.this.getReceivedMessagesByNode();
        }

        @Override
        public Map<String, Long> getSentMessagesByType() {
            return TcpCommunicationSpi.this.getSentMessagesByType();
        }

        @Override
        public Map<UUID, Long> getSentMessagesByNode() {
            return TcpCommunicationSpi.this.getSentMessagesByNode();
        }

        @Override
        public int getOutboundMessagesQueueSize() {
            return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
        }
    }

    private class RoundRobinConnectionPolicy
    implements ConnectionPolicy {
        private RoundRobinConnectionPolicy() {
        }

        @Override
        public int connectionIndex() {
            return (int)(U.safeAbs(Thread.currentThread().getId()) % (long)TcpCommunicationSpi.this.connectionsPerNode);
        }
    }

    private static class FirstConnectionPolicy
    implements ConnectionPolicy {
        private FirstConnectionPolicy() {
        }

        @Override
        public int connectionIndex() {
            return 0;
        }
    }

    static interface ConnectionPolicy {
        public int connectionIndex();
    }

    private static class DisconnectedSessionInfo {
        private final GridNioRecoveryDescriptor recoveryDesc;
        private int connIdx;

        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) {
            this.recoveryDesc = recoveryDesc;
            this.connIdx = connIdx;
        }

        public String toString() {
            return S.toString(DisconnectedSessionInfo.class, this);
        }
    }

    private static class ConnectGateway {
        private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
        private IgniteException err;

        private ConnectGateway() {
        }

        void enter() {
            this.lock.readLock();
            if (this.err != null) {
                this.lock.readUnlock();
                throw this.err;
            }
        }

        boolean tryEnter() {
            boolean res;
            this.lock.readLock();
            boolean bl = res = this.err == null;
            if (!res) {
                this.lock.readUnlock();
            }
            return res;
        }

        void leave() {
            this.lock.readUnlock();
        }

        void disconnected(IgniteFuture<?> reconnectFut) {
            this.lock.writeLock();
            this.err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
            this.lock.writeUnlock();
        }

        void reconnected() {
            this.lock.writeLock();
            try {
                if (this.err instanceof IgniteClientDisconnectedException) {
                    this.err = null;
                }
            }
            finally {
                this.lock.writeUnlock();
            }
        }

        void stopped() {
            this.lock.readLock();
            this.err = new IgniteException("Failed to connect, node stopped.");
            this.lock.readUnlock();
        }
    }

    private class HandshakeClosure
    extends IgniteInClosure2X<InputStream, OutputStream> {
        private static final long serialVersionUID = 0L;
        private final UUID rmtNodeId;

        private HandshakeClosure(UUID rmtNodeId) {
            this.rmtNodeId = rmtNodeId;
        }

        @Override
        public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException {
            try {
                int cnt;
                byte[] b = new byte[18];
                for (int n = 0; n < 18; n += cnt) {
                    cnt = in.read(b, n, 18 - n);
                    if (cnt >= 0) continue;
                    throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)");
                }
                UUID id = U.bytesToUuid(b, 2);
                if (!this.rmtNodeId.equals(id)) {
                    throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + this.rmtNodeId + ", rcvd=" + id + ']');
                }
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Received remote node ID: " + id);
                }
            }
            catch (SocketTimeoutException e) {
                throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing 'connectionTimeout' configuration property).", e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }
            try {
                ClusterNode localNode = TcpCommunicationSpi.this.getLocalNode();
                if (localNode == null) {
                    throw new IgniteSpiException("Local node has not been started or fully initialized [isStopping=" + TcpCommunicationSpi.this.getSpiContext().isStopping() + ']');
                }
                UUID id = localNode.id();
                NodeIdMessage msg = new NodeIdMessage(id);
                out.write(U.IGNITE_HEADER);
                TcpCommunicationSpi.writeMessageType(out, (short)-1);
                out.write(msg.nodeIdBytes());
                out.flush();
                if (TcpCommunicationSpi.this.log.isDebugEnabled()) {
                    TcpCommunicationSpi.this.log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + this.rmtNodeId + ']');
                }
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }
        }
    }

    private static class HandshakeTimeoutObject<T>
    implements IgniteSpiTimeoutObject {
        private final IgniteUuid id = IgniteUuid.randomUuid();
        private final T obj;
        private final long endTime;
        private final AtomicBoolean done = new AtomicBoolean();

        private HandshakeTimeoutObject(T obj, long endTime) {
            assert (obj != null);
            assert (obj instanceof GridCommunicationClient || obj instanceof SelectableChannel);
            assert (endTime > 0L);
            this.obj = obj;
            this.endTime = endTime;
        }

        boolean cancel() {
            return this.done.compareAndSet(false, true);
        }

        @Override
        public void onTimeout() {
            if (this.done.compareAndSet(false, true)) {
                if (this.obj instanceof GridCommunicationClient) {
                    ((GridCommunicationClient)this.obj).forceClose();
                } else {
                    U.closeQuiet((AutoCloseable)this.obj);
                }
            }
        }

        @Override
        public long endTime() {
            return this.endTime;
        }

        @Override
        public IgniteUuid id() {
            return this.id;
        }

        public String toString() {
            return S.toString(HandshakeTimeoutObject.class, this);
        }
    }

    private static class ConnectFuture
    extends GridFutureAdapter<GridCommunicationClient> {
        private ConnectFuture() {
        }
    }

    private class CommunicationWorker
    extends GridWorker {
        private final BlockingQueue<DisconnectedSessionInfo> q;

        private CommunicationWorker(String igniteInstanceName, IgniteLogger log) {
            super(igniteInstanceName, "tcp-comm-worker", log, TcpCommunicationSpi.getWorkersRegistry(TcpCommunicationSpi.this.ignite));
            this.q = new LinkedBlockingQueue<DisconnectedSessionInfo>();
        }

        @Override
        protected void body() throws InterruptedException {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Tcp communication worker has been started.");
            }
            Throwable err = null;
            try {
                while (!this.isCancelled()) {
                    DisconnectedSessionInfo disconnectData;
                    this.blockingSectionBegin();
                    try {
                        disconnectData = this.q.poll(TcpCommunicationSpi.this.idleConnTimeout, TimeUnit.MILLISECONDS);
                    }
                    finally {
                        this.blockingSectionEnd();
                    }
                    if (disconnectData != null) {
                        this.processDisconnect(disconnectData);
                    } else {
                        this.processIdle();
                    }
                    this.onIdle();
                }
            }
            catch (Throwable t) {
                if (!(t instanceof InterruptedException)) {
                    err = t;
                }
                throw t;
            }
            finally {
                if (TcpCommunicationSpi.this.ignite instanceof IgniteEx) {
                    if (err == null && !TcpCommunicationSpi.this.stopping) {
                        err = new IllegalStateException("Thread  " + TcpCommunicationSpi.this.getName() + " is terminated unexpectedly.");
                    }
                    if (err instanceof OutOfMemoryError) {
                        ((IgniteEx)TcpCommunicationSpi.this.ignite).context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
                    } else if (err != null) {
                        ((IgniteEx)TcpCommunicationSpi.this.ignite).context().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, err));
                    }
                }
            }
        }

        private void processIdle() {
            this.cleanupRecovery();
            for (Map.Entry e : TcpCommunicationSpi.this.clients.entrySet()) {
                UUID nodeId = (UUID)e.getKey();
                for (GridCommunicationClient client : (GridCommunicationClient[])e.getValue()) {
                    if (client == null) continue;
                    ClusterNode node = TcpCommunicationSpi.this.getSpiContext().node(nodeId);
                    if (node == null) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Forcing close of non-existent node connection: " + nodeId);
                        }
                        client.forceClose();
                        TcpCommunicationSpi.this.removeNodeClient(nodeId, client);
                        continue;
                    }
                    GridNioRecoveryDescriptor recovery = null;
                    if (!TcpCommunicationSpi.this.usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient && (recovery = (GridNioRecoveryDescriptor)TcpCommunicationSpi.this.recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1L))) != null && recovery.lastAcknowledged() != recovery.received()) {
                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + ", rcvCnt=" + msg.received() + ']');
                        }
                        try {
                            TcpCommunicationSpi.this.nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
                            recovery.lastAcknowledged(msg.received());
                        }
                        catch (IgniteCheckedException err) {
                            U.error(this.log, "Failed to send message: " + err, err);
                        }
                        continue;
                    }
                    long idleTime = client.getIdleTime();
                    if (idleTime < TcpCommunicationSpi.this.idleConnTimeout) continue;
                    if (recovery == null && TcpCommunicationSpi.this.usePairedConnections(node)) {
                        recovery = (GridNioRecoveryDescriptor)TcpCommunicationSpi.this.outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1L));
                    }
                    if (recovery != null && recovery.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(nodeId)) && !recovery.messagesRequests().isEmpty()) {
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Node connection is idle, but there are unacknowledged messages, will wait: " + nodeId);
                        continue;
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Closing idle node connection: " + nodeId);
                    }
                    if (!client.close() && !client.closed()) continue;
                    TcpCommunicationSpi.this.removeNodeClient(nodeId, client);
                }
            }
            for (GridNioSession ses : TcpCommunicationSpi.this.nioSrvr.sessions()) {
                GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
                if (recovery == null || !TcpCommunicationSpi.this.usePairedConnections(recovery.node())) continue;
                assert (ses.accepted()) : ses;
                this.sendAckOnTimeout(recovery, ses);
            }
        }

        private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
            if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() + ", rcvCnt=" + msg.received() + ", lastAcked=" + recovery.lastAcknowledged() + ']');
                }
                try {
                    TcpCommunicationSpi.this.nioSrvr.sendSystem(ses, msg);
                    recovery.lastAcknowledged(msg.received());
                }
                catch (IgniteCheckedException e) {
                    U.error(this.log, "Failed to send message: " + e, e);
                }
            }
        }

        private void cleanupRecovery() {
            this.cleanupRecovery(TcpCommunicationSpi.this.recoveryDescs);
            this.cleanupRecovery(TcpCommunicationSpi.this.inRecDescs);
            this.cleanupRecovery(TcpCommunicationSpi.this.outRecDescs);
        }

        private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
            GridNioRecoveryDescriptor recoveryDesc;
            HashSet left = null;
            for (Map.Entry e : recoveryDescs.entrySet()) {
                if (left != null && left.contains(e.getKey()) || (recoveryDesc = (GridNioRecoveryDescriptor)e.getValue()).nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(((ConnectionKey)e.getKey()).nodeId()))) continue;
                if (left == null) {
                    left = new HashSet();
                }
                left.add(e.getKey());
            }
            if (left != null) {
                assert (!left.isEmpty());
                for (ConnectionKey id : left) {
                    recoveryDesc = (GridNioRecoveryDescriptor)recoveryDescs.get(id);
                    if (recoveryDesc == null || !recoveryDesc.onNodeLeft()) continue;
                    recoveryDescs.remove(id, recoveryDesc);
                }
            }
        }

        private void processDisconnect(DisconnectedSessionInfo sesInfo) {
            block13: {
                GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
                ClusterNode node = recoveryDesc.node();
                if (!recoveryDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(node.id()))) {
                    return;
                }
                try {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
                    }
                    GridCommunicationClient client = TcpCommunicationSpi.this.reserveClient(node, sesInfo.connIdx);
                    client.release();
                }
                catch (ClusterTopologyCheckedException e) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
                    }
                }
                catch (IgniteTooManyOpenFilesException e) {
                    TcpCommunicationSpi.this.onException(e.getMessage(), e);
                    throw e;
                }
                catch (IgniteCheckedException | IgniteException e) {
                    try {
                        if (recoveryDesc.nodeAlive(TcpCommunicationSpi.this.getSpiContext().node(node.id())) && TcpCommunicationSpi.this.getSpiContext().pingNode(node.id())) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Recovery reconnect failed, will retry [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
                            }
                            this.addProcessDisconnectRequest(sesInfo);
                        } else {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
                            }
                            TcpCommunicationSpi.this.onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", e);
                        }
                    }
                    catch (IgniteClientDisconnectedException ignored) {
                        if (!this.log.isDebugEnabled()) break block13;
                        this.log.debug("Failed to ping node, client disconnected.");
                    }
                }
            }
        }

        void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) {
            boolean add = this.q.add(sesInfo);
            assert (add);
        }
    }

    private class ShmemWorker
    extends GridWorker {
        private final IpcEndpoint endpoint;

        private ShmemWorker(IpcEndpoint endpoint) {
            super(TcpCommunicationSpi.this.igniteInstanceName, "shmem-worker", TcpCommunicationSpi.this.log);
            this.endpoint = endpoint;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void body() throws InterruptedException {
            try {
                MessageFactory msgFactory = new MessageFactory(){
                    private MessageFactory impl;

                    @Override
                    @Nullable
                    public Message create(short type) {
                        if (this.impl == null) {
                            this.impl = TcpCommunicationSpi.this.getSpiContext().messageFactory();
                        }
                        assert (this.impl != null);
                        return this.impl.create(type);
                    }
                };
                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return connKey != null ? this.formatter.writer(connKey.nodeId()) : null;
                    }
                };
                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory(){
                    private MessageFormatter formatter;

                    @Override
                    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException {
                        if (this.formatter == null) {
                            this.formatter = TcpCommunicationSpi.this.getSpiContext().messageFormatter();
                        }
                        assert (this.formatter != null);
                        ConnectionKey connKey = (ConnectionKey)ses.meta(CONN_IDX_META);
                        return connKey != null ? this.formatter.reader(connKey.nodeId(), msgFactory) : null;
                    }
                };
                IpcToNioAdapter adapter = new IpcToNioAdapter(TcpCommunicationSpi.this.metricsLsnr.metricRegistry(), this.log, this.endpoint, TcpCommunicationSpi.this.srvLsnr, writerFactory, new GridNioTracerFilter(this.log, TcpCommunicationSpi.this.tracing), new GridNioCodecFilter(new GridDirectParser(this.log.getLogger(GridDirectParser.class), msgFactory, readerFactory), this.log, true), new GridConnectionBytesVerifyFilter(this.log));
                adapter.serve();
            }
            finally {
                TcpCommunicationSpi.this.shmemWorkers.remove(this);
                this.endpoint.close();
            }
        }

        @Override
        public void cancel() {
            super.cancel();
            this.endpoint.close();
        }

        @Override
        protected void cleanup() {
            super.cleanup();
            this.endpoint.close();
        }

        @Override
        public String toString() {
            return S.toString(ShmemWorker.class, this);
        }
    }

    private class DiscoveryListener
    implements GridLocalEventListener,
    HighPriorityListener {
        private DiscoveryListener() {
        }

        @Override
        public void onEvent(Event evt) {
            assert (evt instanceof DiscoveryEvent) : evt;
            assert (evt.type() == 11 || evt.type() == 12);
            ClusterNode node = ((DiscoveryEvent)evt).eventNode();
            TcpCommunicationSpi.this.onNodeLeft(node.consistentId(), node.id());
        }

        @Override
        public int order() {
            return 0;
        }
    }

    private class ShmemAcceptWorker
    extends GridWorker {
        private final IpcSharedMemoryServerEndpoint srv;

        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
            super(TcpCommunicationSpi.this.igniteInstanceName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
            this.srv = srv;
        }

        @Override
        protected void body() throws InterruptedException {
            try {
                while (!Thread.interrupted()) {
                    ShmemWorker e = new ShmemWorker(this.srv.accept());
                    TcpCommunicationSpi.this.shmemWorkers.add(e);
                    new IgniteThread(e).start();
                }
            }
            catch (IgniteCheckedException e) {
                if (!this.isCancelled()) {
                    U.error(this.log, "Shmem server failed.", e);
                }
            }
            finally {
                this.srv.close();
            }
        }

        @Override
        public void cancel() {
            super.cancel();
            this.srv.close();
        }
    }
}

