/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.managers.eventstorage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.GPR;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.eventstorage.EventStorageSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public class GridEventStorageManager
extends GridManagerAdapter<EventStorageSpi> {
    private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<Integer, Set<GridLocalEventListener>>();
    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
    private final boolean isDaemon;
    private final int len;
    private final Marshaller marsh;
    private RequestListener msgLsnr;
    private final int[] cfgInclEvtTypes;
    private volatile int[] inclEvtTypes;
    private volatile boolean[] recordableEvts;
    private volatile boolean[] userRecordableEvts;

    /**
     * Creates the manager and precomputes the per-type "recordable" flag tables.
     *
     * @param ctx Kernal context; its configuration supplies the event storage SPI, the marshaller
     *      and the configured include-event-types list.
     * @throws IllegalArgumentException If the configured include list contains a negative event type.
     */
    public GridEventStorageManager(GridKernalContext ctx) {
        super(ctx, (IgniteSpi[])new EventStorageSpi[]{ctx.config().getEventStorageSpi()});
        this.marsh = ctx.config().getMarshaller();
        this.isDaemon = ctx.isDaemon();

        // Normalize the configured include list: private copy, sorted, validated, then compacted.
        int[] cfgTypes = ctx.config().getIncludeEventTypes();
        if (F.isEmpty(cfgTypes)) {
            cfgTypes = U.EMPTY_INTS;
        } else {
            cfgTypes = this.copy(cfgTypes);
            Arrays.sort(cfgTypes);
            // The array is sorted, so checking the first element covers every element.
            if (cfgTypes[0] < 0) {
                throw new IllegalArgumentException("Invalid event type: " + cfgTypes[0]);
            }
            cfgTypes = this.compact(cfgTypes, cfgTypes.length);
        }
        this.cfgInclEvtTypes = cfgTypes;

        // The flag tables are indexed by event type, so they are sized by the largest type in EVTS_ALL.
        int maxType = 0;
        for (int evtType : EventType.EVTS_ALL) {
            if (evtType > maxType) {
                maxType = evtType;
            }
        }
        assert (maxType <= 1000) : "Invalid max index: " + maxType;
        this.len = maxType + 1;

        boolean[] recordableFlags = new boolean[this.len];
        boolean[] userRecordableFlags = new boolean[this.len];
        HashSet<Integer> leftoverTypes = new HashSet<Integer>(U.toIntList(this.cfgInclEvtTypes, new IgnitePredicate[0]));
        for (int evtType : EventType.EVTS_ALL) {
            // remove() answers "was it configured?" and at the same time strips the types
            // listed in EVTS_ALL from the set, so only the remaining types are left behind.
            boolean cfgEnabled = leftoverTypes.remove(evtType);
            if (cfgEnabled) {
                userRecordableFlags[evtType] = true;
            }
            if (this.isInternalEvent(evtType) || cfgEnabled) {
                recordableFlags[evtType] = true;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Event recordable status [type=" + U.gridEventName(evtType) + ", recordable=" + recordableFlags[evtType] + ", userRecordable=" + userRecordableFlags[evtType] + ']');
            }
        }
        this.recordableEvts = recordableFlags;
        this.userRecordableEvts = userRecordableFlags;

        // Whatever was configured but is not in EVTS_ALL is kept as a sorted array.
        int[] leftoverArr = U.toIntArray(leftoverTypes);
        Arrays.sort(leftoverArr);
        this.inclEvtTypes = leftoverArr;
    }

    /**
     * Prints the total number of registered local listeners and the sizes of the two
     * recordable-flag tables.
     */
    @Override
    public void printMemoryStats() {
        int totalLsnrs = 0;
        for (Set<GridLocalEventListener> typeLsnrs : this.lsnrs.values()) {
            totalLsnrs += typeLsnrs.size();
        }

        X.println(">>>");
        X.println(">>> Event storage manager memory stats [grid=" + this.ctx.gridName() + ']');
        X.println(">>>  Total listeners: " + totalLsnrs);
        X.println(">>>  Recordable events size: " + this.recordableEvts.length);
        X.println(">>>  User recordable events size: " + this.userRecordableEvts.length);
    }

    /**
     * Tries to take the read side of the busy lock without blocking.
     *
     * @return {@code true} if the read lock was acquired and the caller must later call
     *      {@link #leaveBusy()}; {@code false} otherwise.
     */
    private boolean enterBusy() {
        final boolean acquired = this.busyLock.readLock().tryLock();
        return acquired;
    }

    /**
     * Releases the read side of the busy lock taken by a successful {@link #enterBusy()}.
     */
    private void leaveBusy() {
        ReadWriteLock lock = this.busyLock;
        lock.readLock().unlock();
    }

    /**
     * Takes the write side of the busy lock, unregisters the request listener and drops all
     * local listeners.
     *
     * @param cancel Not used by this implementation.
     */
    @Override
    public void onKernalStop0(boolean cancel) {
        // The write lock is not released in this method. While it is held, enterBusy()'s
        // tryLock() on the read lock fails for other threads.
        this.busyLock.writeLock().lock();

        final RequestListener registered = this.msgLsnr;
        if (registered != null) {
            this.ctx.io().removeMessageListener(GridTopic.TOPIC_EVENT, (GridMessageListener)registered);
        }
        this.msgLsnr = null;

        this.lsnrs.clear();
    }

    /**
     * Stops the underlying SPI and, if debug logging is on, logs the stop info.
     *
     * @param cancel Not used by this implementation.
     * @throws IgniteCheckedException If stopping the SPI fails.
     */
    @Override
    public void stop(boolean cancel) throws IgniteCheckedException {
        this.stopSpi();

        if (!this.log.isDebugEnabled()) {
            return;
        }
        this.log.debug(this.stopInfo());
    }

    /**
     * Registers the listeners declared in the configuration, starts the SPI and subscribes the
     * request listener to {@code GridTopic.TOPIC_EVENT}.
     *
     * @throws IgniteCheckedException If the SPI fails to start.
     */
    @Override
    public void start() throws IgniteCheckedException {
        Map<IgnitePredicate<? extends Event>, int[]> cfgLsnrs = this.ctx.config().getLocalEventListeners();
        if (cfgLsnrs != null) {
            for (Map.Entry<IgnitePredicate<? extends Event>, int[]> entry : cfgLsnrs.entrySet()) {
                this.addLocalEventListener(entry.getKey(), entry.getValue());
            }
        }

        this.startSpi();

        RequestListener reqLsnr = new RequestListener();
        this.msgLsnr = reqLsnr;
        this.ctx.io().addMessageListener(GridTopic.TOPIC_EVENT, (GridMessageListener)reqLsnr);

        if (!this.log.isDebugEnabled()) {
            return;
        }
        this.log.debug(this.startInfo());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the event through the SPI (when it qualifies) and notifies local listeners.
     * Does nothing if the busy lock cannot be entered.
     *
     * @param evt Event to record; must not be {@code null}.
     */
    public void record(Event evt) {
        assert (evt != null);
        if (!this.enterBusy()) {
            return;
        }
        try {
            final int evtType = evt.type();

            if (!this.isRecordable(evtType)) {
                LT.warn(this.log, "Trying to record event without checking if it is recordable: " + U.gridEventName(evtType));
            }

            // Hand the event to the SPI only on daemon nodes or for user-recordable types,
            // and never for the hidden type.
            if (this.isDaemon || this.isUserRecordable(evtType)) {
                if (!this.isHiddenEvent(evtType)) {
                    try {
                        ((EventStorageSpi)this.getSpi()).record(evt);
                    }
                    catch (IgniteSpiException e) {
                        // A storage failure is logged; listeners are still notified below.
                        U.error(this.log, "Failed to record event: " + evt, e);
                    }
                }
            }

            if (this.isRecordable(evtType)) {
                this.notifyListeners(evt);
            }
        }
        finally {
            this.leaveBusy();
        }
    }

    /**
     * Collects the event types that are currently user-recordable.
     * <p>
     * Types below {@code len} are taken from the {@code userRecordableEvts} flag table; the
     * result is then combined with {@code inclEvtTypes} through {@code U.unique(...)}.
     *
     * @return Array produced by {@code U.unique(...)} from both sources.
     */
    public int[] enabledEvents() {
        // Read each volatile field exactly once. Reading 'inclEvtTypes' twice (array and then
        // its length) could pair one array with another array's length if enableEvents() or
        // disableEvents() swaps the field in between.
        boolean[] userRecordableEvts0 = this.userRecordableEvts;
        int[] inclEvtTypes0 = this.inclEvtTypes;

        int[] enabledEvts = new int[this.len];
        int enabledEvtsLen = 0;
        for (int type = 0; type < this.len; ++type) {
            if (userRecordableEvts0[type]) {
                enabledEvts[enabledEvtsLen++] = type;
            }
        }
        return U.unique(enabledEvts, enabledEvtsLen, inclEvtTypes0, inclEvtTypes0.length);
    }

    /**
     * Enables recording of the given event types.
     * <p>
     * Types below {@code len} are switched on in the flag tables; all other types are merged
     * into {@code inclEvtTypes}.
     *
     * @param types Event types to enable; must not be {@code null}.
     */
    public synchronized void enableEvents(int[] types) {
        assert (types != null);
        this.ctx.security().authorize(null, SecurityPermission.EVENTS_ENABLE, null);
        boolean[] userRecordableEvts0 = this.userRecordableEvts;
        boolean[] recordableEvts0 = this.recordableEvts;
        int[] inclEvtTypes0 = this.inclEvtTypes;
        int[] userTypes = new int[types.length];
        int userTypesLen = 0;
        for (int type : types) {
            if (type < this.len) {
                userRecordableEvts0[type] = true;
                recordableEvts0[type] = true;
            } else {
                userTypes[userTypesLen++] = type;
            }
        }
        if (userTypesLen > 0) {
            Arrays.sort(userTypes, 0, userTypesLen);
            userTypes = this.compact(userTypes, userTypesLen);
            // After compaction the array holds exactly the distinct values, so its own length
            // (not the pre-compaction count) is the number of valid elements.
            inclEvtTypes0 = U.unique(inclEvtTypes0, inclEvtTypes0.length, userTypes, userTypes.length);
        }
        // Volatile writes publish the updated state to the lock-free readers.
        this.userRecordableEvts = userRecordableEvts0;
        this.recordableEvts = recordableEvts0;
        this.inclEvtTypes = inclEvtTypes0;
    }

    /**
     * Disables recording of the given event types.
     * <p>
     * Types present in the configured include list are skipped with a warning. For types below
     * {@code len} the user flag is cleared, and the general flag is cleared too unless the type
     * is internal. All other types are removed from {@code inclEvtTypes}.
     *
     * @param types Event types to disable; must not be {@code null}.
     */
    public synchronized void disableEvents(int[] types) {
        assert (types != null);
        this.ctx.security().authorize(null, SecurityPermission.EVENTS_DISABLE, null);
        boolean[] userRecordableEvts0 = this.userRecordableEvts;
        boolean[] recordableEvts0 = this.recordableEvts;
        int[] inclEvtTypes0 = this.inclEvtTypes;
        int[] userTypes = new int[types.length];
        int userTypesLen = 0;
        for (int type : types) {
            if (this.binarySearch(this.cfgInclEvtTypes, type)) {
                U.warn(this.log, "Can't disable event since it was enabled in configuration: " + U.gridEventName(type));
                continue;
            }
            if (type < this.len) {
                userRecordableEvts0[type] = false;
                // Internal events stay recordable even when the user disables them.
                if (!this.isInternalEvent(type)) {
                    recordableEvts0[type] = false;
                }
            } else {
                userTypes[userTypesLen++] = type;
            }
        }
        if (userTypesLen > 0) {
            Arrays.sort(userTypes, 0, userTypesLen);
            userTypes = this.compact(userTypes, userTypesLen);
            // See enableEvents(): use the compacted array's own length.
            inclEvtTypes0 = U.difference(inclEvtTypes0, inclEvtTypes0.length, userTypes, userTypes.length);
        }
        // Volatile writes publish the updated state to the lock-free readers.
        this.userRecordableEvts = userRecordableEvts0;
        this.recordableEvts = recordableEvts0;
        this.inclEvtTypes = inclEvtTypes0;
    }

    /**
     * Removes adjacent duplicates from the first {@code len} elements of a sorted array.
     * The de-duplication is done in place; the input array's prefix is overwritten.
     *
     * @param arr Array whose first {@code len} elements are in non-decreasing order.
     * @param len Number of meaningful elements at the start of {@code arr}.
     * @return Result of {@code U.copyIfExceeded(arr, n)} where {@code n} is the number of
     *      distinct values found (presumably an array of exactly {@code n} elements; confirm
     *      against {@code U.copyIfExceeded}).
     */
    private int[] compact(int[] arr, int len) {
        assert (arr != null);
        assert (U.isNonDecreasingArray(arr, len));
        // Nothing to de-duplicate with fewer than two meaningful elements.
        if (len <= 1) {
            return U.copyIfExceeded(arr, len);
        }
        int newLen = 1;
        for (int i = 1; i < len; ++i) {
            if (arr[i] != arr[newLen - 1]) {
                arr[newLen++] = arr[i];
            }
        }
        // Trim to the de-duplicated length; trimming to 'len' would keep the stale tail
        // (and therefore the duplicates) in the result.
        return U.copyIfExceeded(arr, newLen);
    }

    /**
     * Tells whether the type is the one event type that is never passed to the storage SPI.
     *
     * @param type Event type.
     * @return {@code true} only for type {@code 13}.
     */
    private boolean isHiddenEvent(int type) {
        // NOTE(review): 13 is an inlined constant, presumably an EventType.EVT_* value; confirm which.
        final int hiddenType = 13;
        return hiddenType == type;
    }

    /**
     * Tells whether the type is treated as internal, i.e. recordable regardless of user settings.
     *
     * @param type Event type.
     * @return {@code true} for type {@code 18} or any type listed in {@code EventType.EVTS_DISCOVERY_ALL}.
     */
    private boolean isInternalEvent(int type) {
        // NOTE(review): 18 is an inlined constant; confirm which named event type it stands for.
        if (type == 18) {
            return true;
        }
        return F.contains(EventType.EVTS_DISCOVERY_ALL, type);
    }

    /**
     * Checks whether the user has enabled recording for the given event type.
     *
     * @param type Event type; asserted to be positive.
     * @return Flag from {@code userRecordableEvts} for types below {@code len}, otherwise
     *      whether the type is present in {@code inclEvtTypes}.
     */
    public boolean isUserRecordable(int type) {
        assert (type > 0) : "Invalid event type: " + type;
        if (type < this.len) {
            return this.userRecordableEvts[type];
        }
        return this.isUserRecordable0(type);
    }

    /**
     * Checks whether events of the given type are recordable at all.
     *
     * @param type Event type; asserted to be positive.
     * @return Flag from {@code recordableEvts} for types below {@code len}, otherwise
     *      whether the type is present in {@code inclEvtTypes}.
     */
    public boolean isRecordable(int type) {
        assert (type > 0) : "Invalid event type: " + type;
        if (type < this.len) {
            return this.recordableEvts[type];
        }
        return this.isUserRecordable0(type);
    }

    /**
     * Checks whether every given event type is user-recordable.
     *
     * @param types Event types to check; must not be {@code null}. Each must be in {@code [0, len)}.
     * @return {@code true} if all types are flagged in {@code userRecordableEvts}.
     * @throws IllegalArgumentException If a type is negative or not below {@code len}.
     */
    public boolean isAllUserRecordable(int[] types) {
        assert (types != null);
        // Single volatile read so the whole check runs against one snapshot of the table.
        final boolean[] flags = this.userRecordableEvts;
        for (int i = 0; i < types.length; i++) {
            final int evtType = types[i];
            if (evtType < 0 || evtType >= this.len) {
                throw new IllegalArgumentException("Invalid event type: " + evtType);
            }
            if (!flags[evtType]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Looks the type up in the {@code inclEvtTypes} array.
     *
     * @param type Event type.
     * @return {@code true} if {@code inclEvtTypes} contains the type.
     */
    private boolean isUserRecordable0(int type) {
        final int[] included = this.inclEvtTypes;
        return this.binarySearch(included, type);
    }

    /**
     * Tests whether the array contains the value.
     *
     * @param arr Array to search; may be {@code null} or empty. For arrays longer than 128
     *      elements {@link Arrays#binarySearch(int[], int)} is used, which requires sorted input.
     * @param val Value to look for.
     * @return {@code true} if the value was found.
     */
    private boolean binarySearch(@Nullable int[] arr, int val) {
        if (F.isEmpty(arr)) {
            return false;
        }
        // Short arrays go through F.contains; longer ones through binary search.
        if (arr.length > 128) {
            return Arrays.binarySearch(arr, val) >= 0;
        }
        return F.contains(arr, val);
    }

    /**
     * Injects resources into a user predicate and registers it as a local listener.
     *
     * @param lsnr User predicate to register.
     * @param types Event types to listen for.
     * @throws IgniteException If resource injection fails.
     */
    public void addLocalEventListener(IgnitePredicate<? extends Event> lsnr, int[] types) {
        try {
            this.ctx.resource().injectGeneric(lsnr);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException("Failed to inject resources to event listener: " + lsnr, e);
        }

        GridLocalEventListener wrapped = new UserListenerWrapper(lsnr);
        this.addLocalEventListener(wrapped, types);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a listener for each of the given event types. Does nothing if the busy lock
     * cannot be entered.
     *
     * @param lsnr Listener to register; must not be {@code null}.
     * @param types Event types; must be non-{@code null} and non-empty.
     */
    public void addLocalEventListener(GridLocalEventListener lsnr, int[] types) {
        assert (lsnr != null);
        assert (types != null);
        assert (types.length > 0);
        if (!this.enterBusy()) {
            return;
        }
        try {
            for (int i = 0; i < types.length; i++) {
                final int evtType = types[i];
                this.getOrCreate(evtType).add(lsnr);
                // The listener is still added; the warning only flags that the type is not recordable.
                if (!this.isRecordable(evtType)) {
                    U.warn(this.log, "Added listener for disabled event type: " + U.gridEventName(evtType));
                }
            }
        }
        finally {
            this.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a listener for one mandatory event type plus any number of additional types.
     * Does nothing if the busy lock cannot be entered.
     *
     * @param lsnr Listener to register; must not be {@code null}.
     * @param type First event type.
     * @param types Optional additional event types; may be {@code null}.
     */
    public void addLocalEventListener(GridLocalEventListener lsnr, int type, int ... types) {
        assert (lsnr != null);
        if (!this.enterBusy()) {
            return;
        }
        try {
            this.getOrCreate(type).add(lsnr);
            if (!this.isRecordable(type)) {
                U.warn(this.log, "Added listener for disabled event type: " + U.gridEventName(type));
            }

            if (types == null) {
                return;
            }
            for (int i = 0; i < types.length; i++) {
                final int extraType = types[i];
                this.getOrCreate(extraType).add(lsnr);
                if (!this.isRecordable(extraType)) {
                    U.warn(this.log, "Added listener for disabled event type: " + U.gridEventName(extraType));
                }
            }
        }
        finally {
            this.leaveBusy();
        }
    }

    /**
     * Returns the listener set for the given event type, creating and publishing it if absent.
     *
     * @param type Event type used as the map key.
     * @return Listener set stored in {@code lsnrs} for the type; never {@code null}.
     */
    private Collection<GridLocalEventListener> getOrCreate(Integer type) {
        Set<GridLocalEventListener> typeLsnrs = this.lsnrs.get(type);
        if (typeLsnrs == null) {
            Set<GridLocalEventListener> created = new GridConcurrentLinkedHashSet<GridLocalEventListener>();
            // If another thread inserted a set first, use theirs and drop ours.
            Set<GridLocalEventListener> raced = this.lsnrs.putIfAbsent(type, created);
            typeLsnrs = raced != null ? raced : created;
        }
        assert (typeLsnrs != null);
        return typeLsnrs;
    }

    /**
     * Unregisters a user predicate previously added as a local listener.
     *
     * @param lsnr User predicate; it is matched through a fresh {@code UserListenerWrapper},
     *      whose equality delegates to the predicate.
     * @param types Event types to remove it from; empty means all types.
     * @return {@code true} if the listener was found and removed for at least one type.
     */
    public boolean removeLocalEventListener(IgnitePredicate<? extends Event> lsnr, int ... types) {
        UserListenerWrapper wrapper = new UserListenerWrapper(lsnr);
        return this.removeLocalEventListener(wrapper, types);
    }

    /**
     * Unregisters a local listener.
     *
     * @param lsnr Listener to remove; must not be {@code null}.
     * @param types Event types to remove it from; when empty, every type's set is scanned.
     * @return {@code true} if the listener was removed from at least one set.
     */
    public boolean removeLocalEventListener(GridLocalEventListener lsnr, int ... types) {
        assert (lsnr != null);

        boolean removed = false;
        if (F.isEmpty(types)) {
            for (Set<GridLocalEventListener> typeLsnrs : this.lsnrs.values()) {
                // Non-short-circuit OR: remove() must run for every set.
                removed |= typeLsnrs.remove(lsnr);
            }
        } else {
            assert (types != null);
            for (int evtType : types) {
                Set<GridLocalEventListener> typeLsnrs = this.lsnrs.get(evtType);
                if (typeLsnrs != null && typeLsnrs.remove(lsnr)) {
                    removed = true;
                }
            }
        }

        // Platform filters get a close callback whether or not anything was removed.
        if (lsnr instanceof UserListenerWrapper) {
            IgnitePredicate<? extends Event> userPred = ((UserListenerWrapper)lsnr).listener();
            if (userPred instanceof PlatformEventFilterListener) {
                ((PlatformEventFilterListener)userPred).onClose();
            }
        }
        return removed;
    }

    /**
     * Registers a one-shot listener and returns a future completed by the first matching event.
     *
     * @param p Optional filter; {@code null} accepts the first event of any listened type.
     * @param types Event types to listen for; when empty, {@code EventType.EVTS_ALL} is used.
     * @param <T> Expected event type. Incoming events are cast to it unchecked, so the caller
     *      must pass {@code types} that actually produce {@code T}.
     * @return Future completed with the first accepted event.
     */
    @SuppressWarnings("unchecked")
    public <T extends Event> IgniteInternalFuture<T> waitForEvent(final @Nullable IgnitePredicate<T> p, int ... types) {
        final GridFutureAdapter<T> fut = new GridFutureAdapter<T>();
        this.addLocalEventListener(new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                // Unchecked by design: the listener API delivers plain Event instances.
                if (p == null || p.apply((T)evt)) {
                    fut.onDone((T)evt);
                    // One-shot: unregister from every type once the future is completed.
                    GridEventStorageManager.this.removeLocalEventListener(this, new int[0]);
                }
            }
        }, F.isEmpty(types) ? EventType.EVTS_ALL : types);
        return fut;
    }

    /**
     * Registers a one-shot listener, runs an optional closure and then waits for the first
     * matching event.
     *
     * @param timeout Value passed to the future's {@code get(timeout)}; must be non-negative.
     * @param c Optional closure run after the listener is registered (typically the action
     *      expected to trigger the event).
     * @param p Optional filter; {@code null} accepts the first event of any listened type.
     * @param types Event types to listen for.
     * @return First accepted event.
     * @throws IgniteCheckedException If the closure throws, or if waiting on the future fails.
     */
    public Event waitForEvent(long timeout, @Nullable Runnable c, final @Nullable IgnitePredicate<? super Event> p, int ... types) throws IgniteCheckedException {
        assert (timeout >= 0L);
        final GridFutureAdapter<Event> fut = new GridFutureAdapter<Event>();
        GridLocalEventListener lsnr = new GridLocalEventListener(){

            @Override
            public void onEvent(Event evt) {
                if (p == null || p.apply(evt)) {
                    fut.onDone(evt);
                    GridEventStorageManager.this.removeLocalEventListener(this, new int[0]);
                }
            }
        };
        this.addLocalEventListener(lsnr, types);
        try {
            try {
                if (c != null) {
                    c.run();
                }
            }
            catch (Exception e) {
                throw new IgniteCheckedException(e);
            }
            return fut.get(timeout);
        }
        finally {
            // Always unregister: without this the listener stays registered when the closure
            // throws or the wait fails. Removing an already-removed listener is harmless.
            this.removeLocalEventListener(lsnr, new int[0]);
        }
    }

    /**
     * Notifies the listeners registered for the event's type.
     *
     * @param evt Event to deliver; must not be {@code null}.
     */
    private void notifyListeners(Event evt) {
        assert (evt != null);
        Collection<GridLocalEventListener> typeLsnrs = this.lsnrs.get(evt.type());
        this.notifyListeners(typeLsnrs, evt);
    }

    /**
     * Delivers the event to every listener in the collection.
     * <p>
     * A failure in one listener is logged and does not stop delivery to the rest, except that
     * an {@link Error} is rethrown after logging.
     *
     * @param set Listeners to notify; may be {@code null} or empty.
     * @param evt Event to deliver; must not be {@code null}.
     */
    private void notifyListeners(@Nullable Collection<GridLocalEventListener> set, Event evt) {
        assert (evt != null);
        if (F.isEmpty(set)) {
            return;
        }
        assert (set != null);

        for (GridLocalEventListener target : set) {
            try {
                target.onEvent(evt);
            }
            catch (Throwable e) {
                U.error(this.log, "Unexpected exception in listener notification for event: " + evt, e);
                if (e instanceof Error) {
                    throw (Error)e;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queries the local event storage SPI with the given filter.
     * <p>
     * A {@code PlatformEventFilterListener} is initialized with the kernal context before the
     * query and closed afterwards, even if the query throws.
     *
     * @param p Filter; must not be {@code null}.
     * @param <T> Event type selected by the filter.
     * @return Events returned by the SPI for the filter.
     */
    @SuppressWarnings("unchecked")
    public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) {
        assert (p != null);

        if (!(p instanceof PlatformEventFilterListener)) {
            return ((EventStorageSpi)this.getSpi()).localEvents(p);
        }

        PlatformEventFilterListener platformFilter = (PlatformEventFilterListener)p;
        platformFilter.initialize(this.ctx);
        try {
            return (Collection<T>)((EventStorageSpi)this.getSpi()).localEvents(platformFilter);
        }
        finally {
            platformFilter.onClose();
        }
    }

    /**
     * Runs an event query against the given nodes through the closure processor and returns
     * a future for the result.
     *
     * @param p Filter to apply on the queried nodes; must not be {@code null}.
     * @param nodes Nodes to query; must not be {@code null}.
     * @param timeout Query timeout passed to {@code query(...)}.
     * @param <T> Event type selected by the filter.
     * @return Future completed with the collected events, or with the query's exception.
     */
    public <T extends Event> IgniteInternalFuture<List<T>> remoteEventsAsync(final IgnitePredicate<T> p, final Collection<? extends ClusterNode> nodes, final long timeout) {
        assert (p != null);
        assert (nodes != null);

        final GridFutureAdapter<List<T>> resFut = new GridFutureAdapter<List<T>>();
        GPR qryTask = new GPR(){

            @Override
            public void run() {
                List<T> found;
                try {
                    found = GridEventStorageManager.this.query(p, nodes, timeout);
                }
                catch (IgniteCheckedException e) {
                    resFut.onDone(e);
                    return;
                }
                resFut.onDone(found);
            }
        };
        this.ctx.closure().runLocalSafe((Runnable)qryTask, true);
        return resFut;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends an event query to the given nodes and blocks until every node has answered, a node
     * reports an error, or the timeout elapses.
     *
     * @param p Filter that is marshalled and sent to the queried nodes; must not be {@code null}.
     * @param nodes Nodes to query; must not be {@code null}. An empty collection yields an empty result.
     * @param timeout Maximum wait in milliseconds; {@code 0} is replaced by {@code Long.MAX_VALUE}.
     * @return Events collected from all responses.
     * @throws IgniteCheckedException If the filter cannot be deployed, the wait is interrupted,
     *      a response carries an exception, or a node that is still known to discovery did not
     *      respond in time.
     */
    private <T extends Event> List<T> query(IgnitePredicate<T> p, Collection<? extends ClusterNode> nodes, long timeout) throws IgniteCheckedException {
        assert (p != null);
        assert (nodes != null);
        if (nodes.isEmpty()) {
            U.warn(this.log, "Failed to query events for empty nodes collection.");
            return Collections.emptyList();
        }
        GridIoManager ioMgr = this.ctx.io();
        // Shared state for the two listeners below; 'evts' and 'uids' are only touched under 'qryMux'.
        final ArrayList evts = new ArrayList();
        final AtomicReference err = new AtomicReference();
        // IDs of the nodes whose response is still awaited.
        final HashSet<UUID> uids = new HashSet<UUID>();
        final Object qryMux = new Object();
        for (ClusterNode clusterNode : nodes) {
            uids.add(clusterNode.id());
        }
        GridLocalEventListener evtLsnr = new GridLocalEventListener(){

            /*
             * Stops waiting for the node carried by a discovery event and wakes the
             * querying thread once nothing is pending.
             */
            @Override
            public void onEvent(Event evt) {
                assert (evt instanceof DiscoveryEvent);
                Object object = qryMux;
                synchronized (object) {
                    uids.remove(((DiscoveryEvent)evt).eventNode().id());
                    if (uids.isEmpty()) {
                        qryMux.notifyAll();
                    }
                }
            }
        };
        GridMessageListener gridMessageListener = new GridMessageListener(){

            /*
             * Handles a query response: unmarshals its payload, then records it under 'qryMux'.
             */
            @Override
            public void onMessage(UUID nodeId, Object msg) {
                assert (nodeId != null);
                assert (msg != null);
                if (!(msg instanceof GridEventStorageMessage)) {
                    U.error(GridEventStorageManager.this.log, "Received unknown message: " + msg);
                    return;
                }
                GridEventStorageMessage res = (GridEventStorageMessage)msg;
                try {
                    // Payloads are unmarshalled only when the byte form is present.
                    if (res.eventsBytes() != null) {
                        res.events((Collection)U.unmarshal(GridEventStorageManager.this.marsh, res.eventsBytes(), U.resolveClassLoader(GridEventStorageManager.this.ctx.config())));
                    }
                    if (res.exceptionBytes() != null) {
                        res.exception((Throwable)U.unmarshal(GridEventStorageManager.this.marsh, res.exceptionBytes(), U.resolveClassLoader(GridEventStorageManager.this.ctx.config())));
                    }
                }
                catch (IgniteCheckedException e) {
                    // The response is dropped; the node stays in 'uids' until timeout or a discovery event.
                    U.error(GridEventStorageManager.this.log, "Failed to unmarshal events query response: " + msg, e);
                    return;
                }
                Object object = qryMux;
                synchronized (object) {
                    // Events are accepted only from the first response of a node that is still pending.
                    if (uids.remove(nodeId)) {
                        if (res.events() != null) {
                            evts.addAll(res.events());
                        }
                    } else {
                        U.warn(GridEventStorageManager.this.log, "Received duplicate response (ignoring) [nodeId=" + nodeId + ", msg=" + res + ']');
                    }
                    // NOTE(review): the exception is recorded even when the response was flagged as a duplicate above.
                    if (res.exception() != null) {
                        err.set(res.exception());
                    }
                    if (uids.isEmpty() || err.get() != null) {
                        qryMux.notifyAll();
                    }
                }
            }
        };
        // Response topic derived from TOPIC_EVENT and the local node ID.
        Object resTopic = GridTopic.TOPIC_EVENT.topic(IgniteUuid.fromUuid(this.ctx.localNodeId()));
        try {
            long now;
            // NOTE(review): 11 and 12 are inlined event type constants, presumably the node-left
            // and node-failed discovery events; confirm against EventType.
            this.addLocalEventListener(evtLsnr, new int[]{11, 12});
            // Both listeners are registered before the request is sent.
            ioMgr.addMessageListener(resTopic, gridMessageListener);
            byte[] serFilter = U.marshal(this.marsh, p);
            GridDeployment dep = this.ctx.deploy().deploy(p.getClass(), U.detectClassLoader(p.getClass()));
            if (dep == null) {
                throw new IgniteDeploymentCheckedException("Failed to deploy event filter: " + p);
            }
            GridEventStorageMessage msg = new GridEventStorageMessage(resTopic, serFilter, p.getClass().getName(), dep.classLoaderId(), dep.deployMode(), dep.userVersion(), dep.participants());
            this.sendMessage(nodes, GridTopic.TOPIC_EVENT, msg, (byte)0);
            if (timeout == 0L) {
                timeout = Long.MAX_VALUE;
            }
            // 'now + timeout' overflows to a non-positive value for huge timeouts; clamp to Long.MAX_VALUE.
            long endTime = (now = U.currentTimeMillis()) + timeout <= 0L ? Long.MAX_VALUE : now + timeout;
            long delta = timeout;
            LinkedList uidsCp = null;
            Object object = qryMux;
            synchronized (object) {
                try {
                    // Loop guards against spurious wake-ups; 'delta' is the time left until 'endTime'.
                    while (!uids.isEmpty() && err.get() == null && delta > 0L) {
                        qryMux.wait(delta);
                        delta = endTime - U.currentTimeMillis();
                    }
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag before converting to a checked exception.
                    Thread.currentThread().interrupt();
                    throw new IgniteCheckedException("Got interrupted while waiting for event query responses.", e);
                }
                if (err.get() != null) {
                    throw new IgniteCheckedException("Failed to query events due to exception on remote node.", (Throwable)err.get());
                }
                // Snapshot the still-pending IDs so the discovery check below runs outside the monitor.
                if (!uids.isEmpty()) {
                    uidsCp = new LinkedList(uids);
                }
            }
            if (uidsCp != null) {
                // Nodes that discovery no longer knows are not counted as missing responses.
                Iterator iter = uidsCp.iterator();
                while (iter.hasNext()) {
                    if (this.ctx.discovery().node((UUID)iter.next()) != null) continue;
                    iter.remove();
                }
                if (!uidsCp.isEmpty()) {
                    throw new IgniteCheckedException("Failed to receive event query response from following nodes: " + uidsCp);
                }
            }
        }
        finally {
            // Always unregister both listeners, on success and on every failure path.
            ioMgr.removeMessageListener(resTopic, gridMessageListener);
            this.removeLocalEventListener(evtLsnr, new int[0]);
        }
        return evts;
    }

    /**
     * Sends the query message to the local node (if it is among the targets) and to the
     * remote targets.
     *
     * @param nodes Target nodes.
     * @param topic Topic to send to.
     * @param msg Message to send; its response topic is marshalled only when remote targets exist.
     * @param plc Policy byte passed through to the IO manager.
     * @throws IgniteCheckedException If marshalling or sending fails.
     */
    private void sendMessage(Collection<? extends ClusterNode> nodes, GridTopic topic, GridEventStorageMessage msg, byte plc) throws IgniteCheckedException {
        ClusterNode localTarget = F.find(nodes, null, F.localNode(this.ctx.localNodeId()));
        Collection<? extends ClusterNode> remoteTargets = F.view(nodes, F.remoteNodes(this.ctx.localNodeId()));

        // The local send happens before the response topic bytes are set on the message.
        if (localTarget != null) {
            this.ctx.io().send(localTarget, topic, (Message)msg, plc);
        }

        if (remoteTargets.isEmpty()) {
            return;
        }
        msg.responseTopicBytes(U.marshal(this.marsh, msg.responseTopic()));
        this.ctx.io().send(remoteTargets, topic, (Message)msg, plc);
    }

    /**
     * Returns a new array with the same contents as the given one.
     *
     * @param arr Array to copy; must not be {@code null}.
     * @return Independent copy of {@code arr}.
     */
    private int[] copy(int[] arr) {
        assert (arr != null);
        return arr.clone();
    }

    /**
     * Returns a new array with the same contents as the given one.
     *
     * @param arr Array to copy; must not be {@code null}.
     * @return Independent copy of {@code arr}.
     */
    private boolean[] copy(boolean[] arr) {
        assert (arr != null);
        return arr.clone();
    }

    /**
     * Adapts a user-supplied predicate to {@code GridLocalEventListener}.
     * <p>
     * Equality and hash code delegate to the wrapped predicate, so a fresh wrapper around the
     * same predicate matches an already registered one.
     */
    private class UserListenerWrapper
    implements GridLocalEventListener {
        /** Wrapped user predicate. */
        private final IgnitePredicate<Event> lsnr;

        /**
         * @param lsnr User predicate to wrap.
         */
        @SuppressWarnings("unchecked")
        private UserListenerWrapper(IgnitePredicate<? extends Event> lsnr) {
            this.lsnr = (IgnitePredicate<Event>)lsnr;
        }

        /**
         * @return Wrapped user predicate.
         */
        private IgnitePredicate<? extends Event> listener() {
            return this.lsnr;
        }

        /**
         * Passes the event to the predicate and unregisters this wrapper when it returns {@code false}.
         *
         * @param evt Event to deliver.
         */
        @Override
        public void onEvent(Event evt) {
            boolean keepListening = this.lsnr.apply(evt);
            if (keepListening) {
                return;
            }
            GridEventStorageManager.this.removeLocalEventListener(this, new int[0]);
        }

        /** Two wrappers are equal when they are of the same class and wrap equal predicates. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            return this.lsnr.equals(((UserListenerWrapper)o).lsnr);
        }

        /** Hash code of the wrapped predicate, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return this.lsnr.hashCode();
        }
    }

    /**
     * Handles event query requests arriving on {@code GridTopic.TOPIC_EVENT}: runs the
     * requested filter against the local event storage and sends the result back to the sender.
     */
    private class RequestListener
    implements GridMessageListener {
        private RequestListener() {
        }

        /**
         * Processes one query request. Ignored if the busy lock cannot be entered, the message
         * has an unexpected type, or the sender is unknown to discovery.
         *
         * @param nodeId ID of the sending node; must not be {@code null}.
         * @param msg Received message; must not be {@code null}.
         */
        @Override
        public void onMessage(UUID nodeId, Object msg) {
            assert (nodeId != null);
            assert (msg != null);
            if (!GridEventStorageManager.this.enterBusy()) {
                return;
            }
            try {
                if (!(msg instanceof GridEventStorageMessage)) {
                    U.warn(GridEventStorageManager.this.log, "Received unknown message: " + msg);
                    return;
                }
                GridEventStorageMessage req = (GridEventStorageMessage)msg;
                ClusterNode node = GridEventStorageManager.this.ctx.discovery().node(nodeId);
                if (node == null) {
                    U.warn(GridEventStorageManager.this.log, "Failed to resolve sender node that does not exist: " + nodeId);
                    return;
                }
                if (GridEventStorageManager.this.log.isDebugEnabled()) {
                    GridEventStorageManager.this.log.debug("Received event query request: " + req);
                }

                Throwable ex = null;
                IgnitePredicate<Event> filter = null;
                Collection<Event> evts;
                try {
                    // The response topic is unmarshalled only when its byte form is present.
                    if (req.responseTopicBytes() != null) {
                        req.responseTopic(U.unmarshal(GridEventStorageManager.this.marsh, req.responseTopicBytes(), U.resolveClassLoader(GridEventStorageManager.this.ctx.config())));
                    }
                    GridDeployment dep = GridEventStorageManager.this.ctx.deploy().getGlobalDeployment(req.deploymentMode(), req.filterClassName(), req.filterClassName(), req.userVersion(), nodeId, req.classLoaderId(), req.loaderParticipants(), null);
                    if (dep == null) {
                        throw new IgniteDeploymentCheckedException("Failed to obtain deployment for event filter (is peer class loading turned on?): " + req);
                    }
                    filter = U.unmarshal(GridEventStorageManager.this.marsh, req.filter(), U.resolveClassLoader(dep.classLoader(), GridEventStorageManager.this.ctx.config()));
                    GridEventStorageManager.this.ctx.resource().inject(dep, dep.deployedClass(req.filterClassName(), new String[0]), (Object)filter);
                    evts = GridEventStorageManager.this.localEvents(filter);
                }
                catch (IgniteCheckedException e) {
                    U.error(GridEventStorageManager.this.log, "Failed to query events [nodeId=" + nodeId + ", filter=" + filter + ']', e);
                    evts = Collections.emptyList();
                    ex = e;
                }
                catch (Throwable e) {
                    U.error(GridEventStorageManager.this.log, "Failed to query events due to user exception [nodeId=" + nodeId + ", filter=" + filter + ']', e);
                    evts = Collections.emptyList();
                    ex = e;
                    // Errors are rethrown and no response is sent; other failures are reported to the sender.
                    if (e instanceof Error) {
                        throw (Error)e;
                    }
                }

                // The response carries either the events or the failure.
                GridEventStorageMessage res = new GridEventStorageMessage(evts, ex);
                try {
                    if (GridEventStorageManager.this.log.isDebugEnabled()) {
                        GridEventStorageManager.this.log.debug("Sending event query response to node [nodeId=" + nodeId + ", res=" + res + ']');
                    }
                    // The payload is marshalled only when the requester is not the local node.
                    if (!GridEventStorageManager.this.ctx.localNodeId().equals(nodeId)) {
                        res.eventsBytes(U.marshal(GridEventStorageManager.this.marsh, res.events()));
                        res.exceptionBytes(U.marshal(GridEventStorageManager.this.marsh, (Object)res.exception()));
                    }
                    GridEventStorageManager.this.ctx.io().send(node, req.responseTopic(), (Message)res, (byte)0);
                }
                catch (IgniteCheckedException e) {
                    U.error(GridEventStorageManager.this.log, "Failed to send event query response to node [node=" + nodeId + ", res=" + res + ']', e);
                }
            }
            finally {
                GridEventStorageManager.this.leaveBusy();
            }
        }
    }
}

