/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.BaseCacheStream;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.stream.impl.IteratorHandler;
import org.infinispan.stream.impl.IteratorResponse;
import org.infinispan.stream.impl.IteratorResponses;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.LocalStreamManager;
import org.infinispan.stream.impl.SegmentAwareOperation;
import org.infinispan.stream.impl.StreamResponseCommand;
import org.infinispan.stream.impl.TerminalOperation;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener(observation=Listener.Observation.POST)
public class LocalStreamManagerImpl<Original, K, V>
implements LocalStreamManager<Original, K> {
    private static final Log log = LogFactory.getLog(LocalStreamManagerImpl.class);
    // Sampled once when the class is initialized; guards the trace logging throughout this class.
    private static final boolean trace = log.isTraceEnabled();
    // Assigned in inject(): the unwrapped cache with CACHE_MODE_LOCAL and REMOTE_ITERATION flags applied.
    private AdvancedCache<K, V> cache;
    // Passed to each operation's handleInjection() before it is performed.
    @Inject
    private ComponentRegistry registry;
    // Source of the cache topology inspected before a rehash-aware stream is created.
    @Inject
    private DistributionManager dm;
    // Used to send response commands back to the requesting node and to look up the local address.
    @Inject
    private RpcManager rpc;
    // Builds the StreamResponseCommand instances that carry final results.
    @Inject
    private CommandsFactory factory;
    // Starts and looks up remote iterators by request id.
    @Inject
    private IteratorHandler iteratorHandler;
    // Set in start(): whether the persistence configuration reports stores in use.
    private boolean hasLoader;
    // Set in start(): whether the configured cache mode is replicated. When true this instance is not
    // registered as a cache listener and rehash-aware operations create no SegmentListener.
    private boolean isReplicated;
    // Set in start() from the configured number of hash segments; sizes each SegmentListener's lost-segment set.
    private int maxSegment;
    // Set in start() from the RpcManager; the address used for all segment ownership checks.
    private Address localAddress;
    // Segment listeners keyed by request id. Entries are added by the rehash-aware operations and by
    // startIterator(), and are notified from dataRehashed().
    private final ConcurrentMap<Object, SegmentListener> changeListener = CollectionFactory.makeConcurrentMap();
    // Set in start(): the cache name, passed to intermediate StreamResponseCommand instances.
    private ByteString cacheName;

    /**
     * Stores the cache used for all local stream work.
     * <p>
     * The supplied cache is unwrapped and then decorated with the
     * {@code CACHE_MODE_LOCAL} and {@code REMOTE_ITERATION} flags.
     *
     * @param cache the cache handed in by the component registry
     */
    @Inject
    public void inject(Cache<K, V> cache) {
        AdvancedCache<K, V> unwrapped = AbstractDelegatingCache.unwrapCache(cache).getAdvancedCache();
        this.cache = unwrapped.withFlags(Flag.CACHE_MODE_LOCAL, Flag.REMOTE_ITERATION);
    }

    /**
     * Captures configuration-derived state (cache name, store usage, segment count,
     * local address, replication mode) and, for non-replicated caches, registers
     * this instance as a cache listener.
     */
    @Start
    public void start() {
        this.cacheName = ByteString.fromString(this.cache.getName());
        this.hasLoader = this.cache.getCacheConfiguration().persistence().usingStores();

        ClusteringConfiguration clustering = this.cache.getCacheConfiguration().clustering();
        this.maxSegment = clustering.hash().numSegments();
        this.localAddress = this.rpc.getAddress();
        this.isReplicated = clustering.cacheMode().isReplicated();

        if (this.isReplicated) {
            // Replicated caches do not register for rehash notifications.
            return;
        }
        this.cache.addListener(this);
    }

    /**
     * Rehash callback: works out which segments the local node owned under the
     * starting consistent hash but no longer owns under the ending one, and
     * reports them to every registered {@link SegmentListener}.
     * <p>
     * Nothing happens when either hash is missing, when no listeners are
     * registered, or when no segment was given up.
     *
     * @param event the rehash event carrying the start and end consistent hashes
     */
    @DataRehashed
    public void dataRehashed(DataRehashedEvent<K, V> event) {
        ConsistentHash hashBefore = event.getConsistentHashAtStart();
        ConsistentHash hashAfter = event.getConsistentHashAtEnd();
        boolean tracing = log.isTraceEnabled();
        if (hashBefore == null || hashAfter == null) {
            return;
        }
        if (tracing) {
            log.tracef("Data rehash occurred startHash: %s and endHash: %s with new topology %s and was pre %s", hashBefore, hashAfter, event.getNewTopologyId(), event.isPre());
        }
        if (this.changeListener.isEmpty()) {
            if (tracing) {
                log.tracef("No change listeners present!", new Object[0]);
            }
            return;
        }
        if (tracing) {
            log.tracef("Previous segments %s ", (Object)hashBefore.getSegmentsForOwner(this.localAddress));
            log.tracef("After segments %s ", (Object)hashAfter.getSegmentsForOwner(this.localAddress));
        }
        // Segments owned before minus segments owned after = segments given up by this node.
        IntSet relinquished = IntSets.mutableFrom(hashBefore.getSegmentsForOwner(this.localAddress));
        relinquished.removeAll(hashAfter.getSegmentsForOwner(this.localAddress));
        if (relinquished.isEmpty()) {
            if (tracing) {
                log.tracef("No segments have been removed from data rehash, no notification required", new Object[0]);
            }
            return;
        }
        for (Map.Entry<Object, SegmentListener> registration : this.changeListener.entrySet()) {
            if (tracing) {
                log.tracef("Notifying %s through SegmentChangeListener", registration.getKey());
            }
            registration.getValue().lostSegments(relinquished);
        }
    }

    /**
     * Picks the cache view to read from.
     *
     * @param includeLoader whether the caller wants store contents included
     * @return the cache decorated with {@code SKIP_CACHE_LOAD} when stores are in
     *         use and the caller does not want them; otherwise the plain cache
     */
    private AdvancedCache<K, V> getCacheRespectingLoader(boolean includeLoader) {
        boolean bypassLoader = this.hasLoader && !includeLoader;
        if (!bypassLoader) {
            return this.cache;
        }
        return this.cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD);
    }

    /**
     * Returns the set the stream is built from: the entry set for entry streams,
     * the key set otherwise, read through the cache chosen by
     * {@link #getCacheRespectingLoader(boolean)}.
     *
     * @param includeLoader whether store contents should be included
     * @param entryStream   {@code true} for an entry stream, {@code false} for a key stream
     * @return the entry set or key set, viewed as a set of {@code Original}
     */
    @SuppressWarnings("unchecked")
    private CacheSet<Original> toOriginalSet(boolean includeLoader, boolean entryStream) {
        AdvancedCache<K, V> source = this.getCacheRespectingLoader(includeLoader);
        // Unchecked by necessity: the element type is selected by the entryStream flag at runtime,
        // so the compiler cannot relate CacheEntry<K, V> or K to Original. Without these casts the
        // returns are incompatible with the declared return type.
        if (entryStream) {
            return (CacheSet<Original>) source.cacheEntrySet();
        }
        return (CacheSet<Original>) source.keySet();
    }

    /**
     * Builds the local stream for a request: sequential or parallel, restricted
     * to {@code keysToInclude} and {@code segments}, and with any key in
     * {@code keysToExclude} filtered out.
     *
     * @param cacheEntrySet  the set to stream over
     * @param parallelStream whether to request a parallel stream
     * @param entryStream    {@code true} when elements are cache entries, {@code false} when they are keys
     * @param segments       segments to restrict the stream to
     * @param keysToInclude  keys handed to {@code filterKeys}
     * @param keysToExclude  keys whose elements are dropped; must not be {@code null}
     * @return the filtered stream
     */
    @SuppressWarnings("unchecked")
    private Stream<Original> getStream(CacheSet<Original> cacheEntrySet, boolean parallelStream, boolean entryStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude) {
        // A raw BaseCacheStream local offers no filter(...) and is not assignable to Stream<Original>,
        // so the chain result is viewed as a Stream explicitly. The cast is harmless if the filter
        // methods already return a Stream subtype (their declared return type is not visible here).
        Stream<Original> stream = (Stream<Original>) ((parallelStream ? cacheEntrySet.parallelStream() : cacheEntrySet.stream())
                .filterKeys(keysToInclude)
                .filterKeySegments(segments));
        if (keysToExclude.isEmpty()) {
            return stream;
        }
        if (entryStream) {
            // Entry streams carry CacheEntry elements; exclusion is decided on the entry's key.
            return stream.filter(e -> !keysToExclude.contains(((CacheEntry<?, ?>) e).getKey()));
        }
        return stream.filter(k -> !keysToExclude.contains(k));
    }

    /**
     * Rehash-aware variant of {@link #getStream}: when a listener is supplied, the
     * requested segments are first checked against the current topology (which may
     * shrink {@code segments} and notify the listener), then the stream is built.
     *
     * @param listener the listener to notify about unavailable segments, or {@code null} to skip the check
     * @return the filtered stream over the (possibly reduced) segments
     */
    private Stream<Original> getRehashStream(CacheSet<Original> cacheEntrySet, Object requestId, SegmentListener listener, boolean parallelStream, boolean entryStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude) {
        boolean trackSegments = listener != null;
        if (trackSegments) {
            this.handleSuspectSegmentsBeforeStream(requestId, listener, segments);
        }
        Stream<Original> result = this.getStream(cacheEntrySet, parallelStream, entryStream, segments, keysToInclude, keysToExclude);
        return result;
    }

    /**
     * Removes from {@code segments} every segment this node cannot currently
     * serve and tells the listener about it.
     * <p>
     * With a pending consistent hash present, a segment is kept only if it is
     * local under both the pending and the current hash; removed segments are
     * reported through {@code lostSegments}. Without a pending hash, the request
     * is intersected with the local read segments and, if anything was dropped,
     * the listener receives the local read segments through {@code localSegments}.
     *
     * @param requestId id of the request, used for trace logging only
     * @param listener  listener notified about unavailable segments; must not be {@code null}
     * @param segments  requested segments; modified in place
     */
    private void handleSuspectSegmentsBeforeStream(Object requestId, SegmentListener listener, IntSet segments) {
        LocalizedCacheTopology cacheTopology = this.dm.getCacheTopology();
        if (trace) {
            log.tracef("Topology for supplier is %s for id %s", (Object)cacheTopology, requestId);
        }
        ConsistentHash currentHash = cacheTopology.getCurrentCH();
        ConsistentHash pendingHash = cacheTopology.getPendingCH();

        if (pendingHash == null) {
            // No pending hash: only the local read segments can be served.
            IntSet readable = cacheTopology.getLocalReadSegments();
            boolean trimmed = segments.retainAll(readable);
            if (trimmed) {
                if (trace) {
                    log.tracef("We found to be missing some segments requested for id %s", requestId);
                }
                listener.localSegments(readable);
            } else if (trace) {
                log.tracef("Hash %s for id %s", (Object)currentHash, requestId);
            }
            return;
        }

        // Pending hash present: a segment is usable only if owned under both hashes.
        IntSet dropped = IntSets.mutableEmptySet(cacheTopology.getCurrentCH().getNumSegments());
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segment = it.nextInt();
            boolean ownedUnderBoth = pendingHash.isSegmentLocalToNode(this.localAddress, segment) && currentHash.isSegmentLocalToNode(this.localAddress, segment);
            if (!ownedUnderBoth) {
                it.remove();
                dropped.set(segment);
            }
        }
        if (dropped.isEmpty()) {
            if (trace) {
                log.tracef("Currently in the middle of a rehash for id %s", requestId);
            }
            return;
        }
        if (trace) {
            log.tracef("Lost segments %s during rehash for id %s", (Object)dropped, requestId);
        }
        listener.lostSegments(dropped);
    }

    /**
     * Attaches a trace-only completion callback to a response RPC. When tracing
     * is disabled nothing is attached and the outcome of the RPC is not observed.
     *
     * @param rpcFuture the in-flight response RPC
     * @param requestId id of the request being answered
     * @param origin    node the response was sent to
     */
    private void handleResponseError(CompletionStage<ValidResponse> rpcFuture, Object requestId, Address origin) {
        if (!trace) {
            return;
        }
        rpcFuture.whenComplete((response, failure) -> {
            if (failure != null) {
                log.tracef((Throwable)failure, "Encountered exception for %s sending response to %s", requestId, (Object)origin);
                return;
            }
            boolean rejected = response != null && !response.isSuccessful();
            if (rejected) {
                log.tracef("Unsuccessful response for %s sending response to %s", requestId, (Object)origin);
            } else {
                log.tracef("Response successfully sent for %s", requestId);
            }
        });
    }

    /**
     * Runs a non-rehash-aware terminal operation over the local stream and sends
     * its result to {@code origin} as a completed response with no suspect
     * segments.
     */
    @Override
    public <R> void streamOperation(Object requestId, Address origin, boolean parallelStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, boolean entryStream, TerminalOperation<Original, R> operation) {
        if (trace) {
            log.tracef("Received operation request for id %s from %s for segments %s", requestId, (Object)origin, (Object)segments);
        }
        CacheSet<Original> source = this.toOriginalSet(includeLoader, entryStream);
        operation.setSupplier(() -> this.getStream(source, parallelStream, entryStream, segments, keysToInclude, keysToExclude));
        operation.handleInjection(this.registry);

        R result = operation.performOperation();

        StreamResponseCommand<R> reply = this.factory.buildStreamResponseCommand(requestId, true, IntSets.immutableEmptySet(), result);
        CompletionStage<ValidResponse> pending = this.rpc.invokeCommand(origin, reply, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions());
        this.handleResponseError(pending, requestId, origin);
    }

    /**
     * Runs a rehash-aware terminal operation. For non-replicated caches a
     * {@link SegmentListener} is registered under {@code requestId} for the
     * duration of the operation so that lost segments are recorded; the listener
     * is always unregistered before the response is sent. The response carries
     * the operation's result together with the segments recorded as lost.
     */
    @Override
    public <R> void streamOperationRehashAware(Object requestId, Address origin, boolean parallelStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, boolean entryStream, TerminalOperation<Original, R> operation) {
        if (trace) {
            log.tracef("Received rehash aware operation request for id %s from %s for segments %s", requestId, (Object)origin, (Object)segments);
        }
        CacheSet<Original> source = this.toOriginalSet(includeLoader, entryStream);
        operation.handleInjection(this.registry);

        // Replicated caches do not track segment loss, so no listener is created for them.
        SegmentListener listener = this.isReplicated ? null : new SegmentListener(segments, operation);
        if (listener != null) {
            this.changeListener.put(requestId, listener);
            if (trace) {
                log.tracef("Registered change listener for %s", requestId);
            }
        }

        R result;
        IntSet suspected;
        try {
            operation.setSupplier(() -> this.getRehashStream(source, requestId, listener, parallelStream, entryStream, segments, keysToInclude, keysToExclude));
            result = operation.performOperation();
            suspected = listener == null ? IntSets.immutableEmptySet() : listener.segmentsLost;
            if (trace) {
                log.tracef("Request %s completed for segments %s with %s suspected segments", requestId, (Object)segments, (Object)suspected);
            }
        }
        finally {
            if (listener != null) {
                this.changeListener.remove(requestId);
                if (trace) {
                    log.tracef("UnRegistered change listener for %s", requestId);
                }
            }
        }
        this.sendRehashAwareResponse(requestId, origin, suspected, segments, result);
    }

    /**
     * Runs a non-rehash-aware key-tracking operation. Intermediate results are
     * pushed to {@code origin} through a {@link NonRehashIntermediateCollector};
     * the final collection is sent as a completed response with no suspect
     * segments.
     */
    @Override
    public <R> void streamOperation(Object requestId, Address origin, boolean parallelStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, boolean entryStream, KeyTrackingTerminalOperation<Original, K, R> operation) {
        if (trace) {
            log.tracef("Received key aware operation request for id %s from %s for segments %s", requestId, (Object)origin, (Object)segments);
        }
        CacheSet<Original> source = this.toOriginalSet(includeLoader, entryStream);
        operation.setSupplier(() -> this.getStream(source, parallelStream, entryStream, segments, keysToInclude, keysToExclude));
        operation.handleInjection(this.registry);

        Collection<R> finalBatch = operation.performOperation(new NonRehashIntermediateCollector(origin, requestId, parallelStream));

        StreamResponseCommand<Collection<R>> reply = this.factory.buildStreamResponseCommand(requestId, true, IntSets.immutableEmptySet(), finalBatch);
        CompletionStage<ValidResponse> pending = this.rpc.invokeCommand(origin, reply, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions());
        this.handleResponseError(pending, requestId, origin);
    }

    /**
     * Runs a rehash-aware key-tracking (for-each) operation. For non-replicated
     * caches a {@link SegmentListener} is registered under {@code requestId}
     * while the operation runs and is always unregistered before the response is
     * sent. The response carries the collection returned by the operation and
     * the segments recorded as lost.
     */
    @Override
    public void streamOperationRehashAware(Object requestId, Address origin, boolean parallelStream, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, boolean entryStream, KeyTrackingTerminalOperation<Original, K, ?> operation) {
        if (trace) {
            log.tracef("Received key rehash aware operation request for id %s from %s for segments %s", requestId, (Object)origin, (Object)segments);
        }
        CacheSet<Original> source = this.toOriginalSet(includeLoader, entryStream);
        operation.handleInjection(this.registry);

        // Replicated caches do not track segment loss, so no listener is created for them.
        SegmentListener listener = this.isReplicated ? null : new SegmentListener(segments, operation);
        if (listener != null) {
            this.changeListener.put(requestId, listener);
            if (trace) {
                log.tracef("Registered change listener for %s", requestId);
            }
        }

        Collection<K> processed;
        IntSet suspected;
        try {
            operation.setSupplier(() -> this.getRehashStream(source, requestId, listener, parallelStream, entryStream, segments, keysToInclude, keysToExclude));
            processed = operation.performForEachOperation(new NonRehashIntermediateCollector(origin, requestId, parallelStream));
            suspected = listener == null ? IntSets.immutableEmptySet() : listener.segmentsLost;
            if (trace) {
                log.tracef("Request %s completed segments %s with %s suspected segments", requestId, (Object)segments, (Object)suspected);
            }
        }
        finally {
            if (listener != null) {
                this.changeListener.remove(requestId);
                if (trace) {
                    log.tracef("UnRegistered change listener for %s", requestId);
                }
            }
        }
        this.sendRehashAwareResponse(requestId, origin, suspected, segments, processed);
    }

    /**
     * Sends the completed response of a rehash-aware operation to {@code origin}.
     * <p>
     * If the cache status is neither {@code RUNNING} nor {@code INITIALIZING},
     * the result is discarded ({@code null} is sent) and every originally
     * requested segment is reported as lost.
     *
     * @param requestId        id of the request being answered
     * @param origin           node to answer
     * @param lostSegments     segments recorded as lost while the operation ran
     * @param originalSegments segments the request asked for
     * @param value            result of the operation
     */
    private <R> void sendRehashAwareResponse(Object requestId, Address origin, IntSet lostSegments, IntSet originalSegments, R value) {
        IntSet suspected = lostSegments;
        R payload = value;

        boolean cacheUsable = this.cache.getStatus() == ComponentStatus.RUNNING || this.cache.getStatus() == ComponentStatus.INITIALIZING;
        if (!cacheUsable) {
            if (suspected.isEmpty()) {
                // Swap rather than add: an empty set here may be the immutable empty set.
                suspected = originalSegments;
            } else {
                suspected.addAll(originalSegments);
            }
            if (trace) {
                log.tracef("Cache status is no longer running, all segments are now suspect for %s", requestId);
            }
            payload = null;
        }

        StreamResponseCommand<R> reply = this.factory.buildStreamResponseCommand(requestId, true, suspected, payload);
        CompletionStage<ValidResponse> pending = this.rpc.invokeCommand(origin, reply, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions());
        this.handleResponseError(pending, requestId, origin);
    }

    /**
     * Starts a remote iterator for {@code requestId} and returns its first batch.
     * <p>
     * A {@link SegmentListener} is registered under {@code requestId} before the
     * iterator is created and is removed again when the iterator is closed. At
     * close time, if the cache status is neither {@code RUNNING} nor
     * {@code INITIALIZING}, all requested segments are added to the listener's
     * lost set. For replicated caches the listener is still registered but is not
     * consulted when the stream is built.
     * <p>
     * If creating the iterator fails, the listener registered by this call is
     * removed before the failure is propagated, so the id is not left occupied.
     *
     * @return a response wrapping the iterator, the listener's lost-segment set and {@code batchSize}
     * @throws IllegalStateException if a listener is already registered for {@code requestId}
     */
    @Override
    public IteratorResponse startIterator(Object requestId, Address origin, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, boolean entryStream, Iterable<IntermediateOperation> intermediateOperations, long batchSize) {
        if (trace) {
            log.tracef("Received rehash aware operation request to start iterator for id %s from %s for segments %s", requestId, (Object)origin, (Object)segments);
        }
        CacheSet<Original> originalSet = this.toOriginalSet(includeLoader, entryStream);
        SegmentListener listener = new SegmentListener(segments, i -> true);
        Runnable listenerClosedRunnable = () -> {
            this.changeListener.remove(requestId);
            if (this.cache.getStatus() != ComponentStatus.RUNNING && this.cache.getStatus() != ComponentStatus.INITIALIZING) {
                if (trace) {
                    log.tracef("Cache status is no longer running after completing iterator, all segments are now suspect for %s", requestId);
                }
                listener.segmentsLost.addAll(segments);
            }
        };
        if (this.changeListener.putIfAbsent(requestId, listener) != null) {
            throw new IllegalStateException("Iterator was already created for id " + requestId);
        }
        if (trace) {
            log.tracef("Registered change listener for %s", requestId);
        }
        IteratorHandler.OnCloseIterator<Object> iterator;
        try {
            iterator = this.iteratorHandler.start(origin, () -> this.getRehashStream(originalSet, requestId, this.isReplicated ? null : listener, false, entryStream, segments, keysToInclude, keysToExclude), intermediateOperations, requestId);
        }
        catch (RuntimeException | Error e) {
            // The close hook is not attached yet, so nothing else would ever remove this registration.
            // Remove only our own listener in case the mapping has changed in the meantime.
            this.changeListener.remove(requestId, listener);
            if (trace) {
                log.tracef("UnRegistered change listener for %s", requestId);
            }
            throw e;
        }
        iterator.onClose(listenerClosedRunnable);
        return new IteratorResponses.RemoteResponse(iterator, listener.segmentsLost, batchSize);
    }

    /**
     * Returns the next batch for an iterator previously created by
     * {@code startIterator}.
     *
     * @param requestId id the iterator was started with
     * @param batchSize batch size passed on to the response
     * @return a response wrapping the iterator, the lost-segment set of its listener and {@code batchSize}
     * @throws IllegalStateException if no segment listener is registered for {@code requestId},
     *         for example because the iterator has already been closed
     */
    @Override
    public IteratorResponse continueIterator(Object requestId, long batchSize) {
        CloseableIterator<Object> iterator = this.iteratorHandler.getIterator(requestId);
        SegmentListener listener = this.changeListener.get(requestId);
        if (listener == null) {
            // The listener is removed when the iterator closes; fail with the id instead of a bare NPE.
            throw new IllegalStateException("No segment listener registered for iterator id " + requestId);
        }
        return new IteratorResponses.RemoteResponse(iterator, listener.segmentsLost, batchSize);
    }

    /**
     * Intermediate collector that forwards each partial result to the requesting
     * node as a non-final {@code StreamResponseCommand} and waits for the send to
     * complete.
     * <p>
     * When {@code useManagedBlocker} is set, the blocking send runs through
     * {@link ForkJoinPool#managedBlock} and sends are serialized on this
     * collector instance.
     */
    class NonRehashIntermediateCollector<R>
    implements KeyTrackingTerminalOperation.IntermediateCollector<R> {
        private final Address origin;
        private final Object requestId;
        private final boolean useManagedBlocker;

        NonRehashIntermediateCollector(Address origin, Object requestId, boolean useManagedBlocker) {
            this.origin = origin;
            this.requestId = requestId;
            this.useManagedBlocker = useManagedBlocker;
        }

        /**
         * Sends one intermediate response to the origin and blocks until the RPC completes.
         *
         * @param response the partial result to send
         * @throws CacheException if the calling thread is interrupted while blocked in the
         *         managed blocker; the thread's interrupt flag is set again before throwing
         */
        @Override
        public void sendDataResonse(R response) {
            if (!this.useManagedBlocker) {
                this.sendAndWait(response);
                return;
            }
            try {
                ForkJoinPool.managedBlock(new ResponseBlocker(response));
            }
            catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; restore it so callers can still observe it.
                Thread.currentThread().interrupt();
                throw new CacheException(e);
            }
        }

        /** Builds the non-final response command for {@code response} and blocks until it has been sent. */
        private void sendAndWait(R response) {
            StreamResponseCommand<R> command = new StreamResponseCommand<R>(LocalStreamManagerImpl.this.cacheName, LocalStreamManagerImpl.this.localAddress, this.requestId, false, response);
            LocalStreamManagerImpl.this.rpc.blocking(LocalStreamManagerImpl.this.rpc.invokeCommand(this.origin, command, SingleResponseCollector.validOnly(), LocalStreamManagerImpl.this.rpc.getSyncRpcOptions()));
        }

        /**
         * {@link ForkJoinPool.ManagedBlocker} that performs a single blocking send
         * of one response.
         */
        class ResponseBlocker
        implements ForkJoinPool.ManagedBlocker {
            private final R response;
            private boolean completed = false;

            ResponseBlocker(R response) {
                this.response = response;
            }

            /**
             * Sends the response if that has not happened yet, holding the enclosing
             * collector's monitor during the send.
             *
             * @return always {@code true}: no further blocking is needed once the send returns
             */
            @Override
            public boolean block() throws InterruptedException {
                if (!this.completed) {
                    synchronized (NonRehashIntermediateCollector.this) {
                        NonRehashIntermediateCollector.this.sendAndWait(this.response);
                    }
                }
                this.completed = true;
                return this.completed;
            }

            @Override
            public boolean isReleasable() {
                return this.completed;
            }
        }
    }

    /**
     * Tracks, for one request, which of its requested segments became
     * unavailable while the request was running.
     */
    class SegmentListener {
        /** Private copy of the segments the request asked for. */
        private final IntSet segments;
        /** Operation notified as segments are lost. */
        private final SegmentAwareOperation op;
        /** Segments recorded as lost so far; read by the enclosing class when building responses. */
        private final IntSet segmentsLost;

        SegmentListener(IntSet segments, SegmentAwareOperation op) {
            this.segments = IntSets.mutableCopyFrom(segments);
            this.op = op;
            this.segmentsLost = IntSets.concurrentSet(LocalStreamManagerImpl.this.maxSegment);
        }

        /**
         * Records as lost every requested segment that is absent from
         * {@code localSegments}. The operation is not notified.
         *
         * @param localSegments segments that can currently be served locally
         */
        public void localSegments(IntSet localSegments) {
            for (PrimitiveIterator.OfInt requested = this.segments.iterator(); requested.hasNext(); ) {
                int segment = requested.nextInt();
                if (!localSegments.contains(segment)) {
                    if (trace) {
                        log.tracef("Could not process segment %s", segment);
                    }
                    this.segmentsLost.add(segment);
                }
            }
        }

        /**
         * Handles segments reported as lost. For each one that belongs to this
         * request the operation is asked {@code lostSegment(false)}; if it returns
         * {@code true} the segment is recorded, and when that newly recorded
         * segment makes the lost set as large as the requested set the operation
         * is additionally told {@code lostSegment(true)}.
         *
         * @param lostSegments segments no longer available on this node
         */
        public void lostSegments(IntSet lostSegments) {
            for (PrimitiveIterator.OfInt reported = lostSegments.iterator(); reported.hasNext(); ) {
                int segment = reported.nextInt();
                if (this.segments.contains(segment)) {
                    if (trace) {
                        log.tracef("Lost segment %s", segment);
                    }
                    if (this.op.lostSegment(false)) {
                        boolean newlyRecorded = this.segmentsLost.add(segment);
                        if (newlyRecorded && this.segmentsLost.size() == this.segments.size()) {
                            if (trace) {
                                log.tracef("All segments %s are now lost", (Object)this.segments);
                            }
                            this.op.lostSegment(true);
                        }
                    }
                }
            }
        }
    }
}

