/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;

class GridDhtPartitionSupplier {
    private final GridCacheContext<?, ?> cctx;
    private final IgniteLogger log;
    private GridDhtPartitionTopology top;
    private final boolean depEnabled;
    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext>();

    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
        assert (cctx != null);
        this.cctx = cctx;
        this.log = cctx.logger(this.getClass());
        this.top = cctx.dht().topology();
        this.depEnabled = cctx.gridDeploy().enabled();
    }

    void start() {
        this.startOldListeners();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stop() {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = this.scMap.keySet().iterator();
            while (it.hasNext()) {
                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
                GridDhtPartitionSupplier.clearContext(this.scMap.get(t), this.log);
                it.remove();
            }
        }
        this.stopOldListeners();
    }

    private static void clearContext(SupplyContext sc, IgniteLogger log) {
        if (sc != null) {
            GridDhtLocalPartition loc;
            Iterator it = sc.entryIt;
            if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
                try {
                    ((GridCloseableIterator)it).close();
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Iterator close failed.", e);
                }
            }
            if ((loc = sc.loc) != null) {
                assert (loc.reservations() > 0);
                loc.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTopologyChanged(AffinityTopologyVersion topVer) {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = this.scMap.keySet().iterator();
            while (it.hasNext()) {
                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
                if (topVer.compareTo((AffinityTopologyVersion)t.get3()) <= 0) continue;
                GridDhtPartitionSupplier.clearContext(this.scMap.get(t), this.log);
                it.remove();
                if (!this.log.isDebugEnabled()) continue;
                this.log.debug("Supply context removed [node=" + t.get1() + "]");
            }
        }
    }

    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        this.preloadPred = preloadPred;
    }

    /*
     * Exception decompiling
     */
    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [3[TRYBLOCK]], but top level block is 20[WHILELOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s, T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
            }
            this.cctx.io().sendOrderedMessage(n, d.topic(), s, this.cctx.ioPolicy(), d.timeout());
            if (this.cctx.config().getRebalanceThrottle() > 0L) {
                U.sleep(this.cctx.config().getRebalanceThrottle());
            }
            return true;
        }
        catch (ClusterTopologyCheckedException ignore) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send partition supply message because node left grid: " + n.id());
            }
            Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
            synchronized (map) {
                GridDhtPartitionSupplier.clearContext(this.scMap.remove(scId), this.log);
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void saveSupplyContext(T3<UUID, Integer, AffinityTopologyVersion> t, SupplyContextPhase phase, Iterator<Integer> partIt, int part, Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, GridDhtLocalPartition loc, AffinityTopologyVersion topVer, long updateSeq) {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            if (this.cctx.affinity().affinityTopologyVersion().equals(topVer)) {
                assert (this.scMap.get(t) == null);
                this.scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part, loc, updateSeq));
            } else if (loc != null) {
                assert (loc.reservations() > 0);
                loc.release();
            }
        }
    }

    @Deprecated
    public void startOldListeners() {
        if (!this.cctx.kernalContext().clientNode() && this.cctx.rebalanceEnabled()) {
            this.cctx.io().addHandler(this.cctx.cacheId(), GridDhtPartitionDemandMessage.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new CI2<UUID, GridDhtPartitionDemandMessage>(){

                @Override
                public void apply(UUID id, GridDhtPartitionDemandMessage m) {
                    GridDhtPartitionSupplier.this.processOldDemandMessage(m, id);
                }
            });
        }
    }

    @Deprecated
    public void stopOldListeners() {
        if (!this.cctx.kernalContext().clientNode() && this.cctx.rebalanceEnabled()) {
            this.cctx.io().removeHandler(this.cctx.cacheId(), GridDhtPartitionDemandMessage.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Could not resolve type clashes
     */
    @Deprecated
    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), this.cctx.cacheId(), this.cctx.deploymentEnabled());
        ClusterNode node = this.cctx.node(id);
        if (node == null) {
            return;
        }
        long preloadThrottle = this.cctx.config().getRebalanceThrottle();
        boolean ack = false;
        try {
            for (int part : d.partitions()) {
                GridDhtLocalPartition loc = this.top.localPartition(part, d.topologyVersion(), false);
                if (loc == null || loc.state() != GridDhtPartitionState.OWNING || !loc.reserve()) {
                    s.missed(part);
                    if (!this.log.isDebugEnabled()) continue;
                    this.log.debug("Requested partition is not owned by local node [part=" + part + ", demander=" + id + ']');
                    continue;
                }
                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
                try {
                    GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter;
                    if (this.cctx.isSwapOrOffheapEnabled()) {
                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(this.log);
                        this.cctx.swap().addOffHeapListener(part, swapLsnr);
                        this.cctx.swap().addSwapListener(part, swapLsnr);
                    }
                    boolean partMissing = false;
                    for (GridCacheEntryEx e : loc.allEntries(new CacheEntryPredicate[0])) {
                        GridCacheEntryInfo info;
                        if (!this.cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
                            s.missed(part);
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Demanding node does not need requested partition [part=" + part + ", nodeId=" + id + ']');
                            }
                            partMissing = true;
                            break;
                        }
                        if (s.messageSize() >= this.cctx.config().getRebalanceBatchSize()) {
                            ack = true;
                            if (!this.replyOld(node, d, s)) {
                                return;
                            }
                            if (preloadThrottle > 0L) {
                                U.sleep(preloadThrottle);
                            }
                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), this.cctx.cacheId(), this.cctx.deploymentEnabled());
                        }
                        if ((info = e.info()) == null || info.isNew()) continue;
                        if (this.preloadPred == null || this.preloadPred.apply(info)) {
                            s.addEntry(part, info, this.cctx);
                            continue;
                        }
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + info);
                    }
                    if (partMissing) continue;
                    if (this.cctx.isSwapOrOffheapEnabled() && (iter = this.cctx.swap().iterator(part)) != null) {
                        try {
                            boolean prepared = false;
                            for (Map.Entry e : iter) {
                                ClassLoader ldr;
                                if (!this.cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
                                    s.missed(part);
                                    if (this.log.isDebugEnabled()) {
                                        this.log.debug("Demanding node does not need requested partition [part=" + part + ", nodeId=" + id + ']');
                                    }
                                    partMissing = true;
                                    break;
                                }
                                if (s.messageSize() >= this.cctx.config().getRebalanceBatchSize()) {
                                    ack = true;
                                    if (!this.replyOld(node, d, s)) {
                                        return;
                                    }
                                    if (preloadThrottle > 0L) {
                                        U.sleep(preloadThrottle);
                                    }
                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), this.cctx.cacheId(), this.cctx.deploymentEnabled());
                                }
                                GridCacheSwapEntry swapEntry = (GridCacheSwapEntry)e.getValue();
                                GridCacheEntryInfo info = new GridCacheEntryInfo();
                                info.keyBytes((byte[])e.getKey());
                                info.ttl(swapEntry.ttl());
                                info.expireTime(swapEntry.expireTime());
                                info.version(swapEntry.version());
                                info.value(swapEntry.value());
                                if (this.preloadPred != null && !this.preloadPred.apply(info)) {
                                    if (!this.log.isDebugEnabled()) continue;
                                    this.log.debug("Rebalance predicate evaluated to false (will not send cache entry): " + info);
                                    continue;
                                }
                                s.addEntry0(part, info, this.cctx);
                                if (!this.depEnabled || prepared || (ldr = swapEntry.keyClassLoaderId() != null ? this.cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : (swapEntry.valueClassLoaderId() != null ? this.cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : null)) == null || !(ldr instanceof GridDeploymentInfo)) continue;
                                s.prepare((GridDeploymentInfo)((Object)ldr));
                                prepared = true;
                            }
                            if (partMissing) {
                                continue;
                            }
                        }
                        finally {
                            iter.close();
                            continue;
                        }
                    }
                    if (swapLsnr != null) {
                        this.cctx.swap().removeOffHeapListener(part, swapLsnr);
                        this.cctx.swap().removeSwapListener(part, swapLsnr);
                    }
                    if (swapLsnr != null) {
                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
                        swapLsnr = null;
                        for (GridCacheEntryInfo info : entries) {
                            if (!this.cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
                                s.missed(part);
                                if (!this.log.isDebugEnabled()) break;
                                this.log.debug("Demanding node does not need requested partition [part=" + part + ", nodeId=" + id + ']');
                                break;
                            }
                            if (s.messageSize() >= this.cctx.config().getRebalanceBatchSize()) {
                                ack = true;
                                if (!this.replyOld(node, d, s)) {
                                    return;
                                }
                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), this.cctx.cacheId(), this.cctx.deploymentEnabled());
                            }
                            if (this.preloadPred == null || this.preloadPred.apply(info)) {
                                s.addEntry(part, info, this.cctx);
                                continue;
                            }
                            if (!this.log.isDebugEnabled()) continue;
                            this.log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + info);
                        }
                    }
                    s.last(part);
                    if (!ack) continue;
                    s.markAck();
                    break;
                }
                finally {
                    loc.release();
                    if (swapLsnr == null) continue;
                    this.cctx.swap().removeOffHeapListener(part, swapLsnr);
                    this.cctx.swap().removeSwapListener(part, swapLsnr);
                }
            }
            this.replyOld(node, d, s);
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send partition supply message to node: " + node.id(), e);
        }
    }

    @Deprecated
    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) throws IgniteCheckedException {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
            }
            this.cctx.io().sendOrderedMessage(n, d.topic(), s, this.cctx.ioPolicy(), d.timeout());
            return true;
        }
        catch (ClusterTopologyCheckedException ignore) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send partition supply message because node left grid: " + n.id());
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dumpDebugInfo() {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            if (!this.scMap.isEmpty()) {
                U.warn(this.log, "Rebalancing supplier reserved following partitions:");
                for (SupplyContext sc : this.scMap.values()) {
                    if (sc.loc == null) continue;
                    U.warn(this.log, ">>> " + sc.loc);
                }
            }
        }
    }

    private static class SupplyContext {
        private final SupplyContextPhase phase;
        @GridToStringExclude
        private final Iterator<Integer> partIt;
        @GridToStringExclude
        private final Iterator<?> entryIt;
        @GridToStringExclude
        private final GridCacheEntryInfoCollectSwapListener swapLsnr;
        private final int part;
        private final GridDhtLocalPartition loc;
        private final long updateSeq;

        public SupplyContext(SupplyContextPhase phase, Iterator<Integer> partIt, Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, int part, GridDhtLocalPartition loc, long updateSeq) {
            this.phase = phase;
            this.partIt = partIt;
            this.entryIt = entryIt;
            this.swapLsnr = swapLsnr;
            this.part = part;
            this.loc = loc;
            this.updateSeq = updateSeq;
        }

        public String toString() {
            return S.toString(SupplyContext.class, this);
        }

        static /* synthetic */ long access$200(SupplyContext x0) {
            return x0.updateSeq;
        }

        static /* synthetic */ SupplyContextPhase access$300(SupplyContext x0) {
            return x0.phase;
        }

        static /* synthetic */ Iterator access$400(SupplyContext x0) {
            return x0.partIt;
        }

        static /* synthetic */ int access$500(SupplyContext x0) {
            return x0.part;
        }

        static /* synthetic */ GridCacheEntryInfoCollectSwapListener access$600(SupplyContext x0) {
            return x0.swapLsnr;
        }
    }

    private static enum SupplyContextPhase {
        NEW,
        ONHEAP,
        SWAP,
        EVICTED;

    }
}

