/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import io.reactivex.Flowable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.BaseStream;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.infinispan.BaseCacheStream;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.DoubleCacheStream;
import org.infinispan.IntCacheStream;
import org.infinispan.LongCacheStream;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.IteratorMapper;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.PersistenceConfiguration;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.CompletionRehashPublisherDecorator;
import org.infinispan.stream.impl.DistributedDoubleCacheStream;
import org.infinispan.stream.impl.DistributedIntCacheStream;
import org.infinispan.stream.impl.DistributedLongCacheStream;
import org.infinispan.stream.impl.IdentityPublisherDecorator;
import org.infinispan.stream.impl.IntermediateCacheStream;
import org.infinispan.stream.impl.PriorityMergingProcessor;
import org.infinispan.stream.impl.PublisherDecorator;
import org.infinispan.stream.impl.RehashPublisherDecorator;
import org.infinispan.stream.impl.TerminalFunctions;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.stream.impl.intops.object.DistinctOperation;
import org.infinispan.stream.impl.intops.object.FilterOperation;
import org.infinispan.stream.impl.intops.object.FlatMapOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToIntOperation;
import org.infinispan.stream.impl.intops.object.FlatMapToLongOperation;
import org.infinispan.stream.impl.intops.object.LimitOperation;
import org.infinispan.stream.impl.intops.object.MapOperation;
import org.infinispan.stream.impl.intops.object.MapToDoubleOperation;
import org.infinispan.stream.impl.intops.object.MapToIntOperation;
import org.infinispan.stream.impl.intops.object.MapToLongOperation;
import org.infinispan.stream.impl.intops.object.PeekOperation;
import org.infinispan.stream.impl.termop.object.ForEachBiOperation;
import org.infinispan.stream.impl.termop.object.ForEachOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class DistributedCacheStream<Original, R>
extends AbstractCacheStream<Original, R, Stream<R>, CacheStream<R>>
implements CacheStream<R> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final boolean writeBehindShared;

    /**
     * Re-types a raw {@link Supplier} as a {@code Supplier<CacheStream<R>>}.
     * No conversion takes place: the very same instance is handed back.
     *
     * @param supplier the raw supplier to re-type
     * @return {@code supplier} itself
     */
    protected static <R> Supplier<CacheStream<R>> supplierStreamCast(Supplier supplier) {
        return supplier;
    }

    /**
     * Creates a new distributed stream. All arguments are forwarded to the super constructor; the
     * {@link Configuration} is then looked up in {@code registry} to record whether any configured
     * store is both shared and asynchronous.
     *
     * @param registry component registry used to look up the cache {@link Configuration}
     */
    public DistributedCacheStream(Address localAddress, boolean parallel, DistributionManager dm, Supplier<CacheStream<R>> supplier, ClusterStreamManager csm, boolean includeLoader, int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super Original, ?> toKeyFunction) {
        super(localAddress, parallel, dm, DistributedCacheStream.supplierStreamCast(supplier), csm, includeLoader, distributedBatchSize, executor, registry, toKeyFunction);
        PersistenceConfiguration persistence = registry.getComponent(Configuration.class).persistence();
        this.writeBehindShared = this.hasWriteBehindSharedStore(persistence);
    }

    /**
     * Copy-style constructor: delegates to the super constructor with {@code other} and then
     * recomputes the shared write-behind flag from the {@link Configuration} found in this
     * stream's registry.
     *
     * @param other the stream whose state is handed to the super constructor
     */
    protected DistributedCacheStream(AbstractCacheStream other) {
        super(other);
        PersistenceConfiguration persistence = this.registry.getComponent(Configuration.class).persistence();
        this.writeBehindShared = this.hasWriteBehindSharedStore(persistence);
    }

    /**
     * Tells whether at least one configured store is shared and has async enabled.
     *
     * @param persistenceConfiguration the persistence configuration whose stores are inspected
     * @return {@code true} as soon as a store reporting both {@code shared()} and
     *         {@code async().enabled()} is found, {@code false} otherwise
     */
    boolean hasWriteBehindSharedStore(PersistenceConfiguration persistenceConfiguration) {
        for (StoreConfiguration store : persistenceConfiguration.stores()) {
            if (store.shared() && store.async().enabled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the static logger of this class
     */
    @Override
    protected Log getLog() {
        return log;
    }

    /**
     * @return this stream instance
     */
    @Override
    protected CacheStream<R> unwrap() {
        return this;
    }

    /**
     * Registers a {@link FilterOperation} wrapping {@code predicate} as an intermediate operation.
     *
     * @param predicate the predicate handed to the {@link FilterOperation}
     * @return the result of {@code addIntermediateOperation}, cast to {@link CacheStream}
     */
    @Override
    public CacheStream<R> filter(Predicate<? super R> predicate) {
        return (CacheStream)this.addIntermediateOperation(new FilterOperation<R>(predicate));
    }

    /**
     * Registers a {@link MapOperation} for {@code mapper}.
     *
     * @param mapper the mapping function handed to the {@link MapOperation}
     * @return this stream
     */
    @Override
    public <R1> CacheStream<R1> map(Function<? super R, ? extends R1> mapper) {
        // An earlier FLAT_MAP marker is kept; otherwise the stream is marked as MAP.
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        this.addIntermediateOperationMap(new MapOperation<R, R1>(mapper));
        return this;
    }

    /**
     * Registers a {@link MapToIntOperation} for {@code mapper} and continues as an int stream.
     *
     * @param mapper the mapping function handed to the {@link MapToIntOperation}
     * @return the stream produced by {@code intCacheStream()}
     */
    @Override
    public IntCacheStream mapToInt(ToIntFunction<? super R> mapper) {
        // An earlier FLAT_MAP marker is kept; otherwise the stream is marked as MAP.
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        this.addIntermediateOperationMap(new MapToIntOperation<R>(mapper));
        return this.intCacheStream();
    }

    /**
     * Registers a {@link MapToLongOperation} for {@code mapper} and continues as a long stream.
     *
     * @param mapper the mapping function handed to the {@link MapToLongOperation}
     * @return the stream produced by {@code longCacheStream()}
     */
    @Override
    public LongCacheStream mapToLong(ToLongFunction<? super R> mapper) {
        // An earlier FLAT_MAP marker is kept; otherwise the stream is marked as MAP.
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        this.addIntermediateOperationMap(new MapToLongOperation<R>(mapper));
        return this.longCacheStream();
    }

    /**
     * Registers a {@link MapToDoubleOperation} for {@code mapper} and continues as a double stream.
     *
     * @param mapper the mapping function handed to the {@link MapToDoubleOperation}
     * @return the stream produced by {@code doubleCacheStream()}
     */
    @Override
    public DoubleCacheStream mapToDouble(ToDoubleFunction<? super R> mapper) {
        // An earlier FLAT_MAP marker is kept; otherwise the stream is marked as MAP.
        if (this.iteratorOperation != AbstractCacheStream.IteratorOperation.FLAT_MAP) {
            this.iteratorOperation = AbstractCacheStream.IteratorOperation.MAP;
        }
        this.addIntermediateOperationMap(new MapToDoubleOperation<R>(mapper));
        return this.doubleCacheStream();
    }

    /**
     * Marks the stream as FLAT_MAP and registers a {@link FlatMapOperation} for {@code mapper}.
     *
     * @param mapper the function handed to the {@link FlatMapOperation}
     * @return this stream
     */
    @Override
    public <R1> CacheStream<R1> flatMap(Function<? super R, ? extends Stream<? extends R1>> mapper) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        this.addIntermediateOperationMap(new FlatMapOperation(mapper));
        return this;
    }

    /**
     * Marks the stream as FLAT_MAP, registers a {@link FlatMapToIntOperation} for {@code mapper}
     * and continues as an int stream.
     *
     * @param mapper the function handed to the {@link FlatMapToIntOperation}
     * @return the stream produced by {@code intCacheStream()}
     */
    @Override
    public IntCacheStream flatMapToInt(Function<? super R, ? extends IntStream> mapper) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        this.addIntermediateOperationMap(new FlatMapToIntOperation<R>(mapper));
        return this.intCacheStream();
    }

    /**
     * Marks the stream as FLAT_MAP, registers a {@link FlatMapToLongOperation} for {@code mapper}
     * and continues as a long stream.
     *
     * @param mapper the function handed to the {@link FlatMapToLongOperation}
     * @return the stream produced by {@code longCacheStream()}
     */
    @Override
    public LongCacheStream flatMapToLong(Function<? super R, ? extends LongStream> mapper) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        this.addIntermediateOperationMap(new FlatMapToLongOperation<R>(mapper));
        return this.longCacheStream();
    }

    /**
     * Marks the stream as FLAT_MAP, registers a {@link FlatMapToDoubleOperation} for {@code mapper}
     * and continues as a double stream.
     *
     * @param mapper the function handed to the {@link FlatMapToDoubleOperation}
     * @return the stream produced by {@code doubleCacheStream()}
     */
    @Override
    public DoubleCacheStream flatMapToDouble(Function<? super R, ? extends DoubleStream> mapper) {
        this.iteratorOperation = AbstractCacheStream.IteratorOperation.FLAT_MAP;
        this.addIntermediateOperationMap(new FlatMapToDoubleOperation<R>(mapper));
        return this.doubleCacheStream();
    }

    /**
     * Registers the shared {@link DistinctOperation} instance on this stream and then applies
     * {@code distinct()} a second time on an {@link IntermediateCacheStream} wrapping this stream.
     *
     * @return the stream returned by {@code IntermediateCacheStream.distinct()}
     */
    @Override
    public CacheStream<R> distinct() {
        this.addIntermediateOperation(DistinctOperation.getInstance());
        return new IntermediateCacheStream(this).distinct();
    }

    /**
     * Delegates sorting to an {@link IntermediateCacheStream} wrapping this stream; no
     * intermediate operation is registered on this stream itself.
     *
     * @return the stream returned by {@code IntermediateCacheStream.sorted()}
     */
    @Override
    public CacheStream<R> sorted() {
        return new IntermediateCacheStream(this).sorted();
    }

    /**
     * Delegates sorting with {@code comparator} to an {@link IntermediateCacheStream} wrapping
     * this stream; no intermediate operation is registered on this stream itself.
     *
     * @param comparator the comparator forwarded to the wrapping stream
     * @return the stream returned by {@code IntermediateCacheStream.sorted(comparator)}
     */
    @Override
    public CacheStream<R> sorted(Comparator<? super R> comparator) {
        return new IntermediateCacheStream(this).sorted((Comparator)comparator);
    }

    /**
     * Registers a {@link PeekOperation} wrapping {@code action} as an intermediate operation.
     *
     * @param action the consumer handed to the {@link PeekOperation}
     * @return the result of {@code addIntermediateOperation}, cast to {@link CacheStream}
     */
    @Override
    public CacheStream<R> peek(Consumer<? super R> action) {
        return (CacheStream)this.addIntermediateOperation(new PeekOperation<R>(action));
    }

    /**
     * Registers a {@link LimitOperation} with {@code maxSize} on this stream and then applies
     * {@code limit(maxSize)} again on an {@link IntermediateCacheStream} wrapping this stream.
     *
     * @param maxSize the limit used for both the registered operation and the wrapping stream
     * @return the stream returned by {@code IntermediateCacheStream.limit(maxSize)}
     */
    @Override
    public CacheStream<R> limit(long maxSize) {
        this.addIntermediateOperation(new LimitOperation(maxSize));
        return new IntermediateCacheStream(this).limit(maxSize);
    }

    /**
     * Delegates skipping to an {@link IntermediateCacheStream} wrapping this stream; no
     * intermediate operation is registered on this stream itself.
     *
     * @param n the number of elements forwarded to {@code IntermediateCacheStream.skip}
     * @return the stream returned by {@code IntermediateCacheStream.skip(n)}
     */
    @Override
    public CacheStream<R> skip(long n) {
        return new IntermediateCacheStream(this).skip(n);
    }

    /**
     * Runs the reduce terminal function built from {@code identity} and {@code accumulator}
     * through {@code performOperation}; {@code accumulator} is also passed as the operator that
     * merges results. No early-termination predicate is supplied.
     *
     * @param identity    the identity value handed to the terminal function
     * @param accumulator the reduction operator
     * @return the value returned by {@code performOperation}
     */
    @Override
    public R reduce(R identity, BinaryOperator<R> accumulator) {
        return this.performOperation(TerminalFunctions.reduceFunction(identity, accumulator), true, accumulator, null);
    }

    /**
     * Runs the identity-less reduce terminal function through {@code performOperation}.
     * Results are merged with a null-tolerant wrapper around {@code accumulator}: a null operand
     * simply yields the other operand, and {@code accumulator} is applied only when both are present.
     *
     * @param accumulator the reduction operator
     * @return the reduced value, or an empty {@link Optional} when the result is {@code null}
     */
    @Override
    public Optional<R> reduce(BinaryOperator<R> accumulator) {
        R value = this.performOperation(TerminalFunctions.reduceFunction(accumulator), true, (e1, e2) -> {
            if (e1 == null) {
                return e2;
            }
            if (e2 == null) {
                return e1;
            }
            return accumulator.apply(e1, e2);
        }, null);
        return Optional.ofNullable(value);
    }

    /**
     * Runs the three-argument reduce terminal function through {@code performOperation};
     * {@code combiner} is also passed as the operator that merges results. No early-termination
     * predicate is supplied.
     *
     * @param identity    the identity value handed to the terminal function
     * @param accumulator folds a stream element into a partial result
     * @param combiner    merges two partial results
     * @return the value returned by {@code performOperation}
     */
    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super R, U> accumulator, BinaryOperator<U> combiner) {
        return this.performOperation(TerminalFunctions.reduceFunction(identity, accumulator, combiner), true, combiner, null);
    }

    /**
     * Runs the collect terminal function built from the three arguments through
     * {@code performOperation}.
     *
     * @param supplier    creates a result container
     * @param accumulator folds a stream element into a container
     * @param combiner    merges the second container into the first
     * @return the value returned by {@code performOperation}
     */
    @Override
    public <R1> R1 collect(Supplier<R1> supplier, BiConsumer<R1, ? super R> accumulator, BiConsumer<R1, R1> combiner) {
        return this.performOperation(TerminalFunctions.collectFunction(supplier, accumulator, combiner), true, (e1, e2) -> {
            // Merge e2 into e1 and keep e1 as the running result.
            combiner.accept(e1, e2);
            return e1;
        }, null);
    }

    /**
     * Collects the stream with {@code collector}.
     * <p>
     * When the collector declares {@code IDENTITY_FINISH} it is used as is. Otherwise it is wrapped
     * in an {@link IdentifyFinishCollector} for the call to {@code performOperation}, and the real
     * finisher is applied here, once, to the merged intermediate result.
     *
     * @param collector the collector to apply
     * @return the collected result
     */
    @Override
    public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
        if (collector.characteristics().contains((Object)Collector.Characteristics.IDENTITY_FINISH)) {
            return this.performOperation(TerminalFunctions.collectorFunction(collector), true, collector.combiner(), null);
        }
        // The wrapper advertises IDENTITY_FINISH, so what comes back is the accumulation container.
        Object intermediateResult = this.performOperation(TerminalFunctions.collectorFunction(new IdentifyFinishCollector<R, A>(collector)), true, collector.combiner(), null);
        return collector.finisher().apply(intermediateResult);
    }

    /**
     * Runs the min terminal function through {@code performOperation}. Results are merged by
     * keeping the smaller one according to {@code comparator}; a null operand yields the other
     * operand, and on a tie the first operand is kept.
     *
     * @param comparator ordering used to pick the minimum
     * @return the minimum, or an empty {@link Optional} when the result is {@code null}
     */
    @Override
    public Optional<R> min(Comparator<? super R> comparator) {
        R value = this.performOperation(TerminalFunctions.minFunction(comparator), false, (e1, e2) -> {
            if (e1 == null) {
                return e2;
            }
            if (e2 == null) {
                return e1;
            }
            return comparator.compare(e1, e2) > 0 ? e2 : e1;
        }, null);
        return Optional.ofNullable(value);
    }

    /**
     * Runs the max terminal function through {@code performOperation}. Results are merged by
     * keeping the larger one according to {@code comparator}; a null operand yields the other
     * operand, and on a tie the second operand is kept.
     *
     * @param comparator ordering used to pick the maximum
     * @return the maximum, or an empty {@link Optional} when the result is {@code null}
     */
    @Override
    public Optional<R> max(Comparator<? super R> comparator) {
        R value = this.performOperation(TerminalFunctions.maxFunction(comparator), false, (e1, e2) -> {
            if (e1 == null) {
                return e2;
            }
            if (e2 == null) {
                return e1;
            }
            return comparator.compare(e1, e2) > 0 ? e1 : e2;
        }, null);
        return Optional.ofNullable(value);
    }

    /**
     * Runs the any-match terminal function through {@code performOperation}. Results are merged
     * with logical OR, and the trailing predicate is satisfied by a {@code true} result.
     *
     * @param predicate the predicate handed to the terminal function
     * @return the value returned by {@code performOperation}
     */
    @Override
    public boolean anyMatch(Predicate<? super R> predicate) {
        return this.performOperation(TerminalFunctions.anyMatchFunction(predicate), false, Boolean::logicalOr, b -> b);
    }

    /**
     * Runs the all-match terminal function through {@code performOperation}. Results are merged
     * with logical AND, and the trailing predicate is satisfied by a {@code false} result.
     *
     * @param predicate the predicate handed to the terminal function
     * @return the value returned by {@code performOperation}
     */
    @Override
    public boolean allMatch(Predicate<? super R> predicate) {
        return this.performOperation(TerminalFunctions.allMatchFunction(predicate), false, Boolean::logicalAnd, b -> !b);
    }

    /**
     * Runs the none-match terminal function through {@code performOperation}. Results are merged
     * with logical AND, and the trailing predicate is satisfied by a {@code false} result.
     *
     * @param predicate the predicate handed to the terminal function
     * @return the value returned by {@code performOperation}
     */
    @Override
    public boolean noneMatch(Predicate<? super R> predicate) {
        return this.performOperation(TerminalFunctions.noneMatchFunction(predicate), false, Boolean::logicalAnd, b -> !b);
    }

    /**
     * Implemented as a plain delegate to {@link #findAny()}.
     *
     * @return whatever {@link #findAny()} returns
     */
    @Override
    public Optional<R> findFirst() {
        return this.findAny();
    }

    /**
     * Runs the find-any terminal function through {@code performOperation}. When merging, the
     * first non-null result wins, and the trailing predicate is satisfied by any non-null result.
     *
     * @return the found element, or an empty {@link Optional} when the result is {@code null}
     */
    @Override
    public Optional<R> findAny() {
        Object value = this.performOperation(TerminalFunctions.findAnyFunction(), false, (r1, r2) -> r1 == null ? r2 : r1, Objects::nonNull);
        return Optional.ofNullable(value);
    }

    /**
     * Runs the count terminal function through {@code performOperation}, adding the individual
     * results together.
     *
     * @return the summed count
     */
    @Override
    public long count() {
        return this.performOperation(TerminalFunctions.countFunction(), true, Long::sum, null);
    }

    /**
     * Creates an iterator over the stream contents and registers its {@code close} with
     * {@code onClose}.
     * <p>
     * With rehash awareness disabled a single non-rehash iterator over the current read
     * consistent hash is returned. Otherwise a {@link RehashIterator} is built (the
     * completion-listener variant when a listener is set and the stream is not FLAT_MAP) and, if
     * the iterator operation supplies a function, wrapped in an {@link IteratorMapper}.
     *
     * @return the iterator
     */
    @Override
    public Iterator<R> iterator() {
        log.tracef("Distributed iterator invoked with rehash: %s", (Object)this.rehashAware);
        if (this.rehashAware) {
            Iterable<IntermediateOperation> ops = this.iteratorOperation.prepareForIteration(this.intermediateOperations, this.nonNullKeyFunction());
            RehashIterator rehashIterator;
            if (this.segmentCompletionListener == null || this.iteratorOperation == AbstractCacheStream.IteratorOperation.FLAT_MAP) {
                rehashIterator = new RehashIterator(ops);
            } else {
                rehashIterator = new CompletionListenerRehashIterator(ops, this.segmentCompletionListener);
            }
            this.onClose(rehashIterator::close);
            Function function = this.iteratorOperation.getFunction();
            if (function != null) {
                return new IteratorMapper(rehashIterator, function);
            }
            return rehashIterator;
        }
        CloseableIterator plainIterator = this.nonRehashRemoteIterator(this.dm.getReadConsistentHash(), this.segmentsToFilter, null, IdentityPublisherDecorator.getInstance(), this.intermediateOperations);
        this.onClose(plainIterator::close);
        return plainIterator;
    }

    /**
     * Builds a publisher over the local stream for the given segments with all
     * {@code intermediateOperations} applied.
     *
     * @param segmentsToFilter       segments forwarded to {@code supplierForSegments}
     * @param ch                     consistent hash forwarded to {@code supplierForSegments}
     * @param excludedKeys           keys forwarded to {@code supplierForSegments}
     * @param intermediateOperations operations applied, in iteration order, to the supplied stream
     * @param stayLocal              flag forwarded unchanged as the last argument of
     *                               {@code supplierForSegments}.
     *                               NOTE(review): {@code nonRehashRemoteIterator} passes
     *                               {@code !stayLocal} here, so the name may be misleading — confirm
     *                               against {@code supplierForSegments}
     * @return a {@link Flowable} that asks the resulting stream for its iterator when iterated
     */
    <S> Publisher<S> localPublisher(IntSet segmentsToFilter, ConsistentHash ch, Set<Object> excludedKeys, Iterable<IntermediateOperation> intermediateOperations, boolean stayLocal) {
        Supplier supplier = this.supplierForSegments(ch, segmentsToFilter, excludedKeys, stayLocal);
        BaseStream stream = supplier.get();
        // Chain every intermediate operation onto the freshly supplied stream.
        for (IntermediateOperation intermediateOperation : intermediateOperations) {
            stream = intermediateOperation.perform(stream);
        }
        // Separate effectively-final reference so the lambda below can capture it.
        BaseStream innerStream = stream;
        return Flowable.fromIterable(() -> innerStream.iterator());
    }

    /**
     * Builds an iterator for the given consistent hash that merges the (decorated) local
     * publisher with publishers for the remote targets.
     *
     * @param ch                     consistent hash used to resolve owners and segment counts
     * @param segmentsToFilter       segments to iterate, or {@code null} for all segments of {@code ch}
     * @param keysToExclude          per-segment keys to exclude, or {@code null} for none
     * @param publisherFunction      decorator applied to the local and every remote publisher
     * @param intermediateOperations operations handed to the local and remote publishers
     * @return an iterator over the merged results; purely local when this node owns every
     *         requested segment and no shared write-behind store is configured
     */
    <S> CloseableIterator<S> nonRehashRemoteIterator(ConsistentHash ch, IntSet segmentsToFilter, IntFunction<Set<Object>> keysToExclude, PublisherDecorator<S> publisherFunction, Iterable<IntermediateOperation> intermediateOperations) {
        Publisher<Object> localPublisher;
        boolean stayLocal;
        if (ch.getMembers().contains(this.localAddress)) {
            IntSet ownedSegments = IntSets.from(ch.getSegmentsForOwner(this.localAddress));
            // Never stay local with a shared write-behind store; otherwise only when the local
            // node owns every requested segment (or all segments when no filter is given).
            stayLocal = this.writeBehindShared ? false : (segmentsToFilter == null ? ownedSegments.size() == ch.getNumSegments() : ownedSegments.containsAll(segmentsToFilter));
            // The per-segment exclusion sets are flattened into one set covering the requested segments.
            Publisher<S> innerPublisher = this.localPublisher(segmentsToFilter, ch, keysToExclude == null ? Collections.emptySet() : (segmentsToFilter == null ? IntStream.range(0, ch.getNumSegments()) : segmentsToFilter.intStream()).mapToObj(i -> ((Set)keysToExclude.apply(i)).stream()).flatMap(Function.identity()).collect(Collectors.toSet()), intermediateOperations, !stayLocal);
            localPublisher = publisherFunction.decorateLocal(ch, stayLocal, segmentsToFilter, innerPublisher);
        } else {
            // This node is not a member of the hash: nothing to read locally.
            stayLocal = false;
            localPublisher = Flowable.empty();
        }
        if (stayLocal) {
            return Closeables.iterator(Flowable.fromPublisher(localPublisher).blockingIterable().iterator());
        }
        Map<Address, IntSet> targets = this.determineTargets(ch, segmentsToFilter);
        Iterator<Map.Entry<Address, IntSet>> targetIter = targets.entrySet().iterator();
        // At most 4 remote publishers are created.
        int publisherAmount = Math.min(4, targets.size());
        if (this.parallelDistribution == Boolean.FALSE || this.distributedBatchSize < publisherAmount) {
            // Single remote publisher that takes the targets one after another from the supplier.
            Supplier<Map.Entry<Address, IntSet>> supplier = () -> targetIter.hasNext() ? (Map.Entry)targetIter.next() : null;
            ClusterStreamManager.RemoteIteratorPublisher remotePublisher = this.csm.remoteIterationPublisher(false, supplier, this.keysToFilter, keysToExclude, this.includeLoader, this.toKeyFunction != null, intermediateOperations);
            Publisher<S> publisher = publisherFunction.decorateRemote(remotePublisher);
            return PriorityMergingProcessor.build(publisher, this.distributedBatchSize, localPublisher, 64).iterator();
        }
        // Several publishers share one target iterator, so access to it is synchronized on this stream.
        Supplier<Map.Entry<Address, IntSet>> supplier = () -> {
            DistributedCacheStream distributedCacheStream = this;
            synchronized (distributedCacheStream) {
                return targetIter.hasNext() ? (Map.Entry)targetIter.next() : null;
            }
        };
        PriorityMergingProcessor.Builder<Object> builder = PriorityMergingProcessor.builder();
        for (int i2 = 0; i2 < publisherAmount; ++i2) {
            ClusterStreamManager.RemoteIteratorPublisher remotePublisher = this.csm.remoteIterationPublisher(false, supplier, this.keysToFilter, keysToExclude, this.includeLoader, this.toKeyFunction != null, intermediateOperations);
            Publisher<S> publisher = publisherFunction.decorateRemote(remotePublisher);
            // The batch size is split between the publishers; the first one also gets the remainder.
            builder.addPublisher(publisher, this.fixBatch(this.distributedBatchSize, i2 == 0, publisherAmount));
        }
        return builder.addPublisher(localPublisher, 64).build().iterator();
    }

    /**
     * Computes the share of the batch size assigned to one publisher.
     *
     * @param distributedBatchSize the total batch size to split
     * @param first                whether this is the first publisher, which also receives the remainder
     * @param publisherAmount      the number of publishers the batch is split between; must not be zero
     * @return the integer quotient, plus the remainder when {@code first} is set
     */
    private int fixBatch(int distributedBatchSize, boolean first, int publisherAmount) {
        int share = distributedBatchSize / publisherAmount;
        if (first) {
            share += distributedBatchSize % publisherAmount;
        }
        return share;
    }

    /**
     * Groups the requested segments by their primary owner, leaving out segments that have no
     * primary owner or whose primary owner is the local address.
     *
     * @param ch       consistent hash used to locate primary owners
     * @param segments segments to group, or {@code null} for every segment of {@code ch}
     * @return a map from remote owner to the segments it primarily owns
     */
    private Map<Address, IntSet> determineTargets(ConsistentHash ch, IntSet segments) {
        IntSet requested = segments != null ? segments : IntSets.immutableRangeSet(ch.getNumSegments());
        HashMap<Address, IntSet> targets = new HashMap<Address, IntSet>();
        for (PrimitiveIterator.OfInt iter = requested.iterator(); iter.hasNext(); ) {
            int segment = iter.nextInt();
            Address owner = ch.locatePrimaryOwnerForSegment(segment);
            if (owner != null && !owner.equals(this.localAddress)) {
                // Create the per-owner set on first use, then record the segment.
                targets.computeIfAbsent(owner, ignored -> IntSets.mutableEmptySet(ch.getNumSegments())).set(segment);
            }
        }
        return targets;
    }

    /**
     * Wraps {@link #iterator()} in a spliterator that reports {@code Long.MAX_VALUE} as its size
     * and carries the {@link Spliterator#CONCURRENT} characteristic.
     *
     * @return the spliterator
     */
    @Override
    public Spliterator<R> spliterator() {
        Iterator<R> source = this.iterator();
        return Spliterators.spliterator(source, Long.MAX_VALUE, Spliterator.CONCURRENT);
    }

    /**
     * Applies {@code action} to the stream elements.
     * <p>
     * When the stream is rehash aware a {@link ForEachOperation} is run through
     * {@code performRehashKeyTrackingOperation}; otherwise the for-each terminal function is run
     * through {@code performOperation} and its results are discarded.
     *
     * @param action the consumer to invoke
     */
    @Override
    public void forEach(Consumer<? super R> action) {
        if (this.rehashAware) {
            this.performRehashKeyTrackingOperation(s -> new ForEachOperation(this.intermediateOperations, s, this.nonNullKeyFunction(), this.distributedBatchSize, action));
        } else {
            this.performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null);
        }
    }

    /**
     * Applies the cache-aware {@code action} to the stream elements.
     * <p>
     * When the stream is rehash aware a {@link ForEachBiOperation} is run through
     * {@code performRehashKeyTrackingOperation}; otherwise the for-each terminal function is run
     * through {@code performOperation} and its results are discarded.
     *
     * @param action the consumer to invoke
     */
    @Override
    public <K, V> void forEach(BiConsumer<Cache<K, V>, ? super R> action) {
        if (this.rehashAware) {
            this.performRehashKeyTrackingOperation(s -> new ForEachBiOperation(this.intermediateOperations, s, this.nonNullKeyFunction(), this.distributedBatchSize, action));
        } else {
            this.performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null);
        }
    }

    /**
     * Implemented as a plain delegate to {@link #forEach(Consumer)}.
     *
     * @param action the consumer to invoke
     */
    @Override
    public void forEachOrdered(Consumer<? super R> action) {
        this.forEach(action);
    }

    /**
     * Runs the to-array terminal function through {@code performOperation}, concatenating the
     * individual arrays.
     *
     * @return the value returned by {@code performOperation}
     */
    @Override
    public Object[] toArray() {
        return this.performOperation(TerminalFunctions.toArrayFunction(), false, (v1, v2) -> {
            // Grow a copy of v1 and append v2 behind it.
            Object[] array = Arrays.copyOf(v1, ((Object[])v1).length + ((Object[])v2).length);
            System.arraycopy(v2, 0, array, ((Object[])v1).length, ((Object[])v2).length);
            return array;
        }, null);
    }

    /**
     * Runs the generator-based to-array terminal function through {@code performOperation},
     * concatenating the individual arrays into arrays obtained from {@code generator}.
     *
     * @param generator creates an array of the requested length
     * @return the value returned by {@code performOperation}
     */
    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        return this.performOperation(TerminalFunctions.toArrayFunction(generator), false, (v1, v2) -> {
            // Allocate the combined array through the generator, then copy v1 followed by v2.
            Object[] array = (Object[])generator.apply(((Object[])v1).length + ((Object[])v2).length);
            System.arraycopy(v1, 0, array, 0, ((Object[])v1).length);
            System.arraycopy(v2, 0, array, ((Object[])v1).length, ((Object[])v2).length);
            return array;
        }, null);
    }

    /**
     * Sets the {@code parallelDistribution} flag to {@code false}.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> sequentialDistribution() {
        this.parallelDistribution = false;
        return this;
    }

    /**
     * Sets the {@code parallelDistribution} flag to {@code true}.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> parallelDistribution() {
        this.parallelDistribution = true;
        return this;
    }

    /**
     * Stores the given segments, converted with {@code IntSets.from}, as the segment filter.
     *
     * @param segments the segments to filter on
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeySegments(Set<Integer> segments) {
        this.segmentsToFilter = IntSets.from(segments);
        return this;
    }

    /**
     * Stores the given set, without copying it, as the segment filter.
     *
     * @param segments the segments to filter on
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeySegments(IntSet segments) {
        this.segmentsToFilter = segments;
        return this;
    }

    /**
     * Stores the given set, without copying it, as the key filter.
     *
     * @param keys the keys to filter on
     * @return this stream
     */
    @Override
    public CacheStream<R> filterKeys(Set<?> keys) {
        this.keysToFilter = keys;
        return this;
    }

    /**
     * Stores the given batch size; the value is not validated here.
     *
     * @param batchSize the new distributed batch size
     * @return this stream
     */
    @Override
    public CacheStream<R> distributedBatchSize(int batchSize) {
        this.distributedBatchSize = batchSize;
        return this;
    }

    /**
     * Registers a segment completion listener. The first listener is stored directly; each
     * further one is combined with the existing listener through {@code composeWithExceptions}.
     *
     * @param listener the listener to register
     * @return this stream
     */
    @Override
    public CacheStream<R> segmentCompletionListener(BaseCacheStream.SegmentCompletionListener listener) {
        if (this.segmentCompletionListener == null) {
            this.segmentCompletionListener = listener;
        } else {
            this.segmentCompletionListener = DistributedCacheStream.composeWithExceptions(this.segmentCompletionListener, listener);
        }
        return this;
    }

    /**
     * Sets the {@code rehashAware} flag to {@code false}.
     *
     * @return this stream
     */
    @Override
    public CacheStream<R> disableRehashAware() {
        this.rehashAware = false;
        return this;
    }

    /**
     * Stores the timeout value and unit.
     *
     * @param timeout the timeout; must be strictly positive
     * @param unit    the unit of {@code timeout}; stored as given, without a null check
     * @return this stream
     * @throws IllegalArgumentException if {@code timeout} is zero or negative
     */
    @Override
    public CacheStream<R> timeout(long timeout, TimeUnit unit) {
        if (timeout <= 0L) {
            throw new IllegalArgumentException("Timeout must be greater than 0");
        }
        this.timeout = timeout;
        this.timeoutUnit = unit;
        return this;
    }

    /**
     * @return a new {@link DistributedIntCacheStream} constructed from this stream
     */
    protected DistributedIntCacheStream intCacheStream() {
        return new DistributedIntCacheStream((AbstractCacheStream)this);
    }

    /**
     * @return a new {@link DistributedDoubleCacheStream} constructed from this stream
     */
    protected DistributedDoubleCacheStream doubleCacheStream() {
        return new DistributedDoubleCacheStream((AbstractCacheStream)this);
    }

    /**
     * @return a new {@link DistributedLongCacheStream} constructed from this stream
     */
    protected DistributedLongCacheStream longCacheStream() {
        return new DistributedLongCacheStream((AbstractCacheStream)this);
    }

    /**
     * {@link RehashIterator} variant that routes every iterated value, and the end of iteration,
     * through a {@link CompletionRehashPublisherDecorator} built around the user supplied listener.
     */
    private class CompletionListenerRehashIterator<S>
    extends RehashIterator<S> {
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener;
        // Assigned by publisherDecorator(...); stays null until the first decorator is created.
        private volatile CompletionRehashPublisherDecorator completionRehashPublisherDecorator;

        /**
         * @param intermediateOperations operations forwarded to {@link RehashIterator}
         * @param userListener           listener handed to every decorator created by this iterator
         */
        private CompletionListenerRehashIterator(Iterable<IntermediateOperation> intermediateOperations, Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener) {
            super(intermediateOperations);
            this.userListener = userListener;
        }

        /**
         * Fetches the next value from the parent iterator and informs the current decorator:
         * {@code valueIterated} for a value, {@code complete} once the parent returns {@code null}.
         *
         * @return the next value, or {@code null} when iteration is finished
         */
        @Override
        protected S getNext() {
            S next = super.getNext();
            CompletionRehashPublisherDecorator decorator = this.completionRehashPublisherDecorator;
            if (decorator == null) {
                // The parent finished before it ever created a publisher (there were no segments
                // to iterate), so there is no decorator to notify. Previously this threw an NPE.
                return next;
            }
            if (next != null) {
                decorator.valueIterated(next);
            } else {
                decorator.complete();
            }
            return next;
        }

        /**
         * Creates a fresh {@link CompletionRehashPublisherDecorator} and remembers it so that
         * {@link #getNext()} can notify it.
         */
        @Override
        PublisherDecorator<S> publisherDecorator(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments, Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) {
            this.completionRehashPublisherDecorator = new CompletionRehashPublisherDecorator(DistributedCacheStream.this.iteratorOperation, DistributedCacheStream.this.dm, DistributedCacheStream.this.localAddress, this.userListener, completedSegments, lostSegments, DistributedCacheStream.this.executor, keyConsumer, DistributedCacheStream.this.nonNullKeyFunction());
            return this.completionRehashPublisherDecorator;
        }
    }

    private class RehashIterator<S>
    extends AbstractIterator<S>
    implements CloseableIterator<S> {
        private final AtomicReferenceArray<Set<Object>> receivedKeys;
        private final Iterable<IntermediateOperation> intermediateOperations;
        private final IntSet segmentsToUse;
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedHandler;
        private CloseableIterator<S> currentIterator;
        private LocalizedCacheTopology cacheTopology;

        private RehashIterator(Iterable<IntermediateOperation> intermediateOperations) {
            int i;
            this.intermediateOperations = intermediateOperations;
            int maxSegment = DistributedCacheStream.this.dm.getCacheTopology().getCurrentCH().getNumSegments();
            if (DistributedCacheStream.this.segmentsToFilter == null) {
                this.segmentsToUse = IntSets.mutableEmptySet(maxSegment);
                for (i = 0; i < maxSegment; ++i) {
                    this.segmentsToUse.set(i);
                }
            } else {
                this.segmentsToUse = IntSets.mutableCopyFrom(DistributedCacheStream.this.segmentsToFilter);
            }
            this.receivedKeys = new AtomicReferenceArray(maxSegment);
            for (i = 0; i < this.receivedKeys.length(); ++i) {
                this.receivedKeys.set(i, new HashSet());
            }
            this.completedHandler = completed -> {
                IntSet intSet = log.isTraceEnabled() ? IntSets.mutableEmptySet(maxSegment) : null;
                ((PrimitiveIterator.OfInt)completed.get()).forEachRemaining((int i) -> {
                    this.receivedKeys.lazySet(i, null);
                    if (intSet != null) {
                        intSet.set(i);
                    }
                    IntSet intSet2 = this.segmentsToUse;
                    synchronized (intSet2) {
                        this.segmentsToUse.remove(i);
                    }
                });
                if (intSet != null) {
                    log.tracef("Remote rehash iterator completed segments %s", (Object)intSet);
                }
            };
        }

        @Override
        protected S getNext() {
            CloseableIterator<S> iterator;
            while ((iterator = this.currentIterator) == null || !iterator.hasNext()) {
                if (this.segmentsToUse.isEmpty()) {
                    return null;
                }
                if (iterator != null) {
                    try {
                        int nextTopology = this.cacheTopology.getTopologyId() + 1;
                        log.tracef("Waiting for topology %d to continue iterator operation with segments %s", nextTopology, (Object)this.segmentsToUse);
                        DistributedCacheStream.this.stateTransferLock.topologyFuture(nextTopology).get(DistributedCacheStream.this.timeout, DistributedCacheStream.this.timeoutUnit);
                    }
                    catch (InterruptedException | ExecutionException | TimeoutException e) {
                        throw new CacheException(e);
                    }
                }
                this.cacheTopology = DistributedCacheStream.this.dm.getCacheTopology();
                log.tracef("Creating non-rehash iterator for segments %s using topology id: %d", (Object)this.segmentsToUse, (Object)this.cacheTopology.getTopologyId());
                this.currentIterator = DistributedCacheStream.this.nonRehashRemoteIterator(this.cacheTopology.getReadConsistentHash(), this.segmentsToUse, this.receivedKeys::get, this.publisherDecorator(this.completedHandler, lostSegments -> {}, k -> {
                    Set<Object> set = this.receivedKeys.get(DistributedCacheStream.this.keyPartitioner.getSegment(k));
                    if (set != null) {
                        set.add(k);
                    }
                }), this.intermediateOperations);
            }
            return (S)iterator.next();
        }

        PublisherDecorator<S> publisherDecorator(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments, Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) {
            return new RehashPublisherDecorator(DistributedCacheStream.this.iteratorOperation, DistributedCacheStream.this.dm, DistributedCacheStream.this.localAddress, completedSegments, lostSegments, DistributedCacheStream.this.executor, keyConsumer, DistributedCacheStream.this.nonNullKeyFunction());
        }

        @Override
        public void close() {
            if (this.currentIterator != null) {
                this.currentIterator.close();
            }
        }
    }

    /**
     * Collector wrapper that exposes the supplier, accumulator and combiner of another collector
     * but always advertises {@code IDENTITY_FINISH}, so the wrapped collector's finisher is never
     * part of this collector.
     */
    @SerializeWith(value=IdentityFinishCollectorExternalizer.class)
    private static final class IdentifyFinishCollector<T, A>
    implements Collector<T, A, A> {
        private final Collector<T, A, ?> realCollector;

        /**
         * @param realCollector the collector whose supplier, accumulator and combiner are reused
         */
        IdentifyFinishCollector(Collector<T, A, ?> realCollector) {
            this.realCollector = realCollector;
        }

        @Override
        public Supplier<A> supplier() {
            return this.realCollector.supplier();
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return this.realCollector.accumulator();
        }

        @Override
        public BinaryOperator<A> combiner() {
            return this.realCollector.combiner();
        }

        /**
         * @return the identity function, matching the advertised {@code IDENTITY_FINISH}
         *         characteristic (a {@code null} finisher would break callers that invoke it
         *         unconditionally)
         */
        @Override
        public Function<A, A> finisher() {
            return Function.identity();
        }

        /**
         * @return the wrapped collector's characteristics with {@code IDENTITY_FINISH} added
         */
        @Override
        public Set<Collector.Characteristics> characteristics() {
            Set<Collector.Characteristics> characteristics = this.realCollector.characteristics();
            // EnumSet.copyOf rejects an empty non-EnumSet collection, so handle that case apart.
            if (characteristics.isEmpty()) {
                return EnumSet.of(Collector.Characteristics.IDENTITY_FINISH);
            }
            EnumSet<Collector.Characteristics> tweaked = EnumSet.copyOf(characteristics);
            tweaked.add(Collector.Characteristics.IDENTITY_FINISH);
            return tweaked;
        }

        /**
         * Externalizer that writes only the wrapped collector and re-wraps it on read.
         */
        public static final class IdentityFinishCollectorExternalizer
        implements Externalizer<IdentifyFinishCollector> {
            @Override
            public void writeObject(ObjectOutput output, IdentifyFinishCollector object) throws IOException {
                output.writeObject(object.realCollector);
            }

            @Override
            public IdentifyFinishCollector readObject(ObjectInput input) throws IOException, ClassNotFoundException {
                return new IdentifyFinishCollector((Collector)input.readObject());
            }
        }
    }
}

