/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.continuous;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessageType;
import org.apache.ignite.internal.processors.continuous.StartRequestData;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public class GridContinuousProcessor
extends GridProcessorAdapter {
    /** Routine descriptors keyed by routine ID (filled by startRoutine, registerStaticRoutine and onDiscoveryDataReceived). */
    private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<UUID, LocalRoutineInfo>();
    /** Routines of client nodes: client node ID -> (routine ID -> routine descriptor). */
    private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap8<UUID, Map<UUID, LocalRoutineInfo>>();
    /** Remote routine descriptors keyed by routine ID. */
    private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<UUID, RemoteRoutineInfo>();
    /** Pending start futures keyed by routine ID; completed by the start-ack discovery message. */
    private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<UUID, StartFuture>();
    /** Pending stop futures keyed by routine ID; completed by the stop-ack discovery message. */
    private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<UUID, StopFuture>();
    /** Threads that are interrupted and joined in stop(); presumably keyed by routine ID — TODO confirm. */
    private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<UUID, IgniteThread>();
    /** Product version constant; NOTE(review): presumably the first version using the second query message format — confirm. */
    public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9");
    /** Futures of synchronous notifications awaiting acknowledgement, keyed by future ID. */
    private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<IgniteUuid, SyncMessageAckFuture>();
    /** NOTE(review): usage is outside the visible part of the file; presumably IDs of stopped routines — confirm. */
    private final Collection<UUID> stopped = new HashSet<UUID>();
    /** NOTE(review): usage is outside the visible part of the file; presumably guards {@code stopped} — confirm. */
    private final Lock stopLock = new ReentrantLock();
    /** Marshaller taken from the node configuration in start(). */
    private Marshaller marsh;
    /** Network send retry delay; overwritten from the node configuration in start(). */
    private long retryDelay = 1000L;
    /** Network send retry count; overwritten from the node configuration in start(). */
    private int retryCnt = 3;
    /** Read-write lock used by lockStopping(), unlockStopping() and onKernalStop() around {@code processorStopped}. */
    private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock();
    /** Set to {@code true} under the write lock in onKernalStop(). */
    private boolean processorStopped;
    /** Sequence used to build unique topics for static routines. */
    private final AtomicLong seq = new AtomicLong();

    /**
     * Creates the processor.
     *
     * @param ctx Kernal context passed to the parent processor adapter.
     */
    public GridContinuousProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /**
     * Starts the processor: reads retry and marshaller settings from the node configuration and
     * installs the local event listeners, discovery custom event listeners and the communication
     * listener used by continuous routines. Does nothing on daemon nodes.
     *
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void start() throws IgniteCheckedException {
        if (this.ctx.config().isDaemon()) {
            return;
        }
        this.retryDelay = this.ctx.config().getNetworkSendRetryDelay();
        this.retryCnt = this.ctx.config().getNetworkSendRetryCount();
        this.marsh = this.ctx.config().getMarshaller();
        // NOTE(review): 11 and 12 are inlined event type constants (presumably EVT_NODE_LEFT and
        // EVT_NODE_FAILED) — confirm against EventType.
        this.ctx.event().addLocalEventListener(new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                assert (evt instanceof DiscoveryEvent);
                assert (evt.type() == 11 || evt.type() == 12);
                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
                // Forget routines that were tracked for the node named in the event.
                GridContinuousProcessor.this.clientInfos.remove(nodeId);
                for (Map.Entry e : GridContinuousProcessor.this.rmtInfos.entrySet()) {
                    UUID routineId = (UUID)e.getKey();
                    RemoteRoutineInfo info = (RemoteRoutineInfo)e.getValue();
                    // Only routines owned by the event node are affected.
                    if (!nodeId.equals(info.nodeId)) continue;
                    if (info.autoUnsubscribe) {
                        GridContinuousProcessor.this.unregisterRemote(routineId);
                    }
                    // Query handlers are notified regardless of the auto-unsubscribe flag.
                    if (!info.hnd.isQuery()) continue;
                    info.hnd.onNodeLeft();
                }
                // Fail synchronous notifications that were waiting for an ack from the event node.
                for (Map.Entry e : GridContinuousProcessor.this.syncMsgFuts.entrySet()) {
                    SyncMessageAckFuture fut0;
                    SyncMessageAckFuture fut = (SyncMessageAckFuture)e.getValue();
                    if (!fut.nodeId().equals(nodeId) || (fut0 = (SyncMessageAckFuture)GridContinuousProcessor.this.syncMsgFuts.remove(e.getKey())) == null) continue;
                    ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Node left grid while sending message to: " + nodeId);
                    fut0.onDone(err);
                }
            }
        }, 11, 12);
        // NOTE(review): 14 is an inlined event type constant (presumably EVT_NODE_SEGMENTED) — confirm.
        this.ctx.event().addLocalEventListener(new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                // All pending start/stop futures are completed with this error.
                GridContinuousProcessor.this.cancelFutures(new IgniteCheckedException("Topology segmented"));
            }
        }, 14, new int[0]);
        // Start request: processed only when it came from another node and this node is not stopping.
        this.ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, new CustomEventListener<StartRoutineDiscoveryMessage>(){

            @Override
            public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) {
                if (!snd.id().equals(GridContinuousProcessor.this.ctx.localNodeId()) && !GridContinuousProcessor.this.ctx.isStopping()) {
                    GridContinuousProcessor.this.processStartRequest(snd, msg);
                }
            }
        });
        // Start ack: completes the pending start future, if this node has one for the routine.
        this.ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, new CustomEventListener<StartRoutineAckDiscoveryMessage>(){

            @Override
            public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineAckDiscoveryMessage msg) {
                StartFuture fut = (StartFuture)GridContinuousProcessor.this.startFuts.remove(msg.routineId());
                if (fut != null) {
                    if (msg.errs().isEmpty()) {
                        LocalRoutineInfo routine = (LocalRoutineInfo)GridContinuousProcessor.this.locInfos.get(msg.routineId());
                        if (routine != null && routine.handler().isQuery()) {
                            GridCacheContext cctx;
                            Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
                            Map<Integer, Long> cntrs = msg.updateCounters();
                            GridCacheAdapter interCache = GridContinuousProcessor.this.ctx.cache().internalCache(routine.handler().cacheName());
                            GridCacheContext gridCacheContext = cctx = interCache != null ? interCache.context() : null;
                            // Add this node's own update counters when it is an affinity node of a non-local cache.
                            if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) {
                                cntrsPerNode.put(GridContinuousProcessor.this.ctx.localNodeId(), cctx.topology().updateCounters(false));
                            }
                            routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                        }
                        fut.onRemoteRegistered();
                    } else {
                        // Fail the start future with the first reported error and stop the routine.
                        IgniteCheckedException firstEx = F.first(msg.errs().values());
                        fut.onDone(firstEx);
                        GridContinuousProcessor.this.stopRoutine(msg.routineId());
                    }
                }
            }
        });
        // Stop request: unregisters the remote routine (when sent by another node) and drops it from client infos.
        this.ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, new CustomEventListener<StopRoutineDiscoveryMessage>(){

            @Override
            public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) {
                Map clientInfo;
                if (!snd.id().equals(GridContinuousProcessor.this.ctx.localNodeId())) {
                    UUID routineId = msg.routineId();
                    GridContinuousProcessor.this.unregisterRemote(routineId);
                }
                // Walk the per-client maps and stop at the first one that actually contained the routine.
                Iterator i$ = GridContinuousProcessor.this.clientInfos.values().iterator();
                while (i$.hasNext() && (clientInfo = (Map)i$.next()).remove(msg.routineId()) == null) {
                }
            }
        });
        // Stop ack: completes the pending stop future, if any.
        this.ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, new CustomEventListener<StopRoutineAckDiscoveryMessage>(){

            @Override
            public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineAckDiscoveryMessage msg) {
                StopFuture fut = (StopFuture)GridContinuousProcessor.this.stopFuts.remove(msg.routineId());
                if (fut != null) {
                    fut.onDone();
                }
            }
        });
        // Communication listener for notifications and their acknowledgements.
        this.ctx.io().addMessageListener(GridTopic.TOPIC_CONTINUOUS, new GridMessageListener(){

            @Override
            public void onMessage(UUID nodeId, Object obj) {
                GridContinuousMessage msg = (GridContinuousMessage)obj;
                // Unmarshal the payload only when it arrived in serialized form.
                if (msg.data() == null && msg.dataBytes() != null) {
                    try {
                        msg.data(U.unmarshal(GridContinuousProcessor.this.marsh, msg.dataBytes(), U.resolveClassLoader(GridContinuousProcessor.this.ctx.config())));
                    }
                    catch (IgniteCheckedException e) {
                        // The message is dropped; the failure is only logged.
                        U.error(GridContinuousProcessor.this.log, "Failed to process message (ignoring): " + msg, e);
                        return;
                    }
                }
                switch (msg.type()) {
                    case MSG_EVT_NOTIFICATION: {
                        GridContinuousProcessor.this.processNotification(nodeId, msg);
                        break;
                    }
                    case MSG_EVT_ACK: {
                        GridContinuousProcessor.this.processMessageAck(msg);
                        break;
                    }
                    default: {
                        assert (false) : "Unexpected message received: " + (Object)((Object)msg.type());
                        break;
                    }
                }
            }
        });
        // Let the dependent components know that the continuous processor is ready.
        this.ctx.marshallerContext().onContinuousProcessorStarted(this.ctx);
        this.ctx.cacheObjects().onContinuousProcessorStarted(this.ctx);
        this.ctx.service().onContinuousProcessorStarted(this.ctx);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Continuous processor started.");
        }
    }

    /**
     * Removes every pending start and stop future from its registry and completes it with the given error.
     *
     * @param e Error to complete the futures with.
     */
    private void cancelFutures(IgniteCheckedException e) {
        // Start futures first, then stop futures; each future is removed before it is completed.
        for (Iterator<StartFuture> startIt = this.startFuts.values().iterator(); startIt.hasNext(); ) {
            StartFuture startFut = startIt.next();
            startIt.remove();
            startFut.onDone(e);
        }
        for (Iterator<StopFuture> stopIt = this.stopFuts.values().iterator(); stopIt.hasNext(); ) {
            StopFuture stopFut = stopIt.next();
            stopIt.remove();
            stopFut.onDone(e);
        }
    }

    /**
     * Acquires the read side of the processor stop lock unless the processor is already stopped.
     *
     * @return {@code true} if the read lock is held on return (the caller must then call
     *      {@link #unlockStopping()}), {@code false} if the processor is stopped and the lock was released.
     */
    public boolean lockStopping() {
        Lock readLock = this.processorStopLock.readLock();
        readLock.lock();
        boolean running = !this.processorStopped;
        if (!running) {
            // Stopped: do not leave the read lock held.
            readLock.unlock();
        }
        return running;
    }

    /**
     * Releases the read side of the processor stop lock.
     */
    public void unlockStopping() {
        Lock readLock = this.processorStopLock.readLock();
        readLock.unlock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the processor as stopped while holding the write side of the processor stop lock.
     *
     * @param cancel Not used by this implementation.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        Lock writeLock = this.processorStopLock.writeLock();
        writeLock.lock();
        try {
            this.processorStopped = true;
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Stops the processor: removes the continuous topic listener, then interrupts and joins every
     * buffer checker thread, one after another. Does nothing on daemon nodes.
     *
     * @param cancel Not used by this implementation.
     * @throws IgniteCheckedException If interrupting or joining a thread fails.
     */
    @Override
    public void stop(boolean cancel) throws IgniteCheckedException {
        if (!this.ctx.config().isDaemon()) {
            this.ctx.io().removeMessageListener(GridTopic.TOPIC_CONTINUOUS);
            Iterator<IgniteThread> threads = this.bufCheckThreads.values().iterator();
            while (threads.hasNext()) {
                IgniteThread checker = threads.next();
                U.interrupt(checker);
                U.join(checker);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Continuous processor stopped.");
            }
        }
    }

    /**
     * Identifies the discovery data exchange slot used by this processor.
     *
     * @return Always {@link GridComponent.DiscoveryDataExchangeType#CONTINUOUS_PROC}.
     */
    @Override
    @Nullable
    public GridComponent.DiscoveryDataExchangeType discoveryDataType() {
        return GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
    }

    /**
     * Builds the discovery data describing the routines known to this node.
     * <p>
     * The data contains a copy of the client routine maps and one item per entry of {@code locInfos}.
     * When the data is collected for the local node and the local node is a client, the local
     * routines are additionally published under the local node ID in the client map.
     *
     * @param nodeId ID of the node the data is collected for.
     * @return Discovery data, or {@code null} when {@code nodeId} is the local node and it has no routines.
     */
    @Override
    @Nullable
    public Serializable collectDiscoveryData(UUID nodeId) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("collectDiscoveryData [node=" + nodeId + ", loc=" + this.ctx.localNodeId() + ", locInfos=" + this.locInfos + ", clientInfos=" + this.clientInfos + ']');
        }
        boolean forLocNode = nodeId.equals(this.ctx.localNodeId());
        if (forLocNode && this.locInfos.isEmpty()) {
            return null;
        }
        // Copy the client routine maps so the data does not share the live maps.
        HashMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfosCp = U.newHashMap(this.clientInfos.size());
        for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> clientEntry : this.clientInfos.entrySet()) {
            Map<UUID, LocalRoutineInfo> routines = clientEntry.getValue();
            Map<UUID, LocalRoutineInfo> routinesCp = U.newHashMap(routines.size());
            routinesCp.putAll(routines);
            clientInfosCp.put(clientEntry.getKey(), routinesCp);
        }
        if (forLocNode && this.ctx.discovery().localNode().isClient()) {
            clientInfosCp.put(this.ctx.localNodeId(), new HashMap<UUID, LocalRoutineInfo>(this.locInfos));
        }
        DiscoveryData data = new DiscoveryData(this.ctx.localNodeId(), clientInfosCp);
        for (Map.Entry<UUID, LocalRoutineInfo> locEntry : this.locInfos.entrySet()) {
            LocalRoutineInfo info = locEntry.getValue();
            data.addItem(new DiscoveryDataItem(locEntry.getKey(), info.prjPred, info.hnd, info.bufSize, info.interval, info.autoUnsubscribe));
        }
        return data;
    }

    /**
     * Applies continuous-routine discovery data collected on another node.
     * <p>
     * For every routine item the handler is registered locally when the projection predicate
     * (if any) accepts the local node and the routine is not already present in {@code locInfos};
     * routines without auto-unsubscribe are additionally remembered in {@code locInfos}.
     * Routines of client nodes other than the local node are registered the same way, and all
     * received client routine maps are merged into {@code clientInfos}.
     * Registration failures are logged and do not interrupt processing of the remaining routines.
     *
     * @param joiningNodeId ID of the joining node (used for logging only).
     * @param rmtNodeId ID of the node that provided the data (used for logging only).
     * @param obj Received data; expected to be a {@code DiscoveryData} instance or {@code null}.
     */
    @Override
    public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
        DiscoveryData data = (DiscoveryData)obj;
        if (this.log.isDebugEnabled()) {
            // Logged at DEBUG so that the level matches the guard above.
            this.log.debug("onDiscoveryDataReceived [joining=" + joiningNodeId + ", rmtNodeId=" + rmtNodeId + ", loc=" + this.ctx.localNodeId() + ", data=" + data + ']');
        }
        if (this.ctx.isDaemon() || data == null) {
            return;
        }
        for (DiscoveryDataItem item : data.items) {
            try {
                if (item.prjPred != null) {
                    this.ctx.resource().injectGeneric(item.prjPred);
                }
                boolean accepted = item.prjPred == null || item.prjPred.apply(this.ctx.discovery().localNode());
                if (accepted && !this.locInfos.containsKey(item.routineId)) {
                    this.registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe, false);
                }
                if (!item.autoUnsubscribe) {
                    // Routines without auto-unsubscribe are remembered locally as well.
                    this.locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
                }
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Failed to register continuous handler.", e);
            }
        }
        for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> clientEntry : data.clientInfos.entrySet()) {
            UUID clientNodeId = clientEntry.getKey();
            Map<UUID, LocalRoutineInfo> clientRoutines = clientEntry.getValue();
            // The local node's own routines are not registered again, only merged below.
            if (!this.ctx.localNodeId().equals(clientNodeId)) {
                for (Map.Entry<UUID, LocalRoutineInfo> routineEntry : clientRoutines.entrySet()) {
                    UUID routineId = routineEntry.getKey();
                    LocalRoutineInfo info = routineEntry.getValue();
                    try {
                        if (info.prjPred != null) {
                            this.ctx.resource().injectGeneric(info.prjPred);
                        }
                        if (info.prjPred == null || info.prjPred.apply(this.ctx.discovery().localNode())) {
                            this.registerHandler(clientNodeId, routineId, info.hnd, info.bufSize, info.interval, info.autoUnsubscribe, false);
                        }
                    }
                    catch (IgniteCheckedException err) {
                        U.error(this.log, "Failed to register continuous handler.", err);
                    }
                }
            }
            // clientInfos values are declared as Map, so no downcast to HashMap is needed here.
            Map<UUID, LocalRoutineInfo> known = this.clientInfos.get(clientNodeId);
            if (known == null) {
                known = new HashMap<UUID, LocalRoutineInfo>();
                this.clientInfos.put(clientNodeId, known);
            }
            known.putAll(clientRoutines);
        }
    }

    /**
     * Completes delayed registration of remote query routines that belong to the started cache.
     *
     * @param ctx Context of the started cache.
     * @throws IgniteCheckedException If handler registration fails.
     */
    public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException {
        for (Map.Entry<UUID, RemoteRoutineInfo> rmtEntry : this.rmtInfos.entrySet()) {
            RemoteRoutineInfo rmtInfo = rmtEntry.getValue();
            GridContinuousHandler hnd = rmtInfo.hnd;
            boolean qryForCache = hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName());
            // clearDelayedRegister() is consulted only for query handlers of this cache.
            if (qryForCache && rmtInfo.clearDelayedRegister()) {
                GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, rmtEntry.getKey(), this.ctx);
                assert (status != GridContinuousHandler.RegisterStatus.DELAYED);
            }
        }
    }

    /**
     * Drops remote query routines that belong to the stopped cache.
     *
     * @param ctx Context of the stopped cache.
     */
    public void onCacheStop(GridCacheContext ctx) {
        for (Iterator<Map.Entry<UUID, RemoteRoutineInfo>> rmtIt = this.rmtInfos.entrySet().iterator(); rmtIt.hasNext(); ) {
            GridContinuousHandler hnd = rmtIt.next().getValue().hnd;
            if (hnd.isQuery() && F.eq(ctx.name(), hnd.cacheName())) {
                rmtIt.remove();
            }
        }
    }

    /**
     * Registers a static continuous query routine for the given cache: creates an internal query
     * handler with its own topic, stores it in {@code locInfos} under a random routine ID and
     * registers the message listener for the handler.
     *
     * @param cacheName Cache name.
     * @param locLsnr Local listener.
     * @param rmtFilter Remote filter.
     * @param prjPred Projection predicate, possibly {@code null}.
     * @return ID of the registered routine.
     * @throws IgniteCheckedException Declared for callers; not thrown by the visible code directly.
     */
    public UUID registerStaticRoutine(String cacheName, CacheEntryUpdatedListener<?, ?> locLsnr, CacheEntryEventSerializableFilter rmtFilter, @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
            cacheName,
            GridTopic.TOPIC_CACHE.topic("CONTINUOUS_QUERY_STATIC_" + cacheName, this.ctx.localNodeId(), this.seq.incrementAndGet()),
            locLsnr,
            rmtFilter,
            true,
            false,
            true,
            false);
        hnd.internal(true);
        UUID routineId = UUID.randomUUID();
        this.locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, 1, 0L, true));
        this.registerMessageListener(hnd);
        return routineId;
    }

    /**
     * Starts a continuous routine.
     * <p>
     * The routine is recorded in {@code locInfos} under a freshly generated ID. For a local-only
     * routine the handler is registered on this node and a finished future is returned. Otherwise
     * the handler (and, with peer class loading, the projection predicate) is prepared for
     * sending, the handler is registered locally when the local node is included in the
     * projection or the handler is a query handler, and a start request is sent through discovery.
     * <p>
     * Whenever the start fails before the routine ID is handed to the caller, the entry added to
     * {@code locInfos} is removed again so that no orphaned routine is left behind or published
     * through discovery data.
     *
     * @param hnd Routine handler.
     * @param locOnly Whether the routine is registered on the local node only.
     * @param bufSize Buffer size, must be positive.
     * @param interval Time interval, must not be negative.
     * @param autoUnsubscribe Automatic unsubscribe flag.
     * @param prjPred Projection predicate, possibly {@code null}.
     * @return Future completing with the routine ID, or with the error that prevented the start.
     */
    public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd, boolean locOnly, int bufSize, long interval, boolean autoUnsubscribe, @Nullable IgnitePredicate<ClusterNode> prjPred) {
        assert (hnd != null);
        assert (bufSize > 0);
        assert (interval >= 0L);
        UUID routineId = UUID.randomUUID();
        this.locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
        if (locOnly) {
            try {
                this.registerHandler(this.ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
                return new GridFinishedFuture<UUID>(routineId);
            }
            catch (IgniteCheckedException e) {
                // The caller never learns the routine ID, so the local record must not survive.
                this.locInfos.remove(routineId);
                this.unregisterHandler(routineId, hnd, true);
                return new GridFinishedFuture<UUID>(e);
            }
        }
        boolean locIncluded = prjPred == null || prjPred.apply(this.ctx.discovery().localNode());
        StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe);
        try {
            if (this.ctx.config().isPeerClassLoadingEnabled()) {
                // A user-defined predicate class has to be deployed before it can be sent.
                if (prjPred != null && !U.isGrid(prjPred.getClass())) {
                    Class<?> cls = U.detectClass(prjPred);
                    String clsName = cls.getName();
                    GridDeployment dep = this.ctx.deploy().deploy(cls, U.detectClassLoader(cls));
                    if (dep == null) {
                        throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred);
                    }
                    reqData.className(clsName);
                    reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
                    reqData.p2pMarshal(this.marsh);
                }
                reqData.handler().p2pMarshal(this.ctx);
            }
        }
        catch (IgniteCheckedException e) {
            // Nothing else has been registered yet; only the local record has to be rolled back.
            this.locInfos.remove(routineId);
            return new GridFinishedFuture<UUID>(e);
        }
        this.registerMessageListener(hnd);
        StartFuture fut = new StartFuture(this.ctx, routineId);
        this.startFuts.put(routineId, fut);
        try {
            if (locIncluded || hnd.isQuery()) {
                this.registerHandler(this.ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
            }
            this.ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, reqData.handler().keepBinary()));
        }
        catch (IgniteCheckedException e) {
            this.startFuts.remove(routineId);
            this.locInfos.remove(routineId);
            this.unregisterHandler(routineId, hnd, true);
            fut.onDone(e);
            return fut;
        }
        fut.onLocalRegistered();
        return fut;
    }

    /**
     * Installs a notification listener on the handler's ordered topic, if the handler has one.
     *
     * @param hnd Routine handler.
     */
    private void registerMessageListener(GridContinuousHandler hnd) {
        if (hnd.orderedTopic() == null) {
            return;
        }
        GridMessageListener orderedLsnr = new GridMessageListener(){

            @Override
            public void onMessage(UUID nodeId, Object obj) {
                GridContinuousMessage notification = (GridContinuousMessage)obj;
                assert (notification.type() == GridContinuousMessageType.MSG_EVT_NOTIFICATION);
                boolean needUnmarshal = notification.data() == null && notification.dataBytes() != null;
                if (needUnmarshal) {
                    try {
                        notification.data(U.unmarshal(GridContinuousProcessor.this.marsh, notification.dataBytes(), U.resolveClassLoader(GridContinuousProcessor.this.ctx.config())));
                    }
                    catch (IgniteCheckedException e) {
                        // The message is dropped; the failure is only logged.
                        U.error(GridContinuousProcessor.this.log, "Failed to process message (ignoring): " + notification, e);
                        return;
                    }
                }
                GridContinuousProcessor.this.processNotification(nodeId, notification);
            }
        };
        this.ctx.io().addMessageListener(hnd.orderedTopic(), orderedLsnr);
    }

    /**
     * Stops the routine with the given ID.
     * <p>
     * Only the caller that manages to register the stop future performs the stop; every other
     * caller receives the already registered future. An unknown routine completes immediately.
     *
     * @param routineId Routine ID, must not be {@code null}.
     * @return Future of the stop operation.
     */
    public IgniteInternalFuture<?> stopRoutine(UUID routineId) {
        assert (routineId != null);
        StopFuture registered = this.stopFuts.get(routineId);
        if (registered != null) {
            return registered;
        }
        StopFuture stopFut = new StopFuture(this.ctx);
        StopFuture concurrent = this.stopFuts.putIfAbsent(routineId, stopFut);
        if (concurrent != null) {
            // Somebody else registered a stop future in the meantime and owns the stop.
            return concurrent;
        }
        LocalRoutineInfo routine = this.locInfos.remove(routineId);
        if (routine == null) {
            this.stopFuts.remove(routineId);
            stopFut.onDone();
            return stopFut;
        }
        this.unregisterHandler(routineId, routine.hnd, true);
        try {
            this.ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
        }
        catch (IgniteCheckedException e) {
            stopFut.onDone(e);
        }
        if (this.ctx.isStopping()) {
            stopFut.onDone();
        }
        return stopFut;
    }

    /**
     * Adds backup notifications for a routine.
     * <p>
     * For a remote routine the objects are added to its buffer and whatever the resulting batch
     * collects is sent to the given node. If only a local routine is known under this ID, its
     * handler callback is invoked directly. Nothing happens once the processor is stopped.
     *
     * @param nodeId ID of the node the notifications are sent to.
     * @param routineId Routine ID.
     * @param objs Notification objects.
     * @param orderedTopic Topic for ordered notifications, possibly {@code null}.
     * @throws IgniteCheckedException If sending fails.
     */
    public void addBackupNotification(UUID nodeId, UUID routineId, Collection<?> objs, @Nullable Object orderedTopic) throws IgniteCheckedException {
        if (this.processorStopped) {
            return;
        }
        RemoteRoutineInfo rmtInfo = this.rmtInfos.get(routineId);
        if (rmtInfo == null) {
            LocalRoutineInfo locInfo = this.locInfos.get(routineId);
            if (locInfo != null) {
                locInfo.handler().notifyCallback(nodeId, routineId, objs, this.ctx);
            }
            return;
        }
        Collection<Object> toSnd = rmtInfo.addAll(objs).collect();
        if (!toSnd.isEmpty()) {
            this.sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null);
        }
    }

    /**
     * Adds a notification for a remote routine.
     * <p>
     * In synchronous mode the object is sent right away and the call waits for the
     * acknowledgement future. Otherwise the object is buffered and a batch is sent only when
     * the buffer hands one back. Nothing happens when the processor is stopped or the routine is
     * not known as a remote routine.
     *
     * @param nodeId ID of the node the notification is sent to; must not be the local node.
     * @param routineId Routine ID.
     * @param obj Notification object.
     * @param orderedTopic Topic for ordered notifications, possibly {@code null}.
     * @param sync Whether to wait for the acknowledgement.
     * @param msg Whether {@code obj} is a {@link Message}.
     * @throws IgniteCheckedException If sending or waiting for the acknowledgement fails.
     */
    public void addNotification(UUID nodeId, final UUID routineId, @Nullable Object obj, @Nullable Object orderedTopic, boolean sync, boolean msg) throws IgniteCheckedException {
        assert (nodeId != null);
        assert (routineId != null);
        assert (!msg || obj instanceof Message) : obj;
        assert (!nodeId.equals(this.ctx.localNodeId()));
        if (this.processorStopped) {
            return;
        }
        final RemoteRoutineInfo rmtInfo = this.rmtInfos.get(routineId);
        if (rmtInfo == null) {
            return;
        }
        assert (rmtInfo.interval == 0L || !sync);
        if (!sync) {
            final GridContinuousBatch pending = rmtInfo.add(obj);
            if (pending == null) {
                return;
            }
            // The batch is acknowledged to the handler only when the send completed without an error.
            IgniteInClosure<IgniteException> onAck = new CI1<IgniteException>(){

                @Override
                public void apply(IgniteException e) {
                    if (e == null) {
                        rmtInfo.hnd.onBatchAcknowledged(routineId, pending, GridContinuousProcessor.this.ctx);
                    }
                }
            };
            this.sendNotification(nodeId, routineId, null, pending.collect(), orderedTopic, msg, onAck);
            return;
        }
        SyncMessageAckFuture ackFut = new SyncMessageAckFuture(nodeId);
        IgniteUuid ackFutId = IgniteUuid.randomUuid();
        this.syncMsgFuts.put(ackFutId, ackFut);
        try {
            this.sendNotification(nodeId, routineId, ackFutId, F.asList(obj), null, msg, null);
            rmtInfo.hnd.onBatchAcknowledged(routineId, rmtInfo.add(obj), this.ctx);
        }
        catch (IgniteCheckedException e) {
            // Nobody will acknowledge a message that was not sent.
            this.syncMsgFuts.remove(ackFutId);
            throw e;
        }
        ackFut.get();
    }

    /**
     * Handles disconnect of the local client node: fails pending start/stop futures, unregisters
     * remote routines (except those owned by the local node without auto-unsubscribe), notifies
     * the handlers of local routines and clears the remote and client routine registries.
     *
     * @param reconnectFut Reconnect future passed into the disconnect exception.
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        this.cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
        if (this.log.isDebugEnabled()) {
            this.log.debug("onDisconnected [rmtInfos=" + this.rmtInfos + ", locInfos=" + this.locInfos + ", clientInfos=" + this.clientInfos + ']');
        }
        for (Map.Entry<UUID, RemoteRoutineInfo> rmtEntry : this.rmtInfos.entrySet()) {
            RemoteRoutineInfo rmtInfo = rmtEntry.getValue();
            boolean ownPersistent = this.ctx.localNodeId().equals(rmtInfo.nodeId) && !rmtInfo.autoUnsubscribe;
            if (!ownPersistent) {
                this.unregisterRemote(rmtEntry.getKey());
            }
        }
        for (LocalRoutineInfo locInfo : this.locInfos.values()) {
            locInfo.hnd.onClientDisconnected();
        }
        this.rmtInfos.clear();
        this.clientInfos.clear();
        if (this.log.isDebugEnabled()) {
            this.log.debug("after onDisconnected [rmtInfos=" + this.rmtInfos + ", locInfos=" + this.locInfos + ", clientInfos=" + this.clientInfos + ']');
        }
    }

    /**
     * Wraps the objects into a notification message and sends it with retries.
     *
     * @param nodeId Destination node ID.
     * @param routineId Routine ID.
     * @param futId ID of the acknowledgement future, possibly {@code null}.
     * @param toSnd Notification objects; must not be empty.
     * @param orderedTopic Topic for ordered notifications, possibly {@code null}.
     * @param msg Whether the objects are messages.
     * @param ackC Closure passed on to the send call.
     * @throws IgniteCheckedException If sending fails.
     */
    private void sendNotification(UUID nodeId, UUID routineId, @Nullable IgniteUuid futId, Collection<Object> toSnd, @Nullable Object orderedTopic, boolean msg, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (nodeId != null);
        assert (routineId != null);
        assert (toSnd != null);
        assert (!toSnd.isEmpty());
        GridContinuousMessage notification = new GridContinuousMessage(GridContinuousMessageType.MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg);
        this.sendWithRetries(nodeId, notification, orderedTopic, ackC);
    }

    /**
     * Handles a routine start request carried by a discovery message.
     * <p>
     * The method:
     * <ol>
     * <li>switches the handler to keep-binary mode when the request asks for it;</li>
     * <li>when peer class loading is enabled, resolves the deployment for the request's class name (if any)
     * and unmarshals the request data and the handler;</li>
     * <li>for a client originator, records the routine in {@code clientInfos} under the originator's ID;</li>
     * <li>if no error happened so far, injects resources into the projection predicate, registers the
     * handler when the predicate accepts the local node and the routine is not already in
     * {@code locInfos}, and stores a {@link LocalRoutineInfo} for routines that are not
     * auto-unsubscribe;</li>
     * <li>for query handlers, attaches the cache update counters to the request when the cache exists,
     * is not local and is a user cache;</li>
     * <li>attaches any caught error to the request.</li>
     * </ol>
     * Errors are logged and reported through {@code req.addError}; they are not rethrown.
     *
     * @param node Node the request originates from.
     * @param req Start request message; update counters and errors are added to it.
     */
    private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
        UUID routineId = req.routineId();
        StartRequestData data = req.startRequestData();
        GridContinuousHandler hnd = data.handler();

        if (req.keepBinary()) {
            assert (hnd instanceof CacheContinuousQueryHandler);
            ((CacheContinuousQueryHandler)hnd).keepBinary(true);
        }

        IgniteCheckedException err = null;

        try {
            if (this.ctx.config().isPeerClassLoadingEnabled()) {
                String clsName = data.className();

                if (clsName != null) {
                    GridDeploymentInfo depInfo = data.deploymentInfo();
                    GridDeployment dep = this.ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null);

                    if (dep == null) {
                        throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
                    }

                    data.p2pUnmarshal(this.marsh, U.resolveClassLoader(dep.classLoader(), this.ctx.config()));
                }

                hnd.p2pUnmarshal(node.id(), this.ctx);
            }
        }
        catch (IgniteCheckedException e) {
            err = e;
            U.error(this.log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
        }

        // Message-listen handlers are copied; every other handler is used as is.
        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? new GridMessageListenHandler((GridMessageListenHandler)hnd) : hnd;

        if (node.isClient()) {
            // Typed as Map on purpose: the stored value is only known to be a Map, so no downcast is made.
            Map<UUID, LocalRoutineInfo> clientRoutineMap = this.clientInfos.get(node.id());

            if (clientRoutineMap == null) {
                clientRoutineMap = new HashMap<UUID, LocalRoutineInfo>();

                Map<UUID, LocalRoutineInfo> old = this.clientInfos.put(node.id(), clientRoutineMap);

                assert (old == null);
            }

            clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
        }

        if (err == null) {
            try {
                IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();

                if (prjPred != null) {
                    this.ctx.resource().injectGeneric(prjPred);
                }

                boolean acceptsLocNode = prjPred == null || prjPred.apply(this.ctx.discovery().node(this.ctx.localNodeId()));

                if (acceptsLocNode && !this.locInfos.containsKey(routineId)) {
                    this.registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe(), false);
                }

                if (!data.autoUnsubscribe()) {
                    this.locInfos.putIfAbsent(routineId, new LocalRoutineInfo(prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
                }
            }
            catch (IgniteCheckedException e) {
                err = e;
                U.error(this.log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
            }
        }

        if (hnd0.isQuery()) {
            GridCacheProcessor proc = this.ctx.cache();

            if (proc != null) {
                GridCacheAdapter cache = proc.internalCache(hnd0.cacheName());

                if (cache != null && !cache.isLocal() && cache.context().userCache()) {
                    req.addUpdateCounters(this.ctx.localNodeId(), cache.context().topology().updateCounters(false));
                }
            }
        }

        if (err != null) {
            req.addError(this.ctx.localNodeId(), err);
        }
    }

    /**
     * Completes the synchronous-message future that matches the acknowledgment, if one is still registered.
     * The future is removed from {@code syncMsgFuts} in the process.
     *
     * @param msg Acknowledgment message; its future ID must not be {@code null}.
     */
    private void processMessageAck(GridContinuousMessage msg) {
        assert (msg.futureId() != null);

        SyncMessageAckFuture ackFut = (SyncMessageAckFuture)this.syncMsgFuts.remove(msg.futureId());

        if (ackFut == null) {
            return;
        }

        ackFut.onDone();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers a notification message to the handler of the matching local routine, if there is one.
     * <p>
     * When the message carries a future ID, a {@code MSG_EVT_ACK} message is sent back to the sender
     * regardless of whether the callback completed normally. A failure to send the acknowledgment is
     * logged and not rethrown.
     *
     * @param nodeId ID of the node the notification came from, must not be {@code null}.
     * @param msg Notification message, must not be {@code null}.
     */
    private void processNotification(UUID nodeId, GridContinuousMessage msg) {
        assert (nodeId != null);
        assert (msg != null);

        UUID routineId = msg.routineId();

        try {
            LocalRoutineInfo locRoutine = (LocalRoutineInfo)this.locInfos.get(routineId);

            if (locRoutine != null) {
                locRoutine.handler().notifyCallback(nodeId, routineId, (Collection)msg.data(), this.ctx);
            }
        }
        finally {
            // Acknowledge only when the sender asked for it by supplying a future ID.
            if (msg.futureId() != null) {
                try {
                    GridContinuousMessage ack = new GridContinuousMessage(GridContinuousMessageType.MSG_EVT_ACK, null, msg.futureId(), null, false);

                    this.sendWithRetries(nodeId, ack, null, null);
                }
                catch (IgniteCheckedException e) {
                    this.log.error("Failed to send event acknowledgment to node: " + nodeId, e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a routine handler on this node.
     * <p>
     * For a non-local registration the routine is first checked, under {@code stopLock}, against the
     * {@code stopped} collection and recorded in {@code rmtInfos}; registration is skipped when the
     * routine ID was present in {@code stopped} or an entry already existed in {@code rmtInfos}.
     * When {@code interval} is positive, a "continuous-buffer-checker" thread is started that periodically
     * asks the routine info for a batch to flush and sends it to {@code nodeId}.
     *
     * @param nodeId ID of the node notifications are sent to, must not be {@code null}.
     * @param routineId Routine ID, must not be {@code null}.
     * @param hnd Handler to register, must not be {@code null}.
     * @param bufSize Buffer size, must be positive.
     * @param interval Flush interval, must be non-negative; {@code 0} disables the checker thread.
     * @param autoUnsubscribe Auto-unsubscribe flag stored in the routine info.
     * @param loc When {@code true}, the {@code stopped}/{@code rmtInfos} bookkeeping is skipped.
     * @return {@code true} only if the handler reported {@code RegisterStatus.REGISTERED}.
     * @throws IgniteCheckedException If handler registration fails.
     */
    private boolean registerHandler(final UUID nodeId, final UUID routineId, final GridContinuousHandler hnd, int bufSize, final long interval, boolean autoUnsubscribe, boolean loc) throws IgniteCheckedException {
        assert (nodeId != null);
        assert (routineId != null);
        assert (hnd != null);
        assert (bufSize > 0);
        assert (interval >= 0L);

        final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, bufSize, interval, autoUnsubscribe);

        boolean shouldRegister = loc;

        if (!loc) {
            this.stopLock.lock();

            try {
                boolean stoppedEarlier = this.stopped.remove(routineId);

                shouldRegister = !stoppedEarlier && this.rmtInfos.putIfAbsent(routineId, info) == null;
            }
            finally {
                this.stopLock.unlock();
            }
        }

        if (!shouldRegister) {
            return false;
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
        }

        if (interval > 0L) {
            GridWorker checkerWorker = new GridWorker(this.ctx.gridName(), "continuous-buffer-checker", this.log){

                @Override
                protected void body() {
                    long delay = interval;

                    while (!this.isCancelled()) {
                        try {
                            U.sleep(delay);
                        }
                        catch (IgniteInterruptedCheckedException ignored) {
                            break;
                        }

                        IgniteBiTuple<GridContinuousBatch, Long> res = info.checkInterval();
                        final GridContinuousBatch batch = res.get1();

                        if (batch != null && batch.size() > 0) {
                            try {
                                Collection<Object> toSnd = batch.collect();
                                boolean msg = toSnd.iterator().next() instanceof Message;

                                // Tell the handler about the batch only when the send reported no error.
                                CI1<IgniteException> ackC = new CI1<IgniteException>(){

                                    @Override
                                    public void apply(IgniteException e) {
                                        if (e == null) {
                                            info.hnd.onBatchAcknowledged(routineId, batch, GridContinuousProcessor.this.ctx);
                                        }
                                    }
                                };

                                GridContinuousProcessor.this.sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg, ackC);
                            }
                            catch (ClusterTopologyCheckedException ignored) {
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("Failed to send notification to node (is node alive?): " + nodeId);
                                }
                            }
                            catch (IgniteCheckedException e) {
                                U.error(this.log, "Failed to send notification to node: " + nodeId, e);
                            }
                        }

                        // The routine info decides how long to wait before the next check.
                        delay = res.get2();
                    }
                }
            };

            IgniteThread checker = new IgniteThread(checkerWorker);

            this.bufCheckThreads.put(routineId, checker);
            checker.start();
        }

        GridContinuousHandler.RegisterStatus status = hnd.register(nodeId, routineId, this.ctx);

        if (status == GridContinuousHandler.RegisterStatus.DELAYED) {
            info.markDelayedRegister();
        }

        return status == GridContinuousHandler.RegisterStatus.REGISTERED;
    }

    /**
     * Unregisters a routine handler and interrupts the buffer checker thread of the routine, if any.
     * For a local routine whose handler has an ordered topic, the message listener for that topic is
     * removed first.
     *
     * @param routineId Routine ID, must not be {@code null}.
     * @param hnd Handler to unregister, must not be {@code null}.
     * @param loc Whether the routine is local; only then the ordered-topic listener is removed.
     */
    private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, boolean loc) {
        assert (routineId != null);
        assert (hnd != null);

        if (loc) {
            if (hnd.orderedTopic() != null) {
                this.ctx.io().removeMessageListener(hnd.orderedTopic());
            }
        }

        hnd.unregister(routineId, this.ctx);

        IgniteThread bufChecker = this.bufCheckThreads.remove(routineId);

        if (bufChecker == null) {
            return;
        }

        bufChecker.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the routine from both {@code rmtInfos} and {@code locInfos} under {@code stopLock} and
     * unregisters its handler.
     * <p>
     * If no remote info existed, the routine ID is added to {@code stopped}. The handler of the remote
     * info is preferred; the local info's handler is used only when there was no remote info.
     *
     * @param routineId Routine ID.
     */
    private void unregisterRemote(UUID routineId) {
        RemoteRoutineInfo rmt;
        LocalRoutineInfo locInfo;

        this.stopLock.lock();

        try {
            rmt = (RemoteRoutineInfo)this.rmtInfos.remove(routineId);
            locInfo = (LocalRoutineInfo)this.locInfos.remove(routineId);

            if (rmt == null) {
                this.stopped.add(routineId);
            }
        }
        finally {
            this.stopLock.unlock();
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + locInfo + ", rmt=" + rmt + ']');
        }

        if (rmt == null && locInfo == null) {
            return;
        }

        GridContinuousHandler hnd = rmt != null ? rmt.hnd : locInfo.hnd;

        this.unregisterHandler(routineId, hnd, false);
    }

    /**
     * Resolves the node by ID through discovery and sends the message to it with retries.
     *
     * @param nodeId Destination node ID, must not be {@code null}.
     * @param msg Message to send, must not be {@code null}.
     * @param orderedTopic Optional ordered topic.
     * @param ackC Acknowledgment closure.
     * @throws IgniteCheckedException If sending fails; a {@link ClusterTopologyCheckedException} is thrown
     *      when discovery returns no node for the ID.
     */
    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (nodeId != null);
        assert (msg != null);

        ClusterNode dest = this.ctx.discovery().node(nodeId);

        if (dest != null) {
            this.sendWithRetries(dest, msg, orderedTopic, ackC);

            return;
        }

        throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId);
    }

    /**
     * Sends the message to a single node with retries by delegating to the collection-based overload.
     *
     * @param node Destination node, must not be {@code null}.
     * @param msg Message to send, must not be {@code null}.
     * @param orderedTopic Optional ordered topic.
     * @param ackC Acknowledgment closure.
     * @throws IgniteCheckedException If sending fails.
     */
    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (node != null);
        assert (msg != null);

        Collection<? extends ClusterNode> singleNode = F.asList(node);

        this.sendWithRetries(singleNode, msg, orderedTopic, ackC);
    }

    /**
     * Sends the message to every node of the collection, retrying failed sends.
     * <p>
     * Before sending, the message data is marshalled into bytes unless the message is flagged as carrying
     * messages, has no data, or the only destination is the local node.
     * <p>
     * For each node at least one send attempt is always made. A failed attempt is retried after sleeping
     * for {@code retryDelay} until {@code retryCnt} attempts have been made, at which point the last
     * failure is rethrown. A non-positive {@code retryCnt} therefore means "one attempt, no retries";
     * a failure is never silently dropped.
     *
     * @param nodes Destination nodes, must not be empty.
     * @param msg Message to send, must not be {@code null}.
     * @param orderedTopic If not {@code null}, the message is sent as an ordered message to this topic;
     *      otherwise it is sent to {@code GridTopic.TOPIC_CONTINUOUS}.
     * @param ackC Acknowledgment closure passed to the communication layer.
     * @throws IgniteCheckedException If sending failed after all attempts, the thread was interrupted, or
     *      the destination node is no longer alive ({@link ClusterTopologyCheckedException}).
     */
    private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (!F.isEmpty(nodes));
        assert (msg != null);

        if (!msg.messages() && msg.data() != null && (nodes.size() > 1 || !this.ctx.localNodeId().equals(F.first(nodes).id()))) {
            msg.dataBytes(U.marshal(this.marsh, msg.data()));
        }

        for (ClusterNode node : nodes) {
            int attempt = 0;

            while (true) {
                ++attempt;

                try {
                    if (orderedTopic != null) {
                        this.ctx.io().sendOrderedMessage(node, orderedTopic, msg, (byte)2, 0L, true, ackC);
                    }
                    else {
                        this.ctx.io().send(node, GridTopic.TOPIC_CONTINUOUS, (Message)msg, (byte)2, ackC);
                    }

                    break;
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw e;
                }
                catch (IgniteCheckedException e) {
                    if (!this.ctx.discovery().alive(node.id())) {
                        throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
                    }

                    // '>=' rather than '==' so that a zero or negative retry count still surfaces the failure.
                    if (attempt >= this.retryCnt) {
                        throw e;
                    }

                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to send message to node (will retry): " + node.id());
                    }
                }

                U.sleep(this.retryDelay);
            }
        }
    }

    /**
     * Future associated with a node ID; the ID is supplied at construction and exposed via {@link #nodeId()}.
     */
    private static class SyncMessageAckFuture
    extends GridFutureAdapter<Object> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Node ID given to the constructor. */
        private UUID nodeId;

        /**
         * @param nodeId Node ID to associate with this future.
         */
        SyncMessageAckFuture(UUID nodeId) {
            this.nodeId = nodeId;
        }

        /**
         * @return Node ID given to the constructor.
         */
        UUID nodeId() {
            return nodeId;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(SyncMessageAckFuture.class, this);
        }
    }

    /**
     * Future that can own a timeout object; the timeout object is removed from the timeout processor
     * when the future completes.
     */
    private static class StopFuture
    extends GridFutureAdapter<Object> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Timeout object registered via {@link #addTimeoutObject}, or {@code null} if none was added. */
        private volatile GridTimeoutObject timeoutObj;

        /** Kernal context used to reach the timeout processor. */
        private GridKernalContext ctx;

        /**
         * @param ctx Kernal context.
         */
        StopFuture(GridKernalContext ctx) {
            this.ctx = ctx;
        }

        /**
         * Remembers the timeout object and registers it with the timeout processor.
         *
         * @param timeoutObj Timeout object, must not be {@code null}.
         */
        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
            assert (timeoutObj != null);

            this.timeoutObj = timeoutObj;

            ctx.timeout().addTimeoutObject(timeoutObj);
        }

        /** {@inheritDoc} */
        @Override
        public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
            GridTimeoutObject registered = timeoutObj;

            // Deregister the timeout before completing, if one was ever added.
            if (registered != null) {
                ctx.timeout().removeTimeoutObject(registered);
            }

            return super.onDone(res, err);
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(StopFuture.class, this);
        }
    }

    /**
     * Future that completes with the routine ID once both {@link #onLocalRegistered()} and
     * {@link #onRemoteRegistered()} have been called. It can also own a timeout object that is removed
     * from the timeout processor on completion.
     */
    private static class StartFuture
    extends GridFutureAdapter<UUID> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Kernal context used to reach the timeout processor. */
        private GridKernalContext ctx;

        /** Routine ID this future completes with. */
        private UUID routineId;

        /** Set once local registration has been reported. */
        private volatile boolean loc;

        /** Set once remote registration has been reported. */
        private volatile boolean rmt;

        /** Timeout object registered via {@link #addTimeoutObject}, or {@code null} if none was added. */
        private volatile GridTimeoutObject timeoutObj;

        /**
         * @param ctx Kernal context.
         * @param routineId Routine ID used as the result.
         */
        StartFuture(GridKernalContext ctx, UUID routineId) {
            this.ctx = ctx;
            this.routineId = routineId;
        }

        /**
         * Marks local registration as done and completes the future if the remote part is done as well.
         */
        public void onLocalRegistered() {
            loc = true;

            if (!rmt || isDone()) {
                return;
            }

            onDone(routineId);
        }

        /**
         * Marks remote registration as done and completes the future if the local part is done as well.
         */
        public void onRemoteRegistered() {
            rmt = true;

            if (!loc || isDone()) {
                return;
            }

            onDone(routineId);
        }

        /**
         * Remembers the timeout object and registers it with the timeout processor.
         *
         * @param timeoutObj Timeout object, must not be {@code null}.
         */
        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
            assert (timeoutObj != null);

            this.timeoutObj = timeoutObj;

            ctx.timeout().addTimeoutObject(timeoutObj);
        }

        /** {@inheritDoc} */
        @Override
        public boolean onDone(@Nullable UUID res, @Nullable Throwable err) {
            GridTimeoutObject registered = timeoutObj;

            // Deregister the timeout before completing, if one was ever added.
            if (registered != null) {
                ctx.timeout().removeTimeoutObject(registered);
            }

            return super.onDone(res, err);
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(StartFuture.class, this);
        }
    }

    /**
     * Externalizable description of a single routine: its ID, projection predicate, handler, buffer size,
     * interval and auto-unsubscribe flag. Fields are written and read in exactly that order.
     */
    private static class DiscoveryDataItem
    implements Externalizable {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Routine ID. */
        private UUID routineId;

        /** Projection predicate, may be {@code null}. */
        private IgnitePredicate<ClusterNode> prjPred;

        /** Routine handler. */
        private GridContinuousHandler hnd;

        /** Buffer size. */
        private int bufSize;

        /** Interval. */
        private long interval;

        /** Auto-unsubscribe flag. */
        private boolean autoUnsubscribe;

        /**
         * No-arg constructor required by {@link Externalizable}.
         */
        public DiscoveryDataItem() {
            // No-op.
        }

        /**
         * @param routineId Routine ID, must not be {@code null}.
         * @param prjPred Optional projection predicate.
         * @param hnd Handler, must not be {@code null}.
         * @param bufSize Buffer size, must be positive.
         * @param interval Interval, must be non-negative.
         * @param autoUnsubscribe Auto-unsubscribe flag.
         */
        DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) {
            assert (routineId != null);
            assert (hnd != null);
            assert (bufSize > 0);
            assert (interval >= 0L);

            this.routineId = routineId;
            this.prjPred = prjPred;
            this.hnd = hnd;
            this.bufSize = bufSize;
            this.interval = interval;
            this.autoUnsubscribe = autoUnsubscribe;
        }

        /** {@inheritDoc} */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeUuid(out, routineId);

            out.writeObject(prjPred);
            out.writeObject(hnd);

            out.writeInt(bufSize);
            out.writeLong(interval);
            out.writeBoolean(autoUnsubscribe);
        }

        /** {@inheritDoc} */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            // Must mirror the order used by writeExternal.
            routineId = U.readUuid(in);

            prjPred = (IgnitePredicate)in.readObject();
            hnd = (GridContinuousHandler)in.readObject();

            bufSize = in.readInt();
            interval = in.readLong();
            autoUnsubscribe = in.readBoolean();
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(DiscoveryDataItem.class, this);
        }
    }

    /**
     * Externalizable container holding a node ID, a collection of {@link DiscoveryDataItem}s and a map of
     * client routine infos. Fields are written and read in exactly that order.
     */
    private static class DiscoveryData
    implements Externalizable {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Node ID. */
        private UUID nodeId;

        /** Routine items. */
        @GridToStringInclude
        private Collection<DiscoveryDataItem> items;

        /** Client routine infos: outer key and inner key are both UUIDs. */
        private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;

        /**
         * No-arg constructor required by {@link Externalizable}.
         */
        public DiscoveryData() {
            // No-op.
        }

        /**
         * Creates data with an empty item list.
         *
         * @param nodeId Node ID, must not be {@code null}.
         * @param clientInfos Client routine infos.
         */
        DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
            assert (nodeId != null);

            this.nodeId = nodeId;
            this.clientInfos = clientInfos;

            items = new ArrayList<DiscoveryDataItem>();
        }

        /**
         * @param item Item to append to the item collection.
         */
        public void addItem(DiscoveryDataItem item) {
            items.add(item);
        }

        /** {@inheritDoc} */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeUuid(out, nodeId);
            U.writeCollection(out, items);
            U.writeMap(out, clientInfos);
        }

        /** {@inheritDoc} */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            // Must mirror the order used by writeExternal.
            nodeId = U.readUuid(in);
            items = U.readCollection(in);
            clientInfos = U.readMap(in);
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(DiscoveryData.class, this);
        }
    }

    /**
     * State of a routine registered for a node: the handler, the current batch of buffered objects and the
     * time of the last flush. Batch replacement is done under the write lock of {@link #lock}; plain
     * appends to the current batch are done under the read lock.
     */
    private static class RemoteRoutineInfo {
        /** Node ID given to the constructor. */
        private UUID nodeId;

        /** Routine handler; also used to create batches. */
        private final GridContinuousHandler hnd;

        /** Buffer size. */
        private final int bufSize;

        /** Flush interval; {@code 0} means no time-based flushing. */
        private final long interval;

        /** Lock guarding batch replacement. */
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        /** Batch currently being filled. */
        private GridContinuousBatch batch;

        /** Time of the last flush, initialised to the creation time. */
        private long lastSndTime = U.currentTimeMillis();

        /** Auto-unsubscribe flag. */
        private boolean autoUnsubscribe;

        /** Delayed-register flag. */
        private boolean delayedRegister;

        /**
         * @param nodeId Node ID, must not be {@code null}.
         * @param hnd Handler, must not be {@code null}.
         * @param bufSize Buffer size, must be positive.
         * @param interval Interval, must be non-negative.
         * @param autoUnsubscribe Auto-unsubscribe flag.
         */
        RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) {
            assert (nodeId != null);
            assert (hnd != null);
            assert (bufSize > 0);
            assert (interval >= 0L);

            this.nodeId = nodeId;
            this.hnd = hnd;
            this.bufSize = bufSize;
            this.interval = interval;
            this.autoUnsubscribe = autoUnsubscribe;

            batch = hnd.createBatch();
        }

        /**
         * Sets the delayed-register flag. The handler is expected to be a query handler.
         */
        public void markDelayedRegister() {
            assert (hnd.isQuery());

            delayedRegister = true;
        }

        /**
         * Resets the delayed-register flag.
         *
         * @return {@code true} if the flag was set before the call.
         */
        public boolean clearDelayedRegister() {
            if (!delayedRegister) {
                return false;
            }

            delayedRegister = false;

            return true;
        }

        /**
         * Adds all objects to the current batch and immediately swaps in a fresh batch.
         *
         * @param objs Objects to add, must not be {@code null}.
         * @return The batch that received the objects and should now be sent.
         */
        GridContinuousBatch addAll(Collection<?> objs) {
            assert (objs != null);

            GridContinuousBatch full;

            lock.writeLock().lock();

            try {
                for (Object o : objs) {
                    batch.add(o);
                }

                full = batch;
                batch = hnd.createBatch();

                if (interval > 0L) {
                    lastSndTime = U.currentTimeMillis();
                }
            }
            finally {
                lock.writeLock().unlock();
            }

            return full;
        }

        /**
         * Adds one object to the current batch. When the batch size has reached {@code bufSize - 1}
         * before the add, the batch is swapped for a fresh one and returned for sending.
         *
         * @param obj Object to add, must not be {@code null}.
         * @return The filled batch to send, or {@code null} if the buffer is not full yet.
         */
        @Nullable
        GridContinuousBatch add(Object obj) {
            assert (obj != null);

            GridContinuousBatch toSnd = null;

            // The size check is made before taking either lock.
            if (batch.size() >= bufSize - 1) {
                lock.writeLock().lock();

                try {
                    batch.add(obj);

                    toSnd = batch;
                    batch = hnd.createBatch();

                    if (interval > 0L) {
                        lastSndTime = U.currentTimeMillis();
                    }
                }
                finally {
                    lock.writeLock().unlock();
                }
            }
            else {
                lock.readLock().lock();

                try {
                    batch.add(obj);
                }
                finally {
                    lock.readLock().unlock();
                }
            }

            return toSnd;
        }

        /**
         * Checks whether the interval has elapsed since the last flush. If it has and the current batch
         * is non-empty, the batch is swapped for a fresh one and returned.
         *
         * @return Tuple of the batch to send (or {@code null}) and the delay before the next check: the
         *      remaining part of the interval when it has not elapsed yet, otherwise the full interval.
         */
        IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
            assert (interval > 0L);

            GridContinuousBatch toSnd = null;
            long now = U.currentTimeMillis();
            long elapsed;

            lock.writeLock().lock();

            try {
                elapsed = now - lastSndTime;

                if (elapsed >= interval && batch.size() > 0) {
                    toSnd = batch;
                    batch = hnd.createBatch();
                    lastSndTime = now;
                }
            }
            finally {
                lock.writeLock().unlock();
            }

            long nextDelay = elapsed < interval ? interval - elapsed : interval;

            return F.t(toSnd, nextDelay);
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(RemoteRoutineInfo.class, this);
        }
    }

    /**
     * Serializable description of a routine: projection predicate, handler, buffer size, interval and
     * auto-unsubscribe flag.
     */
    static class LocalRoutineInfo
    implements Serializable {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Projection predicate, may be {@code null}. */
        private final IgnitePredicate<ClusterNode> prjPred;

        /** Routine handler. */
        private final GridContinuousHandler hnd;

        /** Buffer size. */
        private final int bufSize;

        /** Interval. */
        private final long interval;

        /** Auto-unsubscribe flag. */
        private boolean autoUnsubscribe;

        /**
         * @param prjPred Optional projection predicate.
         * @param hnd Handler, must not be {@code null}.
         * @param bufSize Buffer size, must be positive.
         * @param interval Interval, must be non-negative.
         * @param autoUnsubscribe Auto-unsubscribe flag.
         */
        LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) {
            assert (hnd != null);
            assert (bufSize > 0);
            assert (interval >= 0L);

            this.prjPred = prjPred;
            this.hnd = hnd;
            this.bufSize = bufSize;
            this.interval = interval;
            this.autoUnsubscribe = autoUnsubscribe;
        }

        /**
         * @return Routine handler.
         */
        GridContinuousHandler handler() {
            return hnd;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(LocalRoutineInfo.class, this);
        }
    }
}

