/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.service;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.service.GridServiceAssignments;
import org.apache.ignite.internal.processors.service.GridServiceAssignmentsKey;
import org.apache.ignite.internal.processors.service.GridServiceDeployment;
import org.apache.ignite.internal.processors.service.GridServiceDeploymentFuture;
import org.apache.ignite.internal.processors.service.GridServiceDeploymentKey;
import org.apache.ignite.internal.processors.service.GridServiceProxy;
import org.apache.ignite.internal.processors.service.LazyServiceConfiguration;
import org.apache.ignite.internal.processors.service.ServiceContextImpl;
import org.apache.ignite.internal.processors.service.ServiceDescriptorImpl;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.SerializableTransient;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

public class GridServiceProcessor
extends GridProcessorAdapter {
    /** Product version constant; judging by the name, the version since which lazy service configurations are available (TODO confirm). */
    public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22");
    /** Node versions for which {@code serviceTopology(String, long)} sets the callable's {@code serialize} flag. Initialized outside the visible part of the file. */
    private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1;
    /** Parsed value of the {@code IGNITE_SERVICES_COMPATIBILITY_MODE} system property, or {@code null} when the property is not set. */
    private final Boolean srvcCompatibilitySysProp;
    /** Retry timeout; not referenced in the visible part of the file (presumably milliseconds - TODO confirm). */
    private static final long RETRY_TIMEOUT = 1000L;
    /** Event types the topology listener is subscribed to in {@code onKernalStart()}. Initialized outside the visible part of the file. */
    private static final int[] EVTS;
    /** Current compatibility state; updated via compare-and-set in {@code markCompatibilityStateAsUsed()}. */
    private final AtomicReference<ServicesCompatibilityState> compatibilityState;
    /** Local service contexts keyed by service name. Accessed while synchronized on the map itself. */
    private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<String, Collection<ServiceContextImpl>>();
    /** Pending deployment futures keyed by service name. */
    private final ConcurrentMap<String, GridServiceDeploymentFuture> depFuts = new ConcurrentHashMap8<String, GridServiceDeploymentFuture>();
    /** Pending undeployment futures keyed by service name. */
    private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8();
    /** Pending compute job contexts; not used in the visible part of the file. */
    private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<ComputeJobContext>(0);
    /** Single-threaded executor created in the constructor and shut down in {@code onKernalStop(boolean)}. */
    private final ExecutorService depExe;
    /** Busy lock; blocked in {@code onKernalStop(boolean)}. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
    /** Thread factory named after the grid; its use is outside the visible part of the file. */
    private ThreadFactory threadFactory = new IgniteThreadFactory(this.ctx.gridName());
    /** Thread-local service name; its use is outside the visible part of the file. */
    private ThreadLocal<String> svcName = new ThreadLocal();
    /** Utility cache holding service deployments and assignments; assigned in {@code onKernalStart()}. */
    private IgniteInternalCache<Object, Object> cache;
    /** Topology listener registered on non-client nodes in {@code onKernalStart()}. */
    private GridLocalEventListener topLsnr = new TopologyListener();

    /**
     * Creates the processor: starts the single-threaded deployment executor and derives the
     * initial compatibility state from the {@code IGNITE_SERVICES_COMPATIBILITY_MODE} system property.
     *
     * @param ctx Kernal context.
     */
    public GridServiceProcessor(GridKernalContext ctx) {
        super(ctx);

        this.depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy"));

        // The property is tri-state: absent (null), "true" or anything else (false).
        String modeProp = IgniteSystemProperties.getString("IGNITE_SERVICES_COMPATIBILITY_MODE");
        Boolean parsedMode = null;

        if (modeProp != null) {
            parsedMode = Boolean.valueOf(modeProp);
        }

        this.srvcCompatibilitySysProp = parsedMode;

        // An absent property means compatibility mode is off; the state starts as "not used".
        boolean compatibilityOn = parsedMode != null && parsedMode;

        this.compatibilityState = new AtomicReference<ServicesCompatibilityState>(
            new ServicesCompatibilityState(compatibilityOn, false));
    }

    /**
     * On client nodes, registers a static continuous routine that listens for service
     * entries in the {@code ignite-sys-cache} cache. Does nothing on other nodes.
     *
     * @param ctx Kernal context.
     * @throws IgniteCheckedException If the routine registration fails.
     */
    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
        if (!ctx.clientNode()) {
            return;
        }

        assert !ctx.isDaemon();

        ctx.continuous().registerStaticRoutine("ignite-sys-cache", new ServiceEntriesListener(), null, null);
    }

    /**
     * Publishes the compatibility system property as a node attribute and, on non-daemon nodes,
     * rejects statically configured services when peer class loading is enabled together with
     * the PRIVATE or ISOLATED deployment mode.
     *
     * @throws IgniteCheckedException If services are configured in an unsupported deployment mode.
     */
    @Override
    public void start() throws IgniteCheckedException {
        this.ctx.addNodeAttribute("org.apache.ignite.services.compatibility.enabled", this.srvcCompatibilitySysProp);

        if (this.ctx.isDaemon()) {
            return;
        }

        IgniteConfiguration igniteCfg = this.ctx.config();
        DeploymentMode mode = igniteCfg.getDeploymentMode();

        boolean unsupportedMode = mode == DeploymentMode.PRIVATE || mode == DeploymentMode.ISOLATED;

        if (igniteCfg.isPeerClassLoadingEnabled() && unsupportedMode && !F.isEmpty(igniteCfg.getServiceConfiguration())) {
            throw new IgniteCheckedException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + mode);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Finishes processor startup once the kernal is up.
     * <p>
     * Binds the utility cache, subscribes the topology listener (non-client nodes only) and starts
     * tracking service entries: non-client nodes run an internal continuous query, client nodes load
     * the existing entries asynchronously. Statically configured services are then deployed and the
     * method waits for every deployment to finish. Daemon nodes skip all of this.
     *
     * @throws IgniteCheckedException If the continuous query or a static service deployment fails.
     */
    @Override
    public void onKernalStart() throws IgniteCheckedException {
        if (this.ctx.isDaemon()) {
            return;
        }
        this.cache = this.ctx.cache().utilityCache();
        if (!this.ctx.clientNode()) {
            this.ctx.event().addLocalEventListener(this.topLsnr, EVTS);
        }
        try {
            // Ownership checks are switched off only for the duration of the listener setup.
            if (this.ctx.deploy().enabled()) {
                this.ctx.cache().context().deploy().ignoreOwnership(true);
            }
            if (!this.ctx.clientNode()) {
                assert (this.cache.context().affinityNode());
                this.cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(), null, true, true, false);
            } else {
                assert (!this.ctx.isDaemon());
                this.ctx.closure().runLocalSafe(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            Iterable<CacheEntryEvent<?, ?>> entries = GridServiceProcessor.this.cache.context().continuousQueries().existingEntries(false, null);
                            GridServiceProcessor.this.onSystemCacheUpdated(entries);
                        }
                        catch (IgniteCheckedException e) {
                            U.error(GridServiceProcessor.this.log, "Failed to load service entries: " + e, e);
                        }
                    }
                });
            }
        }
        finally {
            if (this.ctx.deploy().enabled()) {
                this.ctx.cache().context().deploy().ignoreOwnership(false);
            }
        }
        ServiceConfiguration[] cfgs = this.ctx.config().getServiceConfiguration();
        if (cfgs != null) {
            // Kick off all deployments first and only then wait, so they are not serialized.
            List<IgniteInternalFuture<?>> futs = new ArrayList<IgniteInternalFuture<?>>(cfgs.length);
            for (ServiceConfiguration c : cfgs) {
                futs.add(this.deploy(c));
            }
            for (IgniteInternalFuture<?> fut : futs) {
                fut.get();
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Started service processor.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the processor: blocks the busy lock, unsubscribes the topology listener (non-client
     * nodes), cancels every locally running service and waits for its executor to terminate, shuts
     * down the deployment executor and fails all pending deploy/undeploy futures.
     * Daemon nodes return immediately.
     *
     * @param cancel Cancel flag (not used by this method).
     */
    @Override
    public void onKernalStop(boolean cancel) {
        if (this.ctx.isDaemon()) {
            return;
        }

        this.busyLock.block();

        if (!this.ctx.clientNode()) {
            this.ctx.event().removeLocalEventListener(this.topLsnr, new int[0]);
        }

        // Snapshot all local contexts under the map lock; the rest runs on the copy.
        List<ServiceContextImpl> running = new ArrayList<ServiceContextImpl>();

        synchronized (this.locSvcs) {
            for (Collection<ServiceContextImpl> perSvc : this.locSvcs.values()) {
                running.addAll(perSvc);
            }
        }

        // First pass: signal cancellation to everything before waiting on anything.
        for (ServiceContextImpl svcCtx : running) {
            svcCtx.setCancelled(true);

            Service svc = svcCtx.service();

            if (svc != null) {
                svc.cancel(svcCtx);
            }

            svcCtx.executor().shutdownNow();
        }

        // Second pass: wait for each service executor to terminate.
        for (ServiceContextImpl svcCtx : running) {
            try {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Shutting down distributed service [name=" + svcCtx.name() + ", execId8=" + U.id8(svcCtx.executionId()) + ']');
                }

                svcCtx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();

                U.error(this.log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " + svcCtx.name());
            }
        }

        U.shutdownNow(GridServiceProcessor.class, this.depExe, this.log);

        IgniteCheckedException stopErr = new IgniteCheckedException("Operation has been cancelled (node is stopping).");

        this.cancelFutures(this.depFuts, stopErr);
        this.cancelFutures(this.undepFuts, stopErr);

        if (this.log.isDebugEnabled()) {
            this.log.debug("Stopped service processor.");
        }
    }

    /**
     * Fails all pending deployment and undeployment futures with a client-disconnected error
     * that carries the cluster's reconnect future.
     *
     * @param reconnectFut Reconnect future (not used directly; the one from the cluster is used).
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        IgniteClientDisconnectedCheckedException depErr = new IgniteClientDisconnectedCheckedException(
            this.ctx.cluster().clientReconnectFuture(),
            "Failed to deploy service, client node disconnected.");

        this.cancelFutures(this.depFuts, depErr);

        IgniteClientDisconnectedCheckedException undepErr = new IgniteClientDisconnectedCheckedException(
            this.ctx.cluster().clientReconnectFuture(),
            "Failed to undeploy service, client node disconnected.");

        this.cancelFutures(this.undepFuts, undepErr);
    }

    /**
     * Completes every future in the map with the given error and removes it from the map.
     *
     * @param futs Futures keyed by service name.
     * @param err Error to complete the futures with.
     */
    private void cancelFutures(ConcurrentMap<String, ? extends GridFutureAdapter<?>> futs, Exception err) {
        for (Map.Entry<String, ? extends GridFutureAdapter<?>> pending : futs.entrySet()) {
            GridFutureAdapter<?> fut = pending.getValue();

            fut.onDone(err);

            // Conditional remove: only drops the mapping if it still points at this very future.
            futs.remove(pending.getKey(), fut);
        }
    }

    /**
     * Validates a service configuration before deployment.
     *
     * @param c Service configuration to check.
     * @throws IgniteException If peer class loading is combined with the PRIVATE or ISOLATED deployment
     *      mode, or if any configuration check fails.
     */
    private void validate(ServiceConfiguration c) throws IgniteException {
        IgniteConfiguration igniteCfg = this.ctx.config();
        DeploymentMode mode = igniteCfg.getDeploymentMode();

        boolean unsupportedMode = mode == DeploymentMode.PRIVATE || mode == DeploymentMode.ISOLATED;

        if (igniteCfg.isPeerClassLoadingEnabled() && unsupportedMode) {
            throw new IgniteException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + mode);
        }

        this.ensure(c.getName() != null, "getName() != null", null);

        int totalCnt = c.getTotalCount();
        this.ensure(totalCnt >= 0, "getTotalCount() >= 0", totalCnt);

        int maxPerNodeCnt = c.getMaxPerNodeCount();
        this.ensure(maxPerNodeCnt >= 0, "getMaxPerNodeCount() >= 0", maxPerNodeCnt);

        Service svc = c.getService();
        this.ensure(svc != null, "getService() != null", svc);

        // At least one of the two limits has to be positive.
        this.ensure(totalCnt > 0 || maxPerNodeCnt > 0, "c.getTotalCount() > 0 || c.getMaxPerNodeCount() > 0", null);
    }

    /**
     * Throws if a configuration check does not hold.
     *
     * @param cond Condition that must be {@code true}.
     * @param desc Textual description of the condition, included in the error message.
     * @param v Offending value appended to the message, or {@code null} to omit it.
     * @throws IgniteException If {@code cond} is {@code false}.
     */
    private void ensure(boolean cond, String desc, @Nullable Object v) {
        if (cond) {
            return;
        }

        String msg = "Service configuration check failed (" + desc + ")";

        if (v != null) {
            msg = msg + ": " + v;
        }

        throw new IgniteException(msg);
    }

    /**
     * Deploys a service with a total count of 0 and at most one instance per node.
     *
     * @param prj Cluster group whose predicate is used as the node filter.
     * @param name Service name.
     * @param svc Service instance.
     * @return Deployment future.
     */
    public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service svc) {
        final int totalCnt = 0;
        final int maxPerNodeCnt = 1;

        return this.deployMultiple(prj, name, svc, totalCnt, maxPerNodeCnt);
    }

    /**
     * Deploys a service with a total count of one and at most one instance per node.
     *
     * @param prj Cluster group whose predicate is used as the node filter.
     * @param name Service name.
     * @param svc Service instance.
     * @return Deployment future.
     */
    public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service svc) {
        final int totalCnt = 1;
        final int maxPerNodeCnt = 1;

        return this.deployMultiple(prj, name, svc, totalCnt, maxPerNodeCnt);
    }

    /**
     * Builds a service configuration from the arguments and deploys it.
     *
     * @param prj Cluster group; its predicate becomes the node filter unless it is the
     *      "always true" predicate, in which case no filter is set.
     * @param name Service name.
     * @param svc Service instance.
     * @param totalCnt Total number of instances.
     * @param maxPerNodeCnt Maximum number of instances per node.
     * @return Deployment future.
     */
    public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service svc, int totalCnt, int maxPerNodeCnt) {
        ServiceConfiguration cfg = new ServiceConfiguration();
        cfg.setName(name);
        cfg.setService(svc);
        cfg.setTotalCount(totalCnt);
        cfg.setMaxPerNodeCount(maxPerNodeCnt);

        IgnitePredicate<ClusterNode> filter = prj.predicate();

        // The explicit type witness is required: without it the left operand is inferred as
        // IgnitePredicate<Object>, which cannot be compared to IgnitePredicate<ClusterNode> with ==.
        cfg.setNodeFilter(F.<ClusterNode>alwaysTrue() == filter ? null : filter);

        return this.deploy(cfg);
    }

    /**
     * Deploys a single service instance (total count 1, max per node 1) bound to the given
     * cache name and affinity key.
     *
     * @param name Service name.
     * @param svc Service instance.
     * @param cacheName Cache name used for affinity mapping.
     * @param affKey Affinity key, must not be {@code null}.
     * @return Deployment future.
     */
    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName, Object affKey) {
        A.notNull(affKey, "affKey");

        ServiceConfiguration singletonCfg = new ServiceConfiguration();

        singletonCfg.setName(name);
        singletonCfg.setService(svc);

        // Affinity binding.
        singletonCfg.setCacheName(cacheName);
        singletonCfg.setAffinityKey(affKey);

        // Exactly one instance.
        singletonCfg.setTotalCount(1);
        singletonCfg.setMaxPerNodeCount(1);

        return this.deploy(singletonCfg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deploys a service described by the given configuration.
     * <p>
     * Unless compatibility mode is on, the service instance is marshalled up front and the
     * configuration is replaced by a {@link LazyServiceConfiguration} carrying the bytes. A
     * deployment future is registered under the service name; if one is already registered, it is
     * returned when the configurations match (ignoring the node filter), otherwise a failed future is
     * returned. The deployment record is then put into the utility cache if absent, retrying on
     * topology changes.
     * <p>
     * On a non-retryable cache failure the registered future is removed from {@code depFuts} and
     * completed with the error, so that later deployments of the same name do not pick up a stale
     * future that never completes.
     *
     * @param cfg Service configuration, must not be {@code null}.
     * @return Deployment future.
     * @throws IgniteException If the configuration fails validation.
     */
    public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) {
        A.notNull(cfg, "cfg");
        ServicesCompatibilityState state = this.markCompatibilityStateAsUsed();
        this.validate(cfg);
        if (!state.srvcCompatibility) {
            Marshaller marsh = this.ctx.config().getMarshaller();
            try {
                byte[] srvcBytes = U.marshal(marsh, (Object)cfg.getService());
                cfg = new LazyServiceConfiguration(cfg, srvcBytes);
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService() + ", marsh=" + marsh + "]", e);
                return new GridFinishedFuture<>(e);
            }
        }
        GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
        GridServiceDeploymentFuture old = this.depFuts.putIfAbsent(cfg.getName(), fut);
        if (old != null) {
            if (!old.configuration().equalsIgnoreNodeFilter(cfg)) {
                fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with different configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']'));
                return fut;
            }
            return old;
        }
        if (this.ctx.clientDisconnected()) {
            // NOTE(review): execution continues to the cache operation below even though the
            // future has just been failed here - confirm this is intended.
            fut.onDone(new IgniteClientDisconnectedCheckedException(this.ctx.cluster().clientReconnectFuture(), "Failed to deploy service, client node disconnected."));
            this.depFuts.remove(cfg.getName(), fut);
        }
        while (true) {
            try {
                GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName());
                if (this.ctx.deploy().enabled()) {
                    this.ctx.cache().context().deploy().ignoreOwnership(true);
                }
                try {
                    GridServiceDeployment dep = (GridServiceDeployment)this.cache.getAndPutIfAbsent(key, new GridServiceDeployment(this.ctx.localNodeId(), cfg));
                    if (dep != null) {
                        if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) {
                            this.depFuts.remove(cfg.getName(), fut);
                            fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']'));
                        } else {
                            // The same configuration is already in the cache: if assignments for
                            // this service exist as well, the deployment is complete right away.
                            Iterator<Cache.Entry<Object, Object>> it = this.serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
                            while (it.hasNext()) {
                                Cache.Entry<Object, Object> e = it.next();
                                if (!(e.getKey() instanceof GridServiceAssignmentsKey)) {
                                    continue;
                                }
                                GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
                                if (assigns.name().equals(cfg.getName())) {
                                    this.depFuts.remove(cfg.getName(), fut);
                                    fut.onDone();
                                    break;
                                }
                            }
                        }
                    }
                }
                finally {
                    if (this.ctx.deploy().enabled()) {
                        this.ctx.cache().context().deploy().ignoreOwnership(false);
                    }
                }
                return fut;
            }
            catch (ClusterTopologyCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Topology changed while deploying service (will retry): " + e.getMessage());
                }
            }
            catch (IgniteCheckedException e) {
                if (e.hasCause(ClusterTopologyCheckedException.class)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Topology changed while deploying service (will retry): " + e.getMessage());
                    }
                    continue;
                }
                U.error(this.log, "Failed to deploy service: " + cfg.getName(), e);
                // Do not leave a future that will never complete registered under this name.
                this.depFuts.remove(cfg.getName(), fut);
                fut.onDone(e);
                return fut;
            }
        }
    }

    /**
     * Atomically flips the compatibility state to "used", keeping its compatibility flag.
     *
     * @return The current state if it is already marked as used, otherwise the newly installed state.
     */
    private ServicesCompatibilityState markCompatibilityStateAsUsed() {
        while (true) {
            ServicesCompatibilityState cur = this.compatibilityState.get();

            if (cur.used) {
                return cur;
            }

            ServicesCompatibilityState marked = new ServicesCompatibilityState(cur.srvcCompatibility, true);

            // Lost the race with a concurrent update: re-read and try again.
            if (this.compatibilityState.compareAndSet(cur, marked)) {
                return marked;
            }
        }
    }

    /**
     * Undeploys the service with the given name by removing its deployment record from the
     * utility cache.
     * <p>
     * An undeployment future is registered under the service name once; if one is already
     * registered, it is returned. The cache removal is retried on topology changes. If no
     * deployment record existed, the future is unregistered and completed immediately. On a
     * non-retryable failure the future is unregistered and completed with the error, so it cannot
     * linger in {@code undepFuts} and be handed out to later callers.
     *
     * @param name Service name.
     * @return Undeployment future.
     */
    public IgniteInternalFuture<?> cancel(String name) {
        GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
        GridFutureAdapter<?> old = this.undepFuts.putIfAbsent(name, fut);
        if (old != null) {
            return old;
        }
        GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
        // Only the cache operation is retried; the future is registered exactly once above, so
        // a retry really repeats the removal instead of returning its own pending future.
        while (true) {
            try {
                if (this.cache.getAndRemove(key) == null) {
                    // The service was not deployed: nothing will complete the future later.
                    this.undepFuts.remove(name, fut);
                    fut.onDone();
                }
                return fut;
            }
            catch (ClusterTopologyCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Topology changed while undeploying service (will retry): " + e.getMessage());
                }
            }
            catch (IgniteCheckedException e) {
                this.log.error("Failed to undeploy service: " + name, e);
                this.undepFuts.remove(name, fut);
                fut.onDone(e);
                return fut;
            }
        }
    }

    /**
     * Undeploys every service that has a deployment record in the cache.
     *
     * @return Compound future over all individual undeployments, or an already finished future
     *      when there is nothing to undeploy.
     */
    public IgniteInternalFuture<?> cancelAll() {
        GridCompoundFuture compound = null;

        for (Iterator<Cache.Entry<Object, Object>> it = this.serviceEntries(ServiceDeploymentPredicate.INSTANCE); it.hasNext(); ) {
            Cache.Entry<Object, Object> entry = it.next();

            if (entry.getKey() instanceof GridServiceDeploymentKey) {
                GridServiceDeployment dep = (GridServiceDeployment)entry.getValue();

                // Created lazily so that an empty cache yields a plain finished future.
                if (compound == null) {
                    compound = new GridCompoundFuture();
                }

                compound.add(this.cancel(dep.configuration().getName()));
            }
        }

        if (compound == null) {
            return new GridFinishedFuture();
        }

        compound.markInitialized();

        return compound;
    }

    /**
     * Returns the service topology (instance count per node ID) for the given service.
     * <p>
     * When the node that the service name maps to is at least of version
     * {@code ServiceTopologyCallable.SINCE_VER}, the lookup is executed on that node through a
     * closure; otherwise the assignments are read from the cache directly.
     *
     * @param name Service name.
     * @param timeout Timeout passed to the closure call.
     * @return Instance counts keyed by node ID; may be {@code null} when read directly from the
     *      cache and no assignments exist.
     * @throws IgniteCheckedException If the lookup fails.
     */
    public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
        ClusterNode node = this.cache.affinity().mapKeyToNode(name);

        if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) < 0) {
            return GridServiceProcessor.serviceTopology(this.cache, name);
        }

        ServiceTopologyCallable call = new ServiceTopologyCallable(name);

        call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version());

        return this.ctx.closure().callAsyncNoFailover(GridClosureCallMode.BROADCAST, call, Collections.singletonList(node), false, timeout).get();
    }

    /**
     * Reads the assignments of the given service straight from the cache.
     *
     * @param cache Cache holding the service assignments.
     * @param svcName Service name.
     * @return Instance counts keyed by node ID, or {@code null} if there are no assignments.
     * @throws IgniteCheckedException If the cache read fails.
     */
    private static Map<UUID, Integer> serviceTopology(IgniteInternalCache<Object, Object> cache, String svcName) throws IgniteCheckedException {
        GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(svcName);
        GridServiceAssignments assigns = (GridServiceAssignments)cache.get(key);

        if (assigns == null) {
            return null;
        }

        return assigns.assigns();
    }

    /**
     * Collects descriptors of all services that have both a deployment record and assignments.
     * Services whose assignments are missing or cannot be read are left out; read failures are logged.
     *
     * @return Service descriptors with their topology snapshots.
     */
    public Collection<ServiceDescriptor> serviceDescriptors() {
        ArrayList<ServiceDescriptor> result = new ArrayList<ServiceDescriptor>();

        for (Iterator<Cache.Entry<Object, Object>> it = this.serviceEntries(ServiceDeploymentPredicate.INSTANCE); it.hasNext(); ) {
            Cache.Entry<Object, Object> entry = it.next();

            if (!(entry.getKey() instanceof GridServiceDeploymentKey)) {
                continue;
            }

            GridServiceDeployment dep = (GridServiceDeployment)entry.getValue();
            ServiceDescriptorImpl desc = new ServiceDescriptorImpl(dep);

            try {
                GridServiceAssignmentsKey assignsKey = new GridServiceAssignmentsKey(dep.configuration().getName());
                GridServiceAssignments assigns = (GridServiceAssignments)this.cache.getForcePrimary(assignsKey);

                if (assigns != null) {
                    desc.topologySnapshot(assigns.assigns());
                    result.add(desc);
                }
            }
            catch (IgniteCheckedException ex) {
                this.log.error("Failed to get assignments from replicated cache for service: " + dep.configuration().getName(), ex);
            }
        }

        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a locally deployed instance of the named service.
     *
     * @param name Service name.
     * @param <T> Expected service type (unchecked cast).
     * @return First local context's non-null service instance, or {@code null} if there is none.
     */
    public <T> T service(String name) {
        Collection<ServiceContextImpl> svcCtxs;

        synchronized (this.locSvcs) {
            svcCtxs = this.locSvcs.get(name);
        }

        if (svcCtxs == null) {
            return null;
        }

        // The per-service collection is guarded by its own monitor.
        synchronized (svcCtxs) {
            for (ServiceContextImpl svcCtx : svcCtxs) {
                Service svc = svcCtx.service();

                if (svc != null) {
                    return (T)svc;
                }
            }
        }

        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a local context of the named service whose service instance is available.
     *
     * @param name Service name.
     * @return First local context with a non-null service instance, or {@code null} if there is none.
     */
    public ServiceContextImpl serviceContext(String name) {
        Collection<ServiceContextImpl> svcCtxs;

        synchronized (this.locSvcs) {
            svcCtxs = this.locSvcs.get(name);
        }

        if (svcCtxs == null) {
            return null;
        }

        // The per-service collection is guarded by its own monitor.
        synchronized (svcCtxs) {
            for (ServiceContextImpl svcCtx : svcCtxs) {
                if (svcCtx.service() != null) {
                    return svcCtx;
                }
            }
        }

        return null;
    }

    /**
     * Returns the local service instance when the cluster group contains the local node and the
     * service is deployed locally; otherwise creates a remote proxy.
     *
     * @param prj Cluster group to look the service up in.
     * @param name Service name.
     * @param svcItf Service interface.
     * @param sticky Sticky flag passed to the proxy.
     * @param timeout Timeout passed to the proxy.
     * @param <T> Service interface type.
     * @return Local service instance or a proxy.
     * @throws IgniteException If the local service does not implement {@code svcItf}.
     */
    public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf, boolean sticky, long timeout) throws IgniteException {
        if (this.hasLocalNode(prj)) {
            ServiceContextImpl svcCtx = this.serviceContext(name);

            if (svcCtx != null) {
                Service locSvc = svcCtx.service();

                if (locSvc != null) {
                    if (!svcItf.isAssignableFrom(locSvc.getClass())) {
                        throw new IgniteException("Service does not implement specified interface [svcItf=" + svcItf.getName() + ", svcCls=" + locSvc.getClass().getName() + ']');
                    }

                    return (T)locSvc;
                }
            }
        }

        return new GridServiceProxy<T>(prj, name, svcItf, sticky, timeout, this.ctx).proxy();
    }

    /**
     * Checks whether the cluster group contains the local node.
     *
     * @param prj Cluster group.
     * @return {@code true} if any node of the group reports itself as local.
     */
    private boolean hasLocalNode(ClusterGroup prj) {
        Iterator<ClusterNode> nodes = prj.nodes().iterator();
        boolean found = false;

        // Stops at the first local node.
        while (!found && nodes.hasNext()) {
            found = nodes.next().isLocal();
        }

        return found;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns all locally deployed instances of the named service.
     *
     * @param name Service name.
     * @param <T> Expected service type (unchecked cast).
     * @return Non-null service instances of all local contexts (possibly empty), or {@code null}
     *      when no contexts are registered under the name.
     */
    public <T> Collection<T> services(String name) {
        Collection<ServiceContextImpl> ctxs;
        synchronized (this.locSvcs) {
            ctxs = this.locSvcs.get(name);
        }
        if (ctxs == null) {
            return null;
        }
        synchronized (ctxs) {
            // Typed as Collection<T> so that the result matches the declared return type.
            Collection<T> res = new ArrayList<T>(ctxs.size());
            for (ServiceContextImpl ctx : ctxs) {
                Service svc = ctx.service();
                if (svc != null) {
                    res.add((T)svc);
                }
            }
            return res;
        }
    }

    /**
     * Recomputes the per-node instance counts of a deployed service for the given topology version
     * and stores them in the cache inside a pessimistic, repeatable-read transaction.
     * <p>
     * With an affinity key, all instances go to the node the key maps to. Otherwise the total
     * count is split evenly over the (optionally filtered) topology nodes, capped by the per-node
     * maximum; any remainder is first given to live nodes that already carried the extra instance
     * in the previous assignments and then to randomly chosen nodes. The whole computation is
     * retried after a short sleep if the topology changes mid-transaction.
     *
     * @param dep Service deployment.
     * @param topVer Topology version to compute assignments for.
     * @throws IgniteCheckedException If the reassignment fails.
     */
    private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        ServiceConfiguration cfg = dep.configuration();
        IgnitePredicate<ClusterNode> nodeFilter = cfg.getNodeFilter();
        if (nodeFilter != null) {
            this.ctx.resource().injectGeneric(nodeFilter);
        }
        int totalCnt = cfg.getTotalCount();
        int maxPerNodeCnt = cfg.getMaxPerNodeCount();
        String cacheName = cfg.getCacheName();
        Object affKey = cfg.getAffinityKey();
        while (true) {
            Collection<ClusterNode> nodes;
            GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
            if (affKey == null) {
                nodes = this.ctx.discovery().nodes(topVer);
                if (assigns.nodeFilter() != null) {
                    Collection<ClusterNode> nodes0 = new ArrayList<ClusterNode>();
                    for (ClusterNode node : nodes) {
                        if (assigns.nodeFilter().apply(node)) {
                            nodes0.add(node);
                        }
                    }
                    nodes = nodes0;
                }
            } else {
                // Not needed: with an affinity key the target node comes from the affinity mapping.
                nodes = null;
            }
            try (IgniteInternalTx tx = this.cache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
                GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName());
                GridServiceAssignments oldAssigns = (GridServiceAssignments)this.cache.get(key);
                Map<UUID, Integer> cnts = new HashMap<UUID, Integer>();
                if (affKey != null) {
                    ClusterNode n = this.ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
                    if (n != null) {
                        int cnt = maxPerNodeCnt == 0 ? (totalCnt == 0 ? 1 : totalCnt) : maxPerNodeCnt;
                        cnts.put(n.id(), cnt);
                    }
                } else if (!nodes.isEmpty()) {
                    int size = nodes.size();
                    // A zero total count means "maxPerNodeCnt instances on every node".
                    int perNodeCnt = totalCnt != 0 ? totalCnt / size : maxPerNodeCnt;
                    int remainder = totalCnt != 0 ? totalCnt % size : 0;
                    if (perNodeCnt > maxPerNodeCnt && maxPerNodeCnt != 0) {
                        perNodeCnt = maxPerNodeCnt;
                        remainder = 0;
                    }
                    for (ClusterNode node : nodes) {
                        cnts.put(node.id(), perNodeCnt);
                    }
                    assert (perNodeCnt >= 0);
                    assert (remainder >= 0);
                    if (remainder > 0) {
                        int cnt = perNodeCnt + 1;
                        if (oldAssigns != null) {
                            // Prefer nodes that already ran the extra instance to limit redeployment.
                            Set<UUID> used = new HashSet<UUID>();
                            for (Map.Entry<UUID, Integer> entry : oldAssigns.assigns().entrySet()) {
                                if (this.ctx.discovery().node(entry.getKey()) == null || entry.getValue() != cnt) {
                                    continue;
                                }
                                cnts.put(entry.getKey(), cnt);
                                used.add(entry.getKey());
                                if (--remainder == 0) {
                                    break;
                                }
                            }
                            if (remainder > 0) {
                                List<Map.Entry<UUID, Integer>> entries = new ArrayList<Map.Entry<UUID, Integer>>(cnts.entrySet());
                                Collections.shuffle(entries);
                                for (Map.Entry<UUID, Integer> entry : entries) {
                                    if (used.contains(entry.getKey()) || entry.getValue() >= maxPerNodeCnt && maxPerNodeCnt != 0) {
                                        continue;
                                    }
                                    // Entries are live views of the map, so this updates cnts.
                                    entry.setValue(entry.getValue() + 1);
                                    if (--remainder == 0) {
                                        break;
                                    }
                                }
                            }
                        } else {
                            List<Map.Entry<UUID, Integer>> entries = new ArrayList<Map.Entry<UUID, Integer>>(cnts.entrySet());
                            Collections.shuffle(entries);
                            for (Map.Entry<UUID, Integer> entry : entries) {
                                entry.setValue(entry.getValue() + 1);
                                if (--remainder == 0) {
                                    break;
                                }
                            }
                        }
                    }
                }
                assigns.assigns(cnts);
                this.cache.put(key, assigns);
                tx.commit();
            }
            catch (ClusterTopologyCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Topology changed while reassigning (will retry): " + e.getMessage());
                }
                U.sleep(10L);
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Aligns the number of locally tracked instances of a service with the count assigned
     * to the local node: surplus instances are cancelled, missing ones are created,
     * initialized and started on their own single-thread executor.
     * <p>
     * If initialization of any new instance fails, every context created by this call is
     * removed from the local registry and an {@link Error} or {@link RuntimeException} is
     * rethrown to the caller; other throwables are only logged.
     *
     * @param assigns Assignments describing the service and its per-node instance counts.
     */
    private void redeploy(GridServiceAssignments assigns) {
        String svcName = assigns.name();

        // No entry for the local node means zero instances are assigned here.
        Integer assigned = assigns.assigns().get(this.ctx.localNodeId());
        int targetCnt = assigned == null ? 0 : assigned;

        final Collection<ServiceContextImpl> ctxs;
        synchronized (this.locSvcs) {
            Collection<ServiceContextImpl> registered = this.locSvcs.get(svcName);
            if (registered == null) {
                registered = new ArrayList<ServiceContextImpl>();
                this.locSvcs.put(svcName, registered);
            }
            ctxs = registered;
        }

        final List<ServiceContextImpl> toInit = new ArrayList<ServiceContextImpl>();
        synchronized (ctxs) {
            int running = ctxs.size();
            if (running > targetCnt) {
                this.cancel(ctxs, running - targetCnt);
            }
            else {
                // Runs zero times when the counts already match.
                for (int missing = targetCnt - running; missing > 0; --missing) {
                    ServiceContextImpl created = new ServiceContextImpl(assigns.name(), UUID.randomUUID(), assigns.cacheName(), assigns.affinityKey(), Executors.newSingleThreadExecutor(this.threadFactory));
                    ctxs.add(created);
                    toInit.add(created);
                }
            }
        }

        for (final ServiceContextImpl svcCtx : toInit) {
            final Service svc;
            try {
                svc = this.copyAndInject(assigns.configuration());
                svc.init(svcCtx);
                svcCtx.service(svc);
            }
            catch (Throwable e) {
                U.error(this.log, "Failed to initialize service (service will not be deployed): " + assigns.name(), e);
                synchronized (ctxs) {
                    ctxs.removeAll(toInit);
                }
                if (e instanceof Error) {
                    throw (Error)e;
                }
                if (e instanceof RuntimeException) {
                    throw (RuntimeException)e;
                }
                return;
            }

            if (this.log.isInfoEnabled()) {
                this.log.info("Starting service instance [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']');
            }

            final ExecutorService exe = svcCtx.executor();
            exe.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        svc.execute(svcCtx);
                    }
                    catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
                        this.logInterrupted();
                    }
                    catch (IgniteException e) {
                        boolean interrupted = e.hasCause(InterruptedException.class) || e.hasCause(IgniteInterruptedCheckedException.class);
                        if (interrupted) {
                            this.logInterrupted();
                        }
                        else {
                            U.error(GridServiceProcessor.this.log, "Service execution stopped with error [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']', e);
                        }
                    }
                    catch (Throwable e) {
                        GridServiceProcessor.this.log.error("Service execution stopped with error [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']', e);
                        if (e instanceof Error) {
                            throw (Error)e;
                        }
                    }
                    finally {
                        // The executor is dedicated to this instance, so it is shut down with it.
                        exe.shutdownNow();
                    }
                }

                /** Logs, at debug level, that the service thread was interrupted. */
                private void logInterrupted() {
                    if (GridServiceProcessor.this.log.isDebugEnabled()) {
                        GridServiceProcessor.this.log.debug("Service thread was interrupted [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']');
                    }
                }
            });
        }
    }

    /**
     * Produces the service instance to run for the given configuration and injects
     * resources into it.
     * <p>
     * For a {@link LazyServiceConfiguration} the instance is unmarshalled from the stored
     * bytes. Otherwise the configured service is copied by a marshal/unmarshal round trip;
     * if that round trip (or the injection into the copy) fails with an
     * {@link IgniteCheckedException}, the error is logged and the configured instance
     * itself is returned.
     *
     * @param cfg Service configuration.
     * @return Service instance to initialize and execute.
     * @throws IgniteCheckedException If unmarshalling or injection fails for a lazy configuration.
     */
    private Service copyAndInject(ServiceConfiguration cfg) throws IgniteCheckedException {
        Marshaller marsh = this.ctx.config().getMarshaller();

        if (!(cfg instanceof LazyServiceConfiguration)) {
            Service orig = cfg.getService();
            try {
                byte[] copyBytes = U.marshal(marsh, (Object)orig);
                Service copy = (Service)U.unmarshal(marsh, copyBytes, U.resolveClassLoader(orig.getClass().getClassLoader(), this.ctx.config()));
                this.ctx.resource().inject(copy);
                return copy;
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Failed to copy service (will reuse same instance): " + orig.getClass(), e);
                return orig;
            }
        }

        byte[] lazyBytes = ((LazyServiceConfiguration)cfg).serviceBytes();
        Service restored = (Service)U.unmarshal(marsh, lazyBytes, U.resolveClassLoader(null, this.ctx.config()));
        this.ctx.resource().inject(restored);
        return restored;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Cancels up to {@code cancelCnt} service instances, taking them in iteration order and
     * removing each cancelled context from {@code ctxs} through the iterator.
     * <p>
     * For every cancelled context: it is flagged as cancelled, the service's
     * {@code cancel} callback is invoked (failures are logged and ignored, except
     * {@link Error}s which are rethrown after resource cleanup), injected resources are
     * cleaned up, and the context's executor is shut down.
     * <p>
     * A non-positive {@code cancelCnt} cancels nothing. Previously a count of zero with a
     * non-empty collection cancelled every instance, because the counter was decremented
     * past zero before being tested.
     *
     * @param ctxs Service contexts to cancel from; callers in this file hold its monitor.
     * @param cancelCnt Maximum number of instances to cancel.
     */
    private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) {
        Iterator<ServiceContextImpl> it = ctxs.iterator();
        while (cancelCnt > 0 && it.hasNext()) {
            ServiceContextImpl svcCtx = it.next();

            // Flag the context first so the service can observe cancellation.
            svcCtx.setCancelled(true);

            Service svc = svcCtx.service();
            if (svc != null) {
                try {
                    svc.cancel(svcCtx);
                }
                catch (Throwable e) {
                    this.log.error("Failed to cancel service (ignoring) [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']', e);
                    if (e instanceof Error) {
                        throw (Error)e;
                    }
                }
                finally {
                    try {
                        this.ctx.resource().cleanup(svc);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(this.log, "Failed to clean up service (will ignore): " + svcCtx.name(), e);
                    }
                }
            }

            svcCtx.executor().shutdownNow();
            it.remove();

            if (this.log.isInfoEnabled()) {
                this.log.info("Cancelled service instance [name=" + svcCtx.name() + ", execId=" + svcCtx.executionId() + ']');
            }

            --cancelCnt;
        }
    }

    /**
     * Returns an iterator over entries of the service cache.
     * <p>
     * On an affinity node the local entry set is iterated directly (the predicate is not
     * applied in that case). Otherwise a scan query with the predicate is executed on the
     * oldest alive cache server node; if there is no such node an empty iterator is returned.
     *
     * @param p Predicate passed to the scan query used on non-affinity nodes.
     * @return Iterator over cache entries.
     * @throws IgniteException Wrapping any {@link IgniteCheckedException} raised while querying.
     */
    private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
        try {
            if (this.cache.context().affinityNode()) {
                return this.cache.entrySetx(new CacheEntryPredicate[0]).iterator();
            }

            ClusterNode oldestSrvNode = this.ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
            if (oldestSrvNode == null) {
                return new GridEmptyIterator<Cache.Entry<Object, Object>>();
            }

            GridCacheQueryManager<Object, Object> qryMgr = this.cache.context().queries();
            CacheQuery qry = qryMgr.createScanQuery(p, null, false);
            qry.keepAll(false);
            qry.projection(this.ctx.cluster().get().forNode(oldestSrvNode, new ClusterNode[0]));
            GridCloseableIterator iter = qry.executeScanQuery();

            // Adapt the query's map entries to cache entries; removal is not supported.
            return this.cache.context().itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>(){

                @Override
                protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
                    return new CacheEntryImpl<Object, Object>(e.getKey(), e.getValue());
                }

                @Override
                protected void remove(Cache.Entry<Object, Object> item) {
                    throw new UnsupportedOperationException();
                }
            });
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Validates a joining node with respect to the services compatibility mode.
     * <p>
     * A node is rejected only when it is older than {@code LAZY_SERVICES_CFG_SINCE}, the
     * local node is not in compatibility mode, and the compatibility state has already
     * been marked as used. If the state has not been used yet, it is atomically switched
     * to compatibility mode and the node is accepted.
     *
     * @param node Joining node.
     * @return Validation error, or {@code null} if the node may join.
     */
    @Override
    @Nullable
    public IgniteNodeValidationResult validateNode(ClusterNode node) {
        IgniteNodeValidationResult res = super.validateNode(node);
        if (res != null) {
            return res;
        }

        // Nodes at or above the threshold version need no compatibility handling.
        if (node.version().compareToIgnoreTimestamp(LAZY_SERVICES_CFG_SINCE) >= 0) {
            return null;
        }

        while (true) {
            ServicesCompatibilityState state = this.compatibilityState.get();
            if (state.srvcCompatibility) {
                return null;
            }
            if (state.used) {
                // Too late to switch modes: reject the old node below.
                break;
            }
            // Not used yet: switch to compatibility mode, retrying if the state changed concurrently.
            if (this.compatibilityState.compareAndSet(state, new ServicesCompatibilityState(true, false))) {
                return null;
            }
        }

        ClusterNode locNode = this.ctx.discovery().localNode();

        // Second message is the one reported on the remote side, hence the swapped node IDs.
        return new IgniteNodeValidationResult(node.id(), "Local node uses IgniteServices and works in not compatible mode with old nodes (IGNITE_SERVICES_COMPATIBILITY_MODE system property can be set explicitly) [locNodeId=" + locNode.id() + ", rmtNodeId=" + node.id() + "]", "Remote node uses IgniteServices and works in not compatible mode with old nodes (IGNITE_SERVICES_COMPATIBILITY_MODE system property can be set explicitly) [locNodeId=" + node.id() + ", rmtNodeId=" + locNode.id() + "]");
    }

    /**
     * Initializes the services compatibility mode while preserving the current
     * {@code used} flag of the compatibility state.
     * <p>
     * When the system-property override is set it is taken as is; otherwise compatibility
     * mode is enabled if any of the given nodes has a version below
     * {@code LAZY_SERVICES_CFG_SINCE}.
     *
     * @param nodes Nodes whose versions are inspected.
     */
    public void initCompatibilityMode(Collection<ClusterNode> nodes) {
        boolean mode;
        if (this.srvcCompatibilitySysProp != null) {
            mode = this.srvcCompatibilitySysProp;
        }
        else {
            mode = false;
            for (ClusterNode node : nodes) {
                if (node.version().compareToIgnoreTimestamp(LAZY_SERVICES_CFG_SINCE) < 0) {
                    mode = true;
                    break;
                }
            }
        }

        // Retry until the state is swapped without a concurrent modification.
        ServicesCompatibilityState cur;
        do {
            cur = this.compatibilityState.get();
        } while (!this.compatibilityState.compareAndSet(cur, new ServicesCompatibilityState(mode, cur.used)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resumes every pending job context via {@code callcc()} and removes it from the
     * pending list, all while holding the list's monitor.
     */
    public void onUtilityCacheStarted() {
        synchronized (this.pendingJobCtxs) {
            // An empty list simply yields no iterations.
            for (Iterator<ComputeJobContext> it = this.pendingJobCtxs.iterator(); it.hasNext(); ) {
                ComputeJobContext jobCtx = it.next();
                jobCtx.callcc();
                it.remove();
            }
        }
    }

    /**
     * Dispatches system cache update events: events keyed by
     * {@link GridServiceDeploymentKey} go to {@code processDeployment}, events keyed by
     * {@link GridServiceAssignmentsKey} go to {@code processAssignment}, and all other
     * events are ignored. The compatibility state is marked as used before the first
     * relevant event is processed.
     * <p>
     * Events are iterated with their declared wildcard type and narrowed only after the
     * key type has been checked; the previous loop variable type did not match the
     * iterable's element type.
     *
     * @param evts Cache entry events to process.
     */
    @SuppressWarnings("unchecked")
    private void onSystemCacheUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
        boolean firstTime = true;
        for (CacheEntryEvent<?, ?> evt : evts) {
            Object key = evt.getKey();
            boolean depEvt = key instanceof GridServiceDeploymentKey;
            if (!depEvt && !(key instanceof GridServiceAssignmentsKey)) {
                continue;
            }

            if (firstTime) {
                this.markCompatibilityStateAsUsed();
                firstTime = false;
            }

            // The casts are justified by the key type checks above.
            if (depEvt) {
                this.processDeployment((CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment>)evt);
            }
            else {
                this.processAssignment((CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments>)evt);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an update of a service deployment entry.
     * <p>
     * A non-null value means the service is (re)deployed: if the local node is the oldest
     * node of the current topology, reassignment is triggered. A null value means the
     * service was undeployed: local instances are cancelled, pending deployment and
     * undeployment futures for the name are completed, and, if the local node is primary
     * for the assignments key, the assignments entry is removed.
     * <p>
     * Events whose value cannot be read because of a {@link ClassNotFoundException} are
     * ignored.
     *
     * @param e Deployment cache entry event.
     */
    private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
        GridServiceDeployment dep;
        try {
            dep = e.getValue();
        }
        catch (IgniteException ex) {
            if (X.hasCause(ex, ClassNotFoundException.class)) {
                return;
            }
            throw ex;
        }

        if (dep != null) {
            this.svcName.set(dep.configuration().getName());
            AffinityTopologyVersion topVer = this.ctx.discovery().topologyVersionEx();
            ClusterNode oldest = U.oldest(this.ctx.discovery().nodes(topVer), null);

            // Null guard mirrors the topology listener's handling of its oldest-node lookup;
            // presumably U.oldest yields null when no node is available.
            if (oldest != null && oldest.isLocal()) {
                this.onDeployment(dep, topVer);
            }
            return;
        }

        String name = e.getKey().name();
        this.svcName.set(name);

        Collection<ServiceContextImpl> ctxs;
        synchronized (this.locSvcs) {
            ctxs = this.locSvcs.remove(name);
        }
        if (ctxs != null) {
            synchronized (ctxs) {
                this.cancel(ctxs, ctxs.size());
            }
        }

        // Complete whoever is waiting on this service name.
        GridFutureAdapter fut = (GridFutureAdapter)this.depFuts.remove(name);
        if (fut != null) {
            fut.onDone();
        }
        fut = (GridFutureAdapter)this.undepFuts.remove(name);
        if (fut != null) {
            fut.onDone();
        }

        GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
        if (this.cache.cache().affinity().isPrimary(this.ctx.discovery().localNode(), key)) {
            try {
                this.cache.getAndRemove(key);
            }
            catch (IgniteCheckedException ex) {
                U.error(this.log, "Failed to remove assignments for undeployed service: " + name, ex);
            }
        }
    }

    /**
     * Reassigns the deployment if the current topology version still equals
     * {@code topVer}.
     * <p>
     * If reassignment fails and the topology is still unchanged, a retry of this method is
     * scheduled through a timeout object ending 1000 ms after its creation. If the
     * topology has changed in the meantime, nothing is scheduled.
     *
     * @param dep Service deployment.
     * @param topVer Topology version the deployment was observed at.
     */
    private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
        try {
            if (this.ctx.discovery().topologyVersionEx().equals(topVer)) {
                this.reassign(dep, topVer);
            }
        }
        catch (IgniteCheckedException e) {
            // Topology errors are expected here and therefore not logged.
            if (!(e instanceof ClusterTopologyCheckedException)) {
                this.log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
            }

            AffinityTopologyVersion curTopVer = this.ctx.discovery().topologyVersionEx();
            if (!curTopVer.equals(topVer)) {
                assert (curTopVer.compareTo(topVer) > 0);
                return;
            }

            this.ctx.timeout().addTimeoutObject(new GridTimeoutObject(){
                private final IgniteUuid id = IgniteUuid.randomUuid();
                private final long start = System.currentTimeMillis();

                @Override
                public IgniteUuid timeoutId() {
                    return this.id;
                }

                @Override
                public long endTime() {
                    return this.start + 1000L;
                }

                @Override
                public void onTimeout() {
                    if (GridServiceProcessor.this.busyLock.enterBusy()) {
                        try {
                            GridServiceProcessor.this.onDeployment(dep, topVer);
                        }
                        finally {
                            GridServiceProcessor.this.busyLock.leaveBusy();
                        }
                    }
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an update of a service assignments entry.
     * <p>
     * A non-null value triggers a local redeploy; afterwards the matching deployment
     * future (same configuration ignoring the node filter) is removed and completed,
     * carrying any {@link Error} or {@link RuntimeException} thrown by the redeploy.
     * A null value means the assignments were removed, so all local instances of the
     * service are cancelled.
     * <p>
     * Events whose value cannot be read because of a {@link ClassNotFoundException} are
     * ignored.
     *
     * @param e Assignments cache entry event.
     */
    private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
        GridServiceAssignments assigns;
        try {
            assigns = (GridServiceAssignments)e.getValue();
        }
        catch (IgniteException ex) {
            if (!X.hasCause(ex, ClassNotFoundException.class)) {
                throw ex;
            }
            return;
        }

        if (assigns == null) {
            String name = ((GridServiceAssignmentsKey)e.getKey()).name();
            this.svcName.set(name);

            Collection<ServiceContextImpl> ctxs;
            synchronized (this.locSvcs) {
                ctxs = this.locSvcs.remove(name);
            }
            if (ctxs != null) {
                synchronized (ctxs) {
                    this.cancel(ctxs, ctxs.size());
                }
            }
            return;
        }

        this.svcName.set(assigns.name());

        // Remember a redeploy failure so it can be reported through the deployment future.
        Throwable failure = null;
        try {
            this.redeploy(assigns);
        }
        catch (Error | RuntimeException th) {
            failure = th;
        }

        GridServiceDeploymentFuture fut = (GridServiceDeploymentFuture)this.depFuts.get(assigns.name());
        if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
            this.depFuts.remove(assigns.name(), fut);
            fut.onDone(null, failure);
        }
    }

    static {
        // Event type codes assigned to EVTS.
        EVTS = new int[]{10, 11, 12, 18};

        // Product versions collected into SERVICE_TOP_CALLABLE_VER1, compared ignoring the build timestamp.
        String[] verStrs = new String[]{
            "1.5.30", "1.5.31", "1.5.32",
            "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8", "1.6.9", "1.6.10",
            "1.7.0", "1.7.1", "1.7.2"
        };
        Set<IgniteProductVersion> versions = new TreeSet<IgniteProductVersion>(new Comparator<IgniteProductVersion>(){

            @Override
            public int compare(IgniteProductVersion o1, IgniteProductVersion o2) {
                return o1.compareToIgnoreTimestamp(o2);
            }
        });
        for (String verStr : verStrs) {
            versions.add(IgniteProductVersion.fromString(verStr));
        }
        SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions);
    }

    /**
     * Immutable pair of flags describing the services compatibility state: whether
     * compatibility mode is on, and whether the state has already been used.
     */
    private static class ServicesCompatibilityState {
        /** Compatibility mode flag. */
        private final boolean srvcCompatibility;

        /** Set once the state has been marked as used. */
        private final boolean used;

        /**
         * @param compatible Compatibility mode flag.
         * @param alreadyUsed Whether the state has already been used.
         */
        ServicesCompatibilityState(boolean compatible, boolean alreadyUsed) {
            this.srvcCompatibility = compatible;
            this.used = alreadyUsed;
        }
    }

    /**
     * Callable that returns the topology of a service (node ID to instance count) as read
     * from the utility cache of the node it runs on.
     * <p>
     * If the utility cache is not available yet, the job is suspended once via
     * {@code holdcc} and its context is queued in the processor's pending list; if the
     * cache is still missing when the job runs again, the call fails.
     */
    @GridInternal
    @SerializableTransient(methodName="serializableTransient")
    private static class ServiceTopologyCallable
    implements IgniteCallable<Map<UUID, Integer>> {
        private static final long serialVersionUID = 0L;
        private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");

        /** Field names returned by {@link #serializableTransient}. */
        private static final String[] SER_FIELDS = new String[]{"waitedCacheInit", "jCtx", "log"};

        /** Name of the service whose topology is requested. */
        private final String svcName;

        /** Whether this job has already been suspended once waiting for the utility cache. */
        private transient boolean waitedCacheInit;

        @IgniteInstanceResource
        private IgniteEx ignite;

        @JobContextResource
        private transient ComputeJobContext jCtx;

        @LoggerResource
        private transient IgniteLogger log;

        /** When set, {@link #serializableTransient} returns {@link #SER_FIELDS} regardless of version. */
        transient boolean serialize;

        /**
         * @param svcName Service name.
         */
        public ServiceTopologyCallable(String svcName) {
            this.svcName = svcName;
        }

        /**
         * @return Service topology, or {@code null} when the job has been suspended to wait
         *      for the utility cache.
         * @throws Exception If the utility cache is still unavailable after the job was resumed.
         */
        @Override
        public Map<UUID, Integer> call() throws Exception {
            IgniteInternalCache cache = this.ignite.context().cache().utilityCache();
            if (cache == null) {
                List<ComputeJobContext> pendingCtxs = this.ignite.context().service().pendingJobCtxs;
                synchronized (pendingCtxs) {
                    // Re-check under the lock that also guards the pending list.
                    cache = this.ignite.context().cache().utilityCache();
                    if (cache == null) {
                        if (this.waitedCacheInit) {
                            String errMsg = "Failed to gather service topology. Utility cache initialization is stuck.";
                            this.log.error(errMsg);
                            throw new IgniteCheckedException(errMsg);
                        }

                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Utility cache hasn't been initialized yet. Waiting.");
                        }

                        // Suspend the job and queue its context in the pending list.
                        this.jCtx.holdcc(60000L);
                        pendingCtxs.add(this.jCtx);
                        this.waitedCacheInit = true;
                        return null;
                    }
                }
            }
            return GridServiceProcessor.serviceTopology(cache, this.svcName);
        }

        /**
         * Hook referenced by name from {@link SerializableTransient}.
         *
         * @param self Callable instance, may be {@code null}.
         * @param ver Product version, may be {@code null}.
         * @return {@link #SER_FIELDS} if {@code self.serialize} is set or {@code ver} is in
         *      {@code SERVICE_TOP_CALLABLE_VER1}; otherwise {@code null}.
         */
        private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion ver) {
            boolean forced = self != null && self.serialize;
            boolean listedVer = ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver);
            return forced || listedVer ? SER_FIELDS : null;
        }
    }

    /**
     * Predicate accepting entries whose key is a {@link GridServiceAssignmentsKey};
     * the value is not inspected.
     */
    static class ServiceAssignmentsPredicate
    implements IgniteBiPredicate<Object, Object> {
        private static final long serialVersionUID = 0L;

        /** Shared instance. */
        static final ServiceAssignmentsPredicate INSTANCE = new ServiceAssignmentsPredicate();

        ServiceAssignmentsPredicate() {
        }

        /**
         * @param key Entry key.
         * @param val Entry value (ignored).
         * @return {@code true} if the key is a {@link GridServiceAssignmentsKey}.
         */
        @Override
        public boolean apply(Object key, Object val) {
            boolean assignmentsKey = key instanceof GridServiceAssignmentsKey;
            return assignmentsKey;
        }

        @Override
        public String toString() {
            return S.toString(ServiceAssignmentsPredicate.class, this);
        }
    }

    /**
     * Predicate accepting entries whose key is a {@link GridServiceDeploymentKey};
     * the value is not inspected.
     */
    static class ServiceDeploymentPredicate
    implements IgniteBiPredicate<Object, Object> {
        private static final long serialVersionUID = 0L;

        /** Shared instance. */
        static final ServiceDeploymentPredicate INSTANCE = new ServiceDeploymentPredicate();

        ServiceDeploymentPredicate() {
        }

        /**
         * @param key Entry key.
         * @param val Entry value (ignored).
         * @return {@code true} if the key is a {@link GridServiceDeploymentKey}.
         */
        @Override
        public boolean apply(Object key, Object val) {
            boolean deploymentKey = key instanceof GridServiceDeploymentKey;
            return deploymentKey;
        }

        @Override
        public String toString() {
            return S.toString(ServiceDeploymentPredicate.class, this);
        }
    }

    /**
     * Runnable that executes {@link #run0()} under the processor's busy lock, skipping
     * execution when the lock cannot be entered. The thread-bound service name is reset
     * before and after the run; failures are logged, and {@link Error}s are rethrown.
     */
    private abstract class BusyRunnable
    implements Runnable {
        private BusyRunnable() {
        }

        @Override
        public void run() {
            final GridServiceProcessor proc = GridServiceProcessor.this;

            if (!proc.busyLock.enterBusy()) {
                return;
            }

            proc.svcName.set(null);
            try {
                this.run0();
            }
            catch (Throwable err) {
                proc.log.error("Error when executing service: " + (String)proc.svcName.get(), err);
                if (err instanceof Error) {
                    throw (Error)err;
                }
            }
            finally {
                proc.busyLock.leaveBusy();
                proc.svcName.set(null);
            }
        }

        /** Actual work, executed while the busy lock is held. */
        public abstract void run0();
    }

    /**
     * Discovery listener that, on a topology event, schedules a task on the deployment
     * executor which (when the local node is the oldest alive cache server node) reassigns
     * every deployed service for the event's topology version and then removes assignment
     * entries that no longer have a deployment.
     */
    private class TopologyListener
    implements GridLocalEventListener {
        private TopologyListener() {
        }

        /**
         * Derives the topology version from the event and schedules reassignment for it.
         * Affinity change messages that do not require an exchange are ignored.
         *
         * @param evt Discovery event.
         */
        @Override
        public void onEvent(Event evt) {
            if (!GridServiceProcessor.this.busyLock.enterBusy()) {
                return;
            }
            try {
                final AffinityTopologyVersion topVer;
                if (evt instanceof DiscoveryCustomEvent) {
                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
                    if (msg instanceof CacheAffinityChangeMessage && !((CacheAffinityChangeMessage)msg).exchangeNeeded()) {
                        return;
                    }
                } else {
                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
                }
                GridServiceProcessor.this.depExe.execute(new BusyRunnable(){

                    @Override
                    public void run0() {
                        ClusterNode oldest = GridServiceProcessor.this.ctx.discovery().oldestAliveCacheServerNode(topVer);
                        if (oldest != null && oldest.isLocal()) {
                            TopologyListener.this.reassignAll(topVer);
                        }
                        TopologyListener.this.removeZombieAssignments();
                    }
                });
            }
            finally {
                GridServiceProcessor.this.busyLock.leaveBusy();
            }
        }

        /**
         * Reassigns every deployment found in the service cache for the given topology
         * version, collecting failed ones and handing them to
         * {@link #onReassignmentFailed}.
         *
         * @param topVer Topology version to reassign for.
         */
        private void reassignAll(AffinityTopologyVersion topVer) {
            ConcurrentLinkedQueue<GridServiceDeployment> retries = new ConcurrentLinkedQueue<GridServiceDeployment>();
            if (GridServiceProcessor.this.ctx.deploy().enabled()) {
                GridServiceProcessor.this.ctx.cache().context().deploy().ignoreOwnership(true);
            }
            try {
                Iterator<Cache.Entry<Object, Object>> it = GridServiceProcessor.this.serviceEntries(ServiceDeploymentPredicate.INSTANCE);
                boolean firstTime = true;
                while (it.hasNext()) {
                    Cache.Entry<Object, Object> entry = it.next();
                    if (!(entry.getKey() instanceof GridServiceDeploymentKey)) continue;
                    if (firstTime) {
                        GridServiceProcessor.this.markCompatibilityStateAsUsed();
                        firstTime = false;
                    }
                    GridServiceDeployment dep = (GridServiceDeployment)entry.getValue();
                    try {
                        GridServiceProcessor.this.svcName.set(dep.configuration().getName());

                        // Wait for affinity of the system cache to be ready for this version.
                        GridServiceProcessor.this.ctx.cache().internalCache("ignite-sys-cache").context().affinity().affinityReadyFuture(topVer).get();
                        GridServiceProcessor.this.reassign(dep, topVer);
                    }
                    catch (IgniteCheckedException ex) {
                        // Check the exception itself (not the cache entry) so topology errors stay quiet.
                        if (!(ex instanceof ClusterTopologyCheckedException)) {
                            LT.error(GridServiceProcessor.this.log, ex, "Failed to do service reassignment (will retry): " + dep.configuration().getName());
                        }
                        retries.add(dep);
                    }
                }
            }
            finally {
                if (GridServiceProcessor.this.ctx.deploy().enabled()) {
                    GridServiceProcessor.this.ctx.cache().context().deploy().ignoreOwnership(false);
                }
            }
            if (!retries.isEmpty()) {
                this.onReassignmentFailed(topVer, retries);
            }
        }

        /**
         * Removes assignment entries, among those the local node is primary for, whose
         * deployment entry no longer exists.
         */
        private void removeZombieAssignments() {
            for (Cache.Entry e : GridServiceProcessor.this.cache.entrySetx(CU.cachePrimary(GridServiceProcessor.this.ctx.grid().affinity(GridServiceProcessor.this.cache.name()), GridServiceProcessor.this.ctx.grid().localNode()))) {
                if (!(e.getKey() instanceof GridServiceAssignmentsKey)) continue;
                String name = ((GridServiceAssignmentsKey)e.getKey()).name();
                try {
                    if (GridServiceProcessor.this.cache.get(new GridServiceDeploymentKey(name)) != null) continue;
                    if (GridServiceProcessor.this.log.isDebugEnabled()) {
                        GridServiceProcessor.this.log.debug("Removed zombie assignments: " + e.getValue());
                    }
                    GridServiceProcessor.this.cache.getAndRemove(e.getKey());
                }
                catch (IgniteCheckedException ex) {
                    U.error(GridServiceProcessor.this.log, "Failed to clean up zombie assignments for service: " + name, ex);
                }
            }
        }

        /**
         * Retries reassignment of the given deployments for {@code topVer}. Deployments
         * that succeed are removed from {@code retries}; if any remain, another attempt is
         * scheduled through a timeout object ending 1000 ms after its creation.
         * <p>
         * Nothing is retried once the topology has moved on from {@code topVer}; this
         * matches the equivalent check in {@code onDeployment}. The condition used to be
         * inverted, so retries ran only against a stale topology version.
         *
         * @param topVer Topology version the reassignment was attempted for.
         * @param retries Deployments that still need reassignment; modified in place.
         */
        private void onReassignmentFailed(final AffinityTopologyVersion topVer, final Collection<GridServiceDeployment> retries) {
            if (!GridServiceProcessor.this.busyLock.enterBusy()) {
                return;
            }
            try {
                // Give up once the topology no longer equals topVer.
                if (!GridServiceProcessor.this.ctx.discovery().topologyVersionEx().equals(topVer)) {
                    return;
                }
                Iterator<GridServiceDeployment> it = retries.iterator();
                while (it.hasNext()) {
                    GridServiceDeployment dep = it.next();
                    try {
                        GridServiceProcessor.this.svcName.set(dep.configuration().getName());
                        GridServiceProcessor.this.reassign(dep, topVer);
                        it.remove();
                    }
                    catch (IgniteCheckedException e) {
                        if (e instanceof ClusterTopologyCheckedException) continue;
                        LT.error(GridServiceProcessor.this.log, e, "Failed to do service reassignment (will retry): " + dep.configuration().getName());
                    }
                }
                if (!retries.isEmpty()) {
                    GridServiceProcessor.this.ctx.timeout().addTimeoutObject(new GridTimeoutObject(){
                        private final IgniteUuid id = IgniteUuid.randomUuid();
                        private final long start = System.currentTimeMillis();

                        @Override
                        public IgniteUuid timeoutId() {
                            return this.id;
                        }

                        @Override
                        public long endTime() {
                            return this.start + 1000L;
                        }

                        @Override
                        public void onTimeout() {
                            TopologyListener.this.onReassignmentFailed(topVer, retries);
                        }
                    });
                }
            }
            finally {
                GridServiceProcessor.this.busyLock.leaveBusy();
            }
        }
    }

    /**
     * Cache listener that forwards batches of updated entries to
     * {@code onSystemCacheUpdated} on the deployment executor.
     */
    private class ServiceEntriesListener
    implements CacheEntryUpdatedListener<Object, Object> {
        private ServiceEntriesListener() {
        }

        /**
         * Schedules processing of the given events; does nothing if the busy lock cannot
         * be entered.
         *
         * @param deps Updated cache entry events.
         */
        @Override
        public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
            final GridServiceProcessor proc = GridServiceProcessor.this;

            if (!proc.busyLock.enterBusy()) {
                return;
            }

            try {
                Runnable task = new BusyRunnable(){

                    @Override
                    public void run0() {
                        proc.onSystemCacheUpdated(deps);
                    }
                };
                proc.depExe.execute(task);
            }
            finally {
                proc.busyLock.leaveBusy();
            }
        }
    }
}

