/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.query.continuous;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEvent;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;

public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler {
    private static final long serialVersionUID = 0L;
    private static final int BACKUP_ACK_THRESHOLD = 100;
    private String cacheName;
    private Object topic;
    private transient CacheEntryUpdatedListener<K, V> locLsnr;
    private CacheEntryEventSerializableFilter<K, V> rmtFilter;
    private DeployableObject rmtFilterDep;
    private boolean internal;
    private boolean notifyExisting;
    private boolean oldValRequired;
    private boolean sync;
    private boolean ignoreExpired;
    private int taskHash;
    private transient boolean skipPrimaryCheck;
    private volatile transient Collection<CacheContinuousQueryEntry> backupQueue;
    private boolean locCache;
    private transient boolean keepBinary;
    private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
    private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
    private transient AcknowledgeBuffer ackBuf;
    private transient int cacheId;
    private volatile transient Map<Integer, Long> initUpdCntrs;
    private volatile transient Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
    private volatile transient AffinityTopologyVersion initTopVer;
    private transient boolean ignoreClsNotFound;
    private transient boolean asyncCallback;
    private transient UUID nodeId;
    private transient UUID routineId;
    private transient GridKernalContext ctx;
    private transient IgniteLogger log;

    public CacheContinuousQueryHandler() {
    }

    public CacheContinuousQueryHandler(String cacheName, Object topic, CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventSerializableFilter<K, V> rmtFilter, boolean oldValRequired, boolean sync, boolean ignoreExpired, boolean ignoreClsNotFound) {
        assert (topic != null);
        assert (locLsnr != null);
        this.cacheName = cacheName;
        this.topic = topic;
        this.locLsnr = locLsnr;
        this.rmtFilter = rmtFilter;
        this.oldValRequired = oldValRequired;
        this.sync = sync;
        this.ignoreExpired = ignoreExpired;
        this.ignoreClsNotFound = ignoreClsNotFound;
        this.cacheId = CU.cacheId(cacheName);
    }

    public void internal(boolean internal) {
        this.internal = internal;
    }

    public void notifyExisting(boolean notifyExisting) {
        this.notifyExisting = notifyExisting;
    }

    public void localCache(boolean locCache) {
        this.locCache = locCache;
    }

    public void taskNameHash(int taskHash) {
        this.taskHash = taskHash;
    }

    public void skipPrimaryCheck(boolean skipPrimaryCheck) {
        this.skipPrimaryCheck = skipPrimaryCheck;
    }

    @Override
    public boolean isEvents() {
        return false;
    }

    @Override
    public boolean isMessaging() {
        return false;
    }

    @Override
    public boolean isQuery() {
        return true;
    }

    @Override
    public boolean keepBinary() {
        return this.keepBinary;
    }

    public void keepBinary(boolean keepBinary) {
        this.keepBinary = keepBinary;
    }

    @Override
    public String cacheName() {
        return this.cacheName;
    }

    @Override
    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode, Map<Integer, Long> cntrs) {
        this.initUpdCntrsPerNode = cntrsPerNode;
        this.initUpdCntrs = cntrs;
        this.initTopVer = topVer;
    }

    @Override
    public GridContinuousHandler.RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
        CacheEntryEventFilter filter;
        assert (nodeId != null);
        assert (routineId != null);
        assert (ctx != null);
        if (this.locLsnr != null) {
            if (this.locLsnr instanceof CacheContinuousQueryManager.JCacheQueryLocalListener) {
                ctx.resource().injectGeneric(((CacheContinuousQueryManager.JCacheQueryLocalListener)this.locLsnr).impl);
                this.asyncCallback = ((CacheContinuousQueryManager.JCacheQueryLocalListener)this.locLsnr).async();
            } else {
                ctx.resource().injectGeneric(this.locLsnr);
                this.asyncCallback = U.hasAnnotation(this.locLsnr, IgniteAsyncCallback.class);
            }
        }
        if ((filter = this.getEventFilter()) != null) {
            if (filter instanceof CacheContinuousQueryManager.JCacheQueryRemoteFilter) {
                if (((CacheContinuousQueryManager.JCacheQueryRemoteFilter)filter).impl != null) {
                    ctx.resource().injectGeneric(((CacheContinuousQueryManager.JCacheQueryRemoteFilter)filter).impl);
                }
                if (!this.asyncCallback) {
                    this.asyncCallback = ((CacheContinuousQueryManager.JCacheQueryRemoteFilter)filter).async();
                }
            } else {
                ctx.resource().injectGeneric(filter);
                if (!this.asyncCallback) {
                    this.asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class);
                }
            }
        }
        this.entryBufs = new ConcurrentHashMap<Integer, EntryBuffer>();
        this.backupQueue = new ConcurrentLinkedDeque8<CacheContinuousQueryEntry>();
        this.ackBuf = new AcknowledgeBuffer();
        this.rcvs = new ConcurrentHashMap<Integer, PartitionRecovery>();
        this.nodeId = nodeId;
        this.routineId = routineId;
        this.ctx = ctx;
        final boolean loc = nodeId.equals(ctx.localNodeId());
        assert (!this.skipPrimaryCheck || loc);
        this.log = ctx.log("org.apache.ignite.continuous.query");
        CacheContinuousQueryListener lsnr = new CacheContinuousQueryListener<K, V>(){

            @Override
            public void onExecution() {
                if (ctx.event().isRecordable(96)) {
                    ctx.event().record(new CacheQueryExecutedEvent(ctx.discovery().localNode(), "Continuous query executed.", 96, CacheQueryType.CONTINUOUS.name(), CacheContinuousQueryHandler.this.cacheName, null, null, null, filter instanceof CacheEntryEventSerializableFilter ? (CacheEntryEventSerializableFilter)filter : null, null, nodeId, this.taskName()));
                }
            }

            @Override
            public boolean keepBinary() {
                return CacheContinuousQueryHandler.this.keepBinary;
            }

            @Override
            public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt, boolean primary, final boolean recordIgniteEvt, GridDhtAtomicAbstractUpdateFuture fut) {
                GridCacheContext cctx;
                if (CacheContinuousQueryHandler.this.ignoreExpired && evt.getEventType() == EventType.EXPIRED) {
                    return;
                }
                if (CacheContinuousQueryHandler.this.log.isDebugEnabled()) {
                    CacheContinuousQueryHandler.this.log.debug("Entry updated on affinity node [evt=" + evt + ", primary=" + primary + ']');
                }
                if ((cctx = CacheContinuousQueryHandler.this.cacheContext(ctx)) == null) {
                    return;
                }
                assert (!CacheContinuousQueryHandler.this.skipPrimaryCheck || cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
                if (CacheContinuousQueryHandler.this.asyncCallback) {
                    ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure(primary, evt, recordIgniteEvt, fut);
                    ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                } else {
                    final boolean notify = CacheContinuousQueryHandler.this.filter(evt, primary);
                    if (CacheContinuousQueryHandler.this.log.isDebugEnabled()) {
                        CacheContinuousQueryHandler.this.log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary + ", notify=" + notify + ']');
                    }
                    if (primary || CacheContinuousQueryHandler.this.skipPrimaryCheck) {
                        if (fut == null) {
                            CacheContinuousQueryHandler.this.onEntryUpdate(evt, notify, loc, recordIgniteEvt);
                        } else {
                            fut.addContinuousQueryClosure(new CI1<Boolean>(){

                                @Override
                                public void apply(Boolean suc) {
                                    if (!suc.booleanValue()) {
                                        evt.entry().markFiltered();
                                    }
                                    CacheContinuousQueryHandler.this.onEntryUpdate(evt, notify, loc, recordIgniteEvt);
                                }
                            });
                        }
                    }
                }
            }

            @Override
            public void onUnregister() {
                if (filter instanceof PlatformContinuousQueryFilter) {
                    ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
                }
            }

            @Override
            public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
                Collection backupQueue0 = CacheContinuousQueryHandler.this.backupQueue;
                if (backupQueue0 != null) {
                    Iterator it = backupQueue0.iterator();
                    while (it.hasNext()) {
                        CacheContinuousQueryEntry backupEntry = (CacheContinuousQueryEntry)it.next();
                        Long updateCntr = updateCntrs.get(backupEntry.partition());
                        if (updateCntr == null || backupEntry.updateCounter() > updateCntr) continue;
                        it.remove();
                    }
                }
            }

            @Override
            public void flushBackupQueue(GridKernalContext ctx2, AffinityTopologyVersion topVer) {
                Collection backupQueue0 = CacheContinuousQueryHandler.this.backupQueue;
                if (backupQueue0 == null) {
                    return;
                }
                try {
                    ClusterNode nodeId0 = ctx2.discovery().node(nodeId);
                    if (nodeId0 != null) {
                        GridCacheContext cctx = CacheContinuousQueryHandler.this.cacheContext(ctx2);
                        for (CacheContinuousQueryEntry e : backupQueue0) {
                            if (!e.isFiltered()) {
                                CacheContinuousQueryHandler.this.prepareEntry(cctx, nodeId, e);
                            }
                            e.topologyVersion(topVer);
                        }
                        ctx2.continuous().addBackupNotification(nodeId, routineId, backupQueue0, CacheContinuousQueryHandler.this.topic);
                    } else {
                        CacheContinuousQueryHandler.this.backupQueue = null;
                    }
                    backupQueue0.clear();
                }
                catch (IgniteCheckedException e) {
                    U.error(ctx2.log("org.apache.ignite.continuous.query"), "Failed to send backup event notification to node: " + nodeId, e);
                }
            }

            @Override
            public void acknowledgeBackupOnTimeout(GridKernalContext ctx2) {
                CacheContinuousQueryHandler.this.sendBackupAcknowledge(CacheContinuousQueryHandler.this.ackBuf.acknowledgeOnTimeout(), routineId, ctx2);
            }

            @Override
            public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary) {
                assert (evt != null);
                CacheContinuousQueryEntry e = evt.entry();
                e.markFiltered();
                this.onEntryUpdated(evt, primary, false, null);
            }

            @Override
            public void onPartitionEvicted(int part) {
                Collection backupQueue0 = CacheContinuousQueryHandler.this.backupQueue;
                if (backupQueue0 != null) {
                    Iterator it = backupQueue0.iterator();
                    while (it.hasNext()) {
                        if (((CacheContinuousQueryEntry)it.next()).partition() != part) continue;
                        it.remove();
                    }
                }
            }

            @Override
            public boolean oldValueRequired() {
                return CacheContinuousQueryHandler.this.oldValRequired;
            }

            @Override
            public boolean notifyExisting() {
                return CacheContinuousQueryHandler.this.notifyExisting;
            }

            private String taskName() {
                return ctx.security().enabled() ? ctx.task().resolveTaskName(CacheContinuousQueryHandler.this.taskHash) : null;
            }
        };
        CacheContinuousQueryManager mgr = this.manager(ctx);
        if (mgr == null) {
            return GridContinuousHandler.RegisterStatus.DELAYED;
        }
        return mgr.registerListener(routineId, lsnr, this.internal);
    }

    public CacheEntryEventFilter getEventFilter() {
        return this.rmtFilter;
    }

    private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) throws IgniteCheckedException {
        if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) {
            entry.prepareMarshal(cctx);
            cctx.deploy().prepare(entry);
        } else {
            entry.prepareMarshal(cctx);
        }
    }

    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
        GridCacheContext<K, V> cctx = this.cacheContext(ctx);
        if (!cctx.isLocal()) {
            this.cacheContext(ctx).affinity().affinityReadyFuture(this.initTopVer).get();
            for (int partId = 0; partId < this.cacheContext(ctx).affinity().partitions(); ++partId) {
                this.getOrCreatePartitionRecovery(ctx, partId);
            }
        }
    }

    @Override
    public void unregister(UUID routineId, GridKernalContext ctx) {
        assert (routineId != null);
        assert (ctx != null);
        GridCacheAdapter cache = ctx.cache().internalCache(this.cacheName);
        if (cache != null) {
            cache.context().continuousQueries().unregisterListener(this.internal, routineId);
        }
    }

    private CacheContinuousQueryManager manager(GridKernalContext ctx) {
        GridCacheContext<K, V> cacheCtx = this.cacheContext(ctx);
        return cacheCtx == null ? null : cacheCtx.continuousQueries();
    }

    @Override
    public void notifyCallback(final UUID nodeId, UUID routineId, Collection<?> objs, final GridKernalContext ctx) {
        assert (nodeId != null);
        assert (routineId != null);
        assert (objs != null);
        assert (ctx != null);
        if (objs.isEmpty()) {
            return;
        }
        if (this.asyncCallback) {
            final List<Object> entries = objs instanceof List ? (List<Object>)objs : new ArrayList<CacheContinuousQueryEntry>(objs);
            IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool();
            int threadId = asyncPool.threadId(((CacheContinuousQueryEntry)entries.get(0)).partition());
            int startIdx = 0;
            if (entries.size() != 1) {
                for (int i = 1; i < entries.size(); ++i) {
                    int curThreadId = asyncPool.threadId(((CacheContinuousQueryEntry)entries.get(i)).partition());
                    if (curThreadId == threadId) continue;
                    final int i0 = i;
                    final int startIdx0 = startIdx;
                    asyncPool.execute(new Runnable(){

                        @Override
                        public void run() {
                            CacheContinuousQueryHandler.this.notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0));
                        }
                    }, threadId);
                    startIdx = i0;
                    threadId = curThreadId;
                }
            }
            final int startIdx0 = startIdx;
            asyncPool.execute(new Runnable(){

                @Override
                public void run() {
                    CacheContinuousQueryHandler.this.notifyCallback0(nodeId, ctx, startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size()));
                }
            }, threadId);
        } else {
            this.notifyCallback0(nodeId, ctx, objs);
        }
    }

    private void notifyCallback0(UUID nodeId, GridKernalContext ctx, Collection<CacheContinuousQueryEntry> entries) {
        GridCacheContext<K, V> cctx = this.cacheContext(ctx);
        if (cctx == null) {
            IgniteLogger log = ctx.log("org.apache.ignite.continuous.query");
            if (log.isDebugEnabled()) {
                log.debug("Failed to notify callback, cache is not found: " + this.cacheId);
            }
            return;
        }
        ArrayList entries0 = new ArrayList(entries.size());
        for (CacheContinuousQueryEntry e : entries) {
            GridDeploymentInfo depInfo;
            GridCacheDeploymentManager<K, V> depMgr = cctx.deploy();
            ClassLoader ldr = depMgr.globalLoader();
            if (ctx.config().isPeerClassLoadingEnabled() && (depInfo = e.deployInfo()) != null) {
                depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), depInfo.participants(), depInfo.localDeploymentOwner());
            }
            try {
                e.unmarshal(cctx, ldr);
                Collection<CacheEntryEvent<K, V>> evts = this.handleEvent(ctx, e);
                if (evts == null || evts.isEmpty()) continue;
                entries0.addAll(evts);
            }
            catch (IgniteCheckedException ex) {
                if (this.ignoreClsNotFound) {
                    assert (this.internal);
                    continue;
                }
                U.error(ctx.log("org.apache.ignite.continuous.query"), "Failed to unmarshal entry.", ex);
            }
        }
        if (!entries0.isEmpty()) {
            this.locLsnr.onUpdated(entries0);
        }
    }

    private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e) {
        assert (e != null);
        GridCacheContext<K, V> cctx = this.cacheContext(ctx);
        IgniteCacheProxy cache = cctx.kernalContext().cache().jcache(cctx.name());
        if (this.internal) {
            if (e.isFiltered()) {
                return Collections.emptyList();
            }
            return F.asList(new CacheContinuousQueryEvent(cache, cctx, e));
        }
        if (e.updateCounter() == -1L) {
            return !e.isFiltered() ? F.asList(new CacheContinuousQueryEvent(cache, cctx, e)) : Collections.emptyList();
        }
        PartitionRecovery rec = this.getOrCreatePartitionRecovery(ctx, e.partition());
        return rec.collectEntries(e, cctx, cache);
    }

    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
        CacheContinuousQueryEntry entry = evt.entry();
        boolean notify = !entry.isFiltered();
        try {
            if (notify && this.getEventFilter() != null) {
                notify = this.getEventFilter().evaluate(evt);
            }
        }
        catch (Exception e) {
            U.error(this.log, "CacheEntryEventFilter failed: " + e);
        }
        if (!notify) {
            entry.markFiltered();
        }
        if (!primary && !this.internal && entry.updateCounter() != -1L) {
            entry.markBackup();
            Collection<CacheContinuousQueryEntry> backupQueue0 = this.backupQueue;
            if (backupQueue0 != null) {
                backupQueue0.add(entry.forBackupQueue());
            }
        }
        return notify;
    }

    private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
        try {
            GridCacheContext<K, V> cctx = this.cacheContext(this.ctx);
            if (cctx == null) {
                return;
            }
            CacheContinuousQueryEntry entry = evt.entry();
            if (loc) {
                if (!this.locCache) {
                    Collection evts = this.handleEvent(this.ctx, entry);
                    if (!evts.isEmpty()) {
                        this.locLsnr.onUpdated(evts);
                    }
                    if (!this.internal && !this.skipPrimaryCheck) {
                        this.sendBackupAcknowledge(this.ackBuf.onAcknowledged(entry), this.routineId, this.ctx);
                    }
                } else if (!entry.isFiltered()) {
                    this.locLsnr.onUpdated(F.asList(evt));
                }
            } else {
                CacheContinuousQueryEntry e;
                if (!entry.isFiltered()) {
                    this.prepareEntry(cctx, this.nodeId, entry);
                }
                if ((e = this.handleEntry(entry)) != null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Send the following event to listener: " + e);
                    }
                    this.ctx.continuous().addNotification(this.nodeId, this.routineId, entry, this.topic, this.sync, true);
                }
            }
        }
        catch (ClusterTopologyCheckedException ex) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send event notification to node, node left cluster [node=" + this.nodeId + ", err=" + ex + ']');
            }
        }
        catch (IgniteCheckedException ex) {
            U.error(this.ctx.log("org.apache.ignite.continuous.query"), "Failed to send event notification to node: " + this.nodeId, ex);
        }
        if (recordIgniteEvt && notify) {
            this.ctx.event().record(new CacheQueryReadEvent(this.ctx.discovery().localNode(), "Continuous query executed.", 97, CacheQueryType.CONTINUOUS.name(), this.cacheName, null, null, null, this.getEventFilter() instanceof CacheEntryEventSerializableFilter ? (CacheEntryEventSerializableFilter)this.getEventFilter() : null, null, this.nodeId, this.taskName(), evt.getKey(), evt.getValue(), evt.getOldValue(), null));
        }
    }

    private String taskName() {
        return this.ctx.security().enabled() ? this.ctx.task().resolveTaskName(this.taskHash) : null;
    }

    @Override
    public void onClientDisconnected() {
        if (this.internal) {
            return;
        }
        for (PartitionRecovery rec : this.rcvs.values()) {
            rec.resetTopologyCache();
        }
    }

    @NotNull
    private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
        PartitionRecovery rec = (PartitionRecovery)this.rcvs.get(partId);
        if (rec == null) {
            Long partCntr = null;
            AffinityTopologyVersion initTopVer0 = this.initTopVer;
            if (initTopVer0 != null) {
                GridCacheContext<K, V> cctx = this.cacheContext(ctx);
                GridCacheAffinityManager aff = cctx.affinity();
                if (this.initUpdCntrsPerNode != null) {
                    for (ClusterNode node : aff.nodesByPartition(partId, this.initTopVer)) {
                        Map<Integer, Long> map = this.initUpdCntrsPerNode.get(node.id());
                        if (map == null) continue;
                        partCntr = map.get(partId);
                        break;
                    }
                } else if (this.initUpdCntrs != null) {
                    partCntr = this.initUpdCntrs.get(partId);
                }
            }
            rec = new PartitionRecovery(ctx.log("org.apache.ignite.continuous.query"), initTopVer0, partCntr);
            PartitionRecovery oldRec = this.rcvs.putIfAbsent(partId, rec);
            if (oldRec != null) {
                rec = oldRec;
            }
        }
        return rec;
    }

    private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
        assert (e != null);
        assert (this.entryBufs != null);
        if (this.internal) {
            if (e.isFiltered()) {
                return null;
            }
            return e;
        }
        if (e.updateCounter() == -1L) {
            return e;
        }
        EntryBuffer buf = (EntryBuffer)this.entryBufs.get(e.partition());
        if (buf == null) {
            buf = new EntryBuffer();
            EntryBuffer oldRec = this.entryBufs.putIfAbsent(e.partition(), buf);
            if (oldRec != null) {
                buf = oldRec;
            }
        }
        return buf.handle(e);
    }

    @Override
    public void onNodeLeft() {
        Collection<CacheContinuousQueryEntry> backupQueue0 = this.backupQueue;
        if (backupQueue0 != null) {
            this.backupQueue = null;
        }
    }

    @Override
    public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
        assert (ctx != null);
        assert (ctx.config().isPeerClassLoadingEnabled());
        if (this.rmtFilter != null && !U.isGrid(this.rmtFilter.getClass())) {
            this.rmtFilterDep = new DeployableObject(this.rmtFilter, ctx);
        }
    }

    @Override
    public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
        assert (nodeId != null);
        assert (ctx != null);
        assert (ctx.config().isPeerClassLoadingEnabled());
        if (this.rmtFilterDep != null) {
            this.rmtFilter = (CacheEntryEventSerializableFilter)this.rmtFilterDep.unmarshal(nodeId, ctx);
        }
    }

    @Override
    public GridContinuousBatch createBatch() {
        return new GridContinuousQueryBatch();
    }

    @Override
    public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
        this.sendBackupAcknowledge(this.ackBuf.onAcknowledged(batch), routineId, ctx);
    }

    private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t, final UUID routineId, final GridKernalContext ctx) {
        if (t != null) {
            ctx.closure().runLocalSafe(new Runnable(){

                @Override
                public void run() {
                    GridCacheContext cctx = CacheContinuousQueryHandler.this.cacheContext(ctx);
                    CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(), routineId, (Map)t.get1());
                    for (AffinityTopologyVersion topVer : (Set)t.get2()) {
                        for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
                            IgniteLogger log;
                            if (node.isLocal() || node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) < 0) continue;
                            try {
                                cctx.io().send(node, (GridCacheMessage)msg, (byte)2);
                            }
                            catch (ClusterTopologyCheckedException ignored) {
                                log = ctx.log("org.apache.ignite.continuous.query");
                                if (!log.isDebugEnabled()) continue;
                                log.debug("Failed to send acknowledge message, node left [msg=" + msg + ", node=" + node + ']');
                            }
                            catch (IgniteCheckedException e) {
                                log = ctx.log("org.apache.ignite.continuous.query");
                                U.error(log, "Failed to send acknowledge message [msg=" + msg + ", node=" + node + ']', e);
                            }
                        }
                    }
                }
            });
        }
    }

    @Override
    @Nullable
    public Object orderedTopic() {
        return this.topic;
    }

    @Override
    public GridContinuousHandler clone() {
        try {
            return (GridContinuousHandler)super.clone();
        }
        catch (CloneNotSupportedException e) {
            throw new IllegalStateException(e);
        }
    }

    public String toString() {
        return S.toString(CacheContinuousQueryHandler.class, this);
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        U.writeString(out, this.cacheName);
        out.writeObject(this.topic);
        boolean b = this.rmtFilterDep != null;
        out.writeBoolean(b);
        if (b) {
            out.writeObject(this.rmtFilterDep);
        } else {
            out.writeObject(this.rmtFilter);
        }
        out.writeBoolean(this.internal);
        out.writeBoolean(this.notifyExisting);
        out.writeBoolean(this.oldValRequired);
        out.writeBoolean(this.sync);
        out.writeBoolean(this.ignoreExpired);
        out.writeInt(this.taskHash);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        this.cacheName = U.readString(in);
        this.topic = in.readObject();
        boolean b = in.readBoolean();
        if (b) {
            this.rmtFilterDep = (DeployableObject)in.readObject();
        } else {
            this.rmtFilter = (CacheEntryEventSerializableFilter)in.readObject();
        }
        this.internal = in.readBoolean();
        this.notifyExisting = in.readBoolean();
        this.oldValRequired = in.readBoolean();
        this.sync = in.readBoolean();
        this.ignoreExpired = in.readBoolean();
        this.taskHash = in.readInt();
        this.cacheId = CU.cacheId(this.cacheName);
    }

    private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
        assert (ctx != null);
        return ctx.cache().context().cacheContext(this.cacheId);
    }

    protected static class DeployableObject
    implements Externalizable {
        private static final long serialVersionUID = 0L;
        private byte[] bytes;
        private String clsName;
        private GridDeploymentInfo depInfo;

        public DeployableObject() {
        }

        protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
            assert (obj != null);
            assert (ctx != null);
            Class<?> cls = U.detectClass(obj);
            this.clsName = cls.getName();
            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
            if (dep == null) {
                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
            }
            this.depInfo = new GridDeploymentInfoBean(dep);
            this.bytes = U.marshal(ctx, obj);
        }

        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
            assert (ctx != null);
            GridDeployment dep = ctx.deploy().getGlobalDeployment(this.depInfo.deployMode(), this.clsName, this.clsName, this.depInfo.userVersion(), nodeId, this.depInfo.classLoaderId(), this.depInfo.participants(), null);
            if (dep == null) {
                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + this.clsName);
            }
            return U.unmarshal(ctx, this.bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeByteArray(out, this.bytes);
            U.writeString(out, this.clsName);
            out.writeObject(this.depInfo);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.bytes = U.readByteArray(in);
            this.clsName = U.readString(in);
            this.depInfo = (GridDeploymentInfo)in.readObject();
        }
    }

    private class ContinuousQueryAsyncClosure
    implements Runnable {
        private final CacheContinuousQueryEvent<K, V> evt;
        private final boolean primary;
        private final boolean recordIgniteEvt;
        private final IgniteInternalFuture<?> fut;

        ContinuousQueryAsyncClosure(boolean primary, CacheContinuousQueryEvent<K, V> evt, boolean recordIgniteEvt, IgniteInternalFuture<?> fut) {
            this.primary = primary;
            this.evt = evt;
            this.recordIgniteEvt = recordIgniteEvt;
            this.fut = fut;
        }

        @Override
        public void run() {
            final boolean notify = CacheContinuousQueryHandler.this.filter(this.evt, this.primary);
            if (!this.primary()) {
                return;
            }
            if (this.fut == null) {
                CacheContinuousQueryHandler.this.onEntryUpdate(this.evt, notify, CacheContinuousQueryHandler.this.nodeId.equals(CacheContinuousQueryHandler.this.ctx.localNodeId()), this.recordIgniteEvt);
                return;
            }
            if (this.fut.isDone()) {
                if (this.fut.error() != null) {
                    this.evt.entry().markFiltered();
                }
                CacheContinuousQueryHandler.this.onEntryUpdate(this.evt, notify, CacheContinuousQueryHandler.this.nodeId.equals(CacheContinuousQueryHandler.this.ctx.localNodeId()), this.recordIgniteEvt);
            } else {
                this.fut.listen(new CI1<IgniteInternalFuture<?>>(){

                    @Override
                    public void apply(IgniteInternalFuture<?> f) {
                        if (f.error() != null) {
                            ContinuousQueryAsyncClosure.this.evt.entry().markFiltered();
                        }
                        CacheContinuousQueryHandler.this.ctx.asyncCallbackPool().execute(new Runnable(){

                            @Override
                            public void run() {
                                CacheContinuousQueryHandler.this.onEntryUpdate(ContinuousQueryAsyncClosure.this.evt, notify, CacheContinuousQueryHandler.this.nodeId.equals(CacheContinuousQueryHandler.this.ctx.localNodeId()), ContinuousQueryAsyncClosure.this.recordIgniteEvt);
                            }
                        }, ContinuousQueryAsyncClosure.this.evt.entry().partition());
                    }
                });
            }
        }

        private boolean primary() {
            return this.primary || CacheContinuousQueryHandler.this.skipPrimaryCheck;
        }

        public String toString() {
            return S.toString(ContinuousQueryAsyncClosure.class, this);
        }
    }

    private static class AcknowledgeBuffer {
        private int size;
        @GridToStringInclude
        private Map<Integer, Long> updateCntrs = new HashMap<Integer, Long>();
        @GridToStringInclude
        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);

        private AcknowledgeBuffer() {
        }

        @Nullable
        synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> onAcknowledged(GridContinuousBatch batch) {
            assert (batch instanceof GridContinuousQueryBatch);
            this.size += ((GridContinuousQueryBatch)batch).entriesCount();
            Collection<Object> entries = batch.collect();
            for (CacheContinuousQueryEntry cacheContinuousQueryEntry : entries) {
                this.addEntry(cacheContinuousQueryEntry);
            }
            return this.size >= 100 ? this.acknowledgeData() : null;
        }

        @Nullable
        synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> onAcknowledged(CacheContinuousQueryEntry e) {
            ++this.size;
            this.addEntry(e);
            return this.size >= 100 ? this.acknowledgeData() : null;
        }

        private void addEntry(CacheContinuousQueryEntry e) {
            this.topVers.add(e.topologyVersion());
            Long cntr0 = this.updateCntrs.get(e.partition());
            if (cntr0 == null || e.updateCounter() > cntr0) {
                this.updateCntrs.put(e.partition(), e.updateCounter());
            }
        }

        @Nullable
        synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeOnTimeout() {
            return this.size > 0 ? this.acknowledgeData() : null;
        }

        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
            assert (this.size > 0);
            HashMap<Integer, Long> cntrs = new HashMap<Integer, Long>(this.updateCntrs);
            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = new IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>(cntrs, this.topVers);
            this.topVers = U.newHashSet(1);
            this.size = 0;
            return res;
        }

        public String toString() {
            return S.toString(AcknowledgeBuffer.class, this);
        }
    }

    private static class EntryBuffer {
        private static final int MAX_BUFF_SIZE = 100;
        private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet();
        private AtomicLong lastFiredCntr = new AtomicLong();

        private EntryBuffer() {
        }

        private long updateFiredCounter(long newVal) {
            long prevVal = this.lastFiredCntr.get();
            while (prevVal < newVal) {
                if (this.lastFiredCntr.compareAndSet(prevVal, newVal)) {
                    return prevVal;
                }
                prevVal = this.lastFiredCntr.get();
            }
            return prevVal >= newVal ? -1L : prevVal;
        }

        public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
            Long cntr;
            assert (e != null);
            if (e.isFiltered()) {
                Long last = this.buf.lastx();
                Long first = this.buf.firstx();
                if (last != null && first != null && last - first >= 100L) {
                    Long cntr2;
                    NavigableSet<Long> prevHoles = this.buf.subSet(first, true, last, true);
                    GridLongList filteredEvts = new GridLongList((int)(last - first));
                    int size = 0;
                    while ((cntr2 = prevHoles.pollFirst()) != null) {
                        filteredEvts.add(cntr2);
                        ++size;
                    }
                    filteredEvts.truncate(size, true);
                    e.filteredEvents(filteredEvts);
                    return e;
                }
                if (this.lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
                    return e;
                }
                this.buf.add(e.updateCounter());
                if (this.lastFiredCntr.get() > e.updateCounter() && this.buf.contains(e.updateCounter())) {
                    this.buf.remove(e.updateCounter());
                    return e;
                }
                return null;
            }
            long prevVal = this.updateFiredCounter(e.updateCounter());
            if (prevVal == -1L) {
                return e;
            }
            NavigableSet<Long> prevHoles = this.buf.subSet(prevVal, true, e.updateCounter(), true);
            GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal));
            int size = 0;
            while ((cntr = prevHoles.pollFirst()) != null) {
                filteredEvts.add(cntr);
                ++size;
            }
            filteredEvts.truncate(size, true);
            e.filteredEvents(filteredEvts);
            return e;
        }
    }

    private static class PartitionRecovery {
        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
        private static final int MAX_BUFF_SIZE = 100;
        private IgniteLogger log;
        private long lastFiredEvt;
        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<Long, CacheContinuousQueryEntry>();

        PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
            this.log = log;
            if (initCntr != null) {
                assert (topVer.topologyVersion() > 0L) : topVer;
                this.lastFiredEvt = initCntr;
                this.curTop = topVer;
            }
        }

        void resetTopologyCache() {
            this.curTop = AffinityTopologyVersion.NONE;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(CacheContinuousQueryEntry entry, GridCacheContext cctx, IgniteCache cache) {
            ArrayList<CacheEntryEvent<K, V>> entries;
            assert (entry != null);
            if (entry.topologyVersion() == null) {
                assert (entry.updateCounter() == 0L) : entry;
                return F.asList(new CacheContinuousQueryEvent(cache, cctx, entry));
            }
            Map<Long, CacheContinuousQueryEntry> map = this.pendingEvts;
            synchronized (map) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Handling event [lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ", pendingEvts=" + this.pendingEvts + ']');
                }
                if (this.curTop == AffinityTopologyVersion.NONE) {
                    this.lastFiredEvt = entry.updateCounter();
                    this.curTop = entry.topologyVersion();
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("First event [lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ']');
                    }
                    return !entry.isFiltered() ? F.asList(new CacheContinuousQueryEvent(cache, cctx, entry)) : Collections.emptyList();
                }
                if (this.curTop.compareTo(entry.topologyVersion()) < 0) {
                    if (entry.updateCounter() == 1L && !entry.isBackup()) {
                        ArrayList<CacheEntryEvent<K, V>> entries2 = new ArrayList<CacheEntryEvent<K, V>>(this.pendingEvts.size());
                        for (CacheContinuousQueryEntry evt : this.pendingEvts.values()) {
                            if (evt == HOLE || evt.isFiltered()) continue;
                            entries2.add(new CacheContinuousQueryEvent(cache, cctx, evt));
                        }
                        this.pendingEvts.clear();
                        this.curTop = entry.topologyVersion();
                        this.lastFiredEvt = entry.updateCounter();
                        if (!entry.isFiltered()) {
                            entries2.add(new CacheContinuousQueryEvent(cache, cctx, entry));
                        }
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Partition was lost [lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ", pendingEvts=" + this.pendingEvts + ']');
                        }
                        return entries2;
                    }
                    this.curTop = entry.topologyVersion();
                }
                if (entry.updateCounter() > this.lastFiredEvt) {
                    this.pendingEvts.put(entry.updateCounter(), entry);
                    if (entry.filteredEvents() != null) {
                        for (long cnrt : entry.filteredEvents()) {
                            if (cnrt <= this.lastFiredEvt) continue;
                            this.pendingEvts.put(cnrt, HOLE);
                        }
                    }
                } else {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Skip duplicate continuous query message: " + entry);
                    }
                    return Collections.emptyList();
                }
                if (this.pendingEvts.isEmpty()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Nothing sent to listener [lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ']');
                    }
                    return Collections.emptyList();
                }
                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = this.pendingEvts.entrySet().iterator();
                entries = new ArrayList<CacheEntryEvent<K, V>>();
                if (this.pendingEvts.size() >= 100) {
                    for (int i = 0; i < 90; ++i) {
                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
                        if (e.getValue() != HOLE && !e.getValue().isFiltered()) {
                            entries.add(new CacheContinuousQueryEvent(cache, cctx, e.getValue()));
                        }
                        this.lastFiredEvt = e.getKey();
                        iter.remove();
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Pending events reached max of buffer size [lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ", pendingEvts=" + this.pendingEvts + ']');
                    }
                } else {
                    Map.Entry<Long, CacheContinuousQueryEntry> e;
                    while (iter.hasNext() && (e = iter.next()).getKey() == this.lastFiredEvt + 1L) {
                        ++this.lastFiredEvt;
                        if (e.getValue() != HOLE && !e.getValue().isFiltered()) {
                            entries.add(new CacheContinuousQueryEvent(cache, cctx, e.getValue()));
                        }
                        iter.remove();
                    }
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Will send to listener the following events [entries=" + entries + ", lastFiredEvt=" + this.lastFiredEvt + ", curTop=" + this.curTop + ", entUpdCnt=" + entry.updateCounter() + ", partId=" + entry.partition() + ", pendingEvts=" + this.pendingEvts + ']');
            }
            return entries;
        }
    }
}

