/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.platform.entityframework;

import java.util.HashSet;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheExtension;
import org.apache.ignite.internal.processors.platform.entityframework.PlatformDotNetEntityFrameworkCacheEntry;
import org.apache.ignite.internal.processors.platform.entityframework.PlatformDotNetEntityFrameworkCacheKey;
import org.apache.ignite.internal.processors.platform.entityframework.PlatformDotNetEntityFrameworkIncreaseVersionProcessor;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

public class PlatformDotNetEntityFrameworkCacheExtension
implements PlatformCacheExtension {
    private static final int EXT_ID = 1;
    private static final int OP_INVALIDATE_SETS = 1;
    private static final int OP_PUT_ITEM = 2;
    private static final int OP_GET_ITEM = 3;
    private static final CleanupNodeId CLEANUP_NODE_ID = new CleanupNodeId();
    private final Map<String, Boolean> cleanupFlags = new ConcurrentHashMap<String, Boolean>();

    @Override
    public int id() {
        return 1;
    }

    @Override
    public long processInOutStreamLong(PlatformCache target, int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
        switch (type) {
            case 1: {
                IgniteCache metaCache = target.rawCache();
                String dataCacheName = reader.readString();
                int cnt = reader.readInt();
                assert (cnt > 0);
                HashSet<String> entitySetNames = new HashSet<String>(cnt);
                for (int i = 0; i < cnt; ++i) {
                    entitySetNames.add(reader.readString());
                }
                Map<String, EntryProcessorResult<Long>> curVers = metaCache.invokeAll(entitySetNames, new PlatformDotNetEntityFrameworkIncreaseVersionProcessor(), new Object[0]);
                if (curVers.size() != cnt) {
                    throw new IgniteCheckedException("Failed to update entity set versions [expected=" + cnt + ", actual=" + curVers.size() + "]");
                }
                IgniteEx grid = target.platformContext().kernalContext().grid();
                this.startBackgroundCleanup(grid, metaCache, dataCacheName, curVers);
                return target.writeResult(mem, null);
            }
            case 2: {
                String qry = reader.readString();
                long[] versions = null;
                String[] entitySets = null;
                int cnt = reader.readInt();
                if (cnt >= 0) {
                    versions = new long[cnt];
                    entitySets = new String[cnt];
                    for (int i = 0; i < cnt; ++i) {
                        versions[i] = reader.readLong();
                        entitySets[i] = reader.readString();
                    }
                }
                byte[] data = reader.readByteArray();
                PlatformDotNetEntityFrameworkCacheEntry efEntry = new PlatformDotNetEntityFrameworkCacheEntry(entitySets, data);
                IgniteCache dataCache = target.rawCache();
                PlatformDotNetEntityFrameworkCacheKey key = new PlatformDotNetEntityFrameworkCacheKey(qry, versions);
                dataCache.put(key, efEntry);
                return target.writeResult(mem, null);
            }
            case 3: {
                PlatformDotNetEntityFrameworkCacheKey key;
                IgniteCache dataCache;
                PlatformDotNetEntityFrameworkCacheEntry entry;
                String qry = reader.readString();
                long[] versions = null;
                int cnt = reader.readInt();
                if (cnt >= 0) {
                    versions = new long[cnt];
                    for (int i = 0; i < cnt; ++i) {
                        versions[i] = reader.readLong();
                    }
                }
                byte[] data = (entry = (PlatformDotNetEntityFrameworkCacheEntry)(dataCache = target.rawCache()).get(key = new PlatformDotNetEntityFrameworkCacheKey(qry, versions))) == null ? null : entry.data();
                return target.writeResult(mem, data);
            }
        }
        throw new IgniteCheckedException("Unsupported operation type: " + type);
    }

    private void startBackgroundCleanup(Ignite grid, Cache<CleanupNodeId, UUID> metaCache, String dataCacheName, Map<String, EntryProcessorResult<Long>> currentVersions) {
        if (this.cleanupFlags.containsKey(dataCacheName)) {
            return;
        }
        if (!this.trySetGlobalCleanupFlag(grid, metaCache)) {
            return;
        }
        this.cleanupFlags.put(dataCacheName, true);
        ClusterGroup dataNodes = grid.cluster().forDataNodes(dataCacheName);
        ComputeTaskInternalFuture<?> f = ((IgniteEx)grid).context().closure().runAsync(GridClosureCallMode.BROADCAST, new RemoveOldEntriesRunnable(dataCacheName, currentVersions), TaskExecutionOptions.options(dataNodes.nodes()));
        f.listen(new CleanupCompletionListener(metaCache, dataCacheName));
    }

    private boolean trySetGlobalCleanupFlag(Ignite grid, Cache<CleanupNodeId, UUID> metaCache) {
        UUID nodeId;
        UUID locNodeId = grid.cluster().localNode().id();
        while ((nodeId = metaCache.get(CLEANUP_NODE_ID)) == null) {
            if (!metaCache.putIfAbsent(CLEANUP_NODE_ID, locNodeId)) continue;
            return true;
        }
        if (nodeId.equals(locNodeId)) {
            return false;
        }
        if (grid.cluster().node(nodeId) != null) {
            return false;
        }
        return metaCache.replace(CLEANUP_NODE_ID, nodeId, locNodeId);
    }

    private static void removeOldEntries(Ignite ignite, String dataCacheName, Map<String, EntryProcessorResult<Long>> currentVersions) {
        IgniteCache cache = ignite.cache(dataCacheName);
        TreeSet<PlatformDotNetEntityFrameworkCacheKey> keysToRemove = new TreeSet<PlatformDotNetEntityFrameworkCacheKey>();
        ClusterNode locNode = ignite.cluster().localNode();
        for (Cache.Entry cacheEntry : cache.localEntries(CachePeekMode.ALL)) {
            if (!ignite.affinity(dataCacheName).isPrimary(locNode, cacheEntry.getKey())) continue;
            long[] versions = ((PlatformDotNetEntityFrameworkCacheKey)cacheEntry.getKey()).versions();
            String[] entitySets = ((PlatformDotNetEntityFrameworkCacheEntry)cacheEntry.getValue()).entitySets();
            for (int i = 0; i < entitySets.length; ++i) {
                EntryProcessorResult<Long> curVer = currentVersions.get(entitySets[i]);
                if (curVer == null || versions[i] >= curVer.get()) continue;
                keysToRemove.add((PlatformDotNetEntityFrameworkCacheKey)cacheEntry.getKey());
            }
        }
        cache.removeAll(keysToRemove);
    }

    public String toString() {
        return S.toString(PlatformDotNetEntityFrameworkCacheExtension.class, this);
    }

    private class CleanupCompletionListener
    implements IgniteInClosure<IgniteInternalFuture<?>> {
        private static final long serialVersionUID = 0L;
        private final Cache<CleanupNodeId, UUID> metaCache;
        private final String dataCacheName;

        private CleanupCompletionListener(Cache<CleanupNodeId, UUID> metaCache, String dataCacheName) {
            this.metaCache = metaCache;
            this.dataCacheName = dataCacheName;
        }

        @Override
        public void apply(IgniteInternalFuture<?> future) {
            this.metaCache.remove(CLEANUP_NODE_ID);
            PlatformDotNetEntityFrameworkCacheExtension.this.cleanupFlags.remove(this.dataCacheName);
        }
    }

    private static class RemoveOldEntriesRunnable
    implements IgniteRunnable {
        private static final long serialVersionUID = 0L;
        private final String dataCacheName;
        private final Map<String, EntryProcessorResult<Long>> currentVersions;
        @IgniteInstanceResource
        private Ignite ignite;

        private RemoveOldEntriesRunnable(String dataCacheName, Map<String, EntryProcessorResult<Long>> currentVersions) {
            this.dataCacheName = dataCacheName;
            this.currentVersions = currentVersions;
        }

        @Override
        public void run() {
            PlatformDotNetEntityFrameworkCacheExtension.removeOldEntries(this.ignite, this.dataCacheName, this.currentVersions);
        }
    }

    private static class CleanupNodeId {
        private CleanupNodeId() {
        }
    }
}

