package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableConverter;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/LocalPublisherManagerImpl.class */
public class LocalPublisherManagerImpl<K, V> implements LocalPublisherManager<K, V> {

    // Reference to the owning cache; start() resolves it through running().
    @Inject
    ComponentRef<Cache<K, V>> cacheComponentRef;

    // Supplies the cache topology that the segment listeners verify against.
    @Inject
    DistributionManager distributionManager;
    // Unwrapped advanced cache, assigned in start(); getCache() uses it for every guarantee except AT_MOST_ONCE.
    protected AdvancedCache<K, V> remoteCache;
    // remoteCache decorated with CACHE_MODE_LOCAL and REMOTE_ITERATION, assigned in start().
    protected AdvancedCache<K, V> cache;
    // Scheduler built from the injected non-blocking executor (see inject()).
    protected Scheduler nonBlockingScheduler;
    // Number of hash segments read from the cache configuration in start().
    protected int maxSegment;
    // Lazily created key/entry set views (see getKeySet()/getEntrySet()); cleared by resetKeyAndEntrySet().
    protected CacheSet<K> keySet;
    protected CacheSet<K> keySetWithoutLoader;
    protected CacheSet<CacheEntry<K, V>> entrySet;
    protected CacheSet<CacheEntry<K, V>> entrySetWithoutLoader;
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    // Shared function that wraps a value in a result carrying an empty segment set; exposed through ignoreSegmentsFunction().
    private static Function<Object, PublisherResult<Object>> ignoreSegmentsFunction = obj -> {
        return new SegmentPublisherResult(IntSets.immutableEmptySet(), obj);
    };
    // Processor count captured at construction; bounds the parallelism used by the parallel code paths.
    protected final int cpuCount = ProcessorInfo.availableProcessors();
    // Listeners that segmentsLost(IntSet) notifies for every lost segment.
    protected final Set<IntConsumer> changeListener = ConcurrentHashMap.newKeySet();

    /**
     * Segment-loss listener that tracks the segments still owned by dropping
     * each lost segment from the backing {@link IntSet}.
     */
    class RemoveSegmentListener implements IntConsumer {
        private final IntSet segments;

        RemoveSegmentListener(IntSet intSet) {
            this.segments = intSet;
        }

        /**
         * Removes the given segment from the tracked set, tracing the loss when
         * the segment was actually present.
         */
        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            boolean wasTracked = this.segments.remove(i);
            if (!wasTracked) {
                return;
            }
            if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d", this, Integer.valueOf(i));
            }
        }

        /**
         * Drops every tracked segment for which the given topology no longer
         * reports this node as a read owner.
         */
        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            for (PrimitiveIterator.OfInt cursor = this.segments.iterator(); cursor.hasNext(); ) {
                int segment = cursor.nextInt();
                if (localizedCacheTopology.isSegmentReadOwner(segment)) {
                    continue;
                }
                if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                    LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d before invocation", this, Integer.valueOf(segment));
                }
                cursor.remove();
            }
        }
    }

    /**
     * {@link SegmentAwarePublisher} that publishes the contents of a
     * {@link CacheSet} one segment at a time and reports, per segment, whether
     * it completed or was lost while being published.
     *
     * @param <I> element type of the underlying cache set
     * @param <R> element type produced by the transformer
     */
    private class SegmentAwarePublisherImpl<I, R> implements SegmentAwarePublisher<R> {
        private final IntSet segments;
        private final CacheSet<I> set;
        private final Predicate<? super I> predicate;
        private final DeliveryGuarantee deliveryGuarantee;
        private final Function<? super Publisher<I>, ? extends Publisher<R>> transformer;

        private SegmentAwarePublisherImpl(IntSet intSet, CacheSet<I> cacheSet, Function<? super I, K> function, Set<K> set, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<I>, ? extends Publisher<R>> function2) {
            this.segments = intSet;
            this.set = cacheSet;
            // A null exclusion set means "do not filter"; otherwise keep only elements whose key is not excluded.
            this.predicate = set != null ? obj -> {
                return !set.contains(function.apply(obj));
            } : null;
            this.deliveryGuarantee = deliveryGuarantee;
            this.transformer = function2;
        }

        /**
         * Subscribes to the per-segment publishers, concatenated in segment
         * iteration order.
         *
         * @param subscriber   receives the transformed values
         * @param intConsumer  invoked with a segment id when that segment completed while still tracked
         * @param intConsumer2 invoked with a segment id when that segment was lost before it completed
         *                     (never invoked for AT_MOST_ONCE)
         * @throws UnsupportedOperationException for an unknown delivery guarantee
         */
        @Override // org.infinispan.reactive.publisher.impl.SegmentAwarePublisher
        public void subscribe(Subscriber<? super R> subscriber, IntConsumer intConsumer, IntConsumer intConsumer2) {
            Flowable<Flowable<R>> perSegment;
            RemoveSegmentListener registeredListener = null;
            switch (this.deliveryGuarantee) {
                case AT_MOST_ONCE:
                    // No loss tracking: every segment that finishes is reported as completed.
                    perSegment = Flowable.fromStream(this.segments.intStream().mapToObj(segment ->
                            transformedSegment(segment).doOnComplete(() -> intConsumer.accept(segment))));
                    break;
                case AT_LEAST_ONCE:
                case EXACTLY_ONCE:
                    IntSet remaining = IntSets.concurrentCopyFrom(this.segments, LocalPublisherManagerImpl.this.maxSegment);
                    RemoveSegmentListener listener = new RemoveSegmentListener(remaining);
                    LocalPublisherManagerImpl.this.changeListener.add(listener);
                    registeredListener = listener;
                    // Register first, then verify, so a loss between the two steps is still observed.
                    listener.verifyTopology(LocalPublisherManagerImpl.this.distributionManager.getCacheTopology());
                    perSegment = Flowable.fromStream(this.segments.intStream().mapToObj(segment -> {
                        if (!remaining.contains(segment)) {
                            // Segment was already lost before it was published: emit nothing for it.
                            // NOTE(review): neither consumer is notified in this case - confirm callers expect that.
                            return Flowable.<R>empty();
                        }
                        return transformedSegment(segment).doOnComplete(() -> {
                            // The segment only counts as completed if it is still tracked when the publisher finishes.
                            if (remaining.remove(segment)) {
                                intConsumer.accept(segment);
                            } else {
                                intConsumer2.accept(segment);
                            }
                        });
                    }));
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported delivery guarantee: " + this.deliveryGuarantee);
            }
            Flowable<R> values = perSegment.concatMap(RxJavaInterop.identityFunction());
            if (registeredListener != null) {
                // Unregister the listener on completion, error or cancellation; without this
                // every subscription left its listener in changeListener for good.
                RemoveSegmentListener toRemove = registeredListener;
                values = values.doFinally(() -> LocalPublisherManagerImpl.this.changeListener.remove(toRemove));
            }
            values.subscribe(subscriber);
        }

        /** Builds the filtered and transformed publisher for a single segment. */
        private Flowable<R> transformedSegment(int segment) {
            Publisher<I> localPublisher = this.set.localPublisher(segment);
            if (this.predicate != null) {
                localPublisher = Flowable.fromPublisher(localPublisher).filter(this.predicate);
            }
            return Flowable.fromPublisher(this.transformer.apply(localPublisher));
        }
    }

    /**
     * Segment-loss listener that leaves the requested segment set untouched and
     * records every lost segment in a separate concurrent set.
     */
    public class SegmentListener implements IntConsumer {
        protected final IntSet segments;
        protected final IntSet segmentsLost;

        SegmentListener(IntSet intSet) {
            this.segments = intSet;
            this.segmentsLost = IntSets.concurrentSet(LocalPublisherManagerImpl.this.maxSegment);
        }

        /** Records the segment as lost when it is one of the tracked segments. */
        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            if (!this.segments.contains(i)) {
                return;
            }
            if (LocalPublisherManagerImpl.log.isTraceEnabled()) {
                LocalPublisherManagerImpl.log.tracef("Listener %s lost segment %d", this, Integer.valueOf(i));
            }
            this.segmentsLost.set(i);
        }

        /**
         * Marks as lost every tracked segment for which the given topology does
         * not report this node as a read owner.
         */
        void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            for (PrimitiveIterator.OfInt cursor = this.segments.iterator(); cursor.hasNext(); ) {
                int segment = cursor.nextInt();
                boolean stillOwned = localizedCacheTopology.isSegmentReadOwner(segment);
                if (!stillOwned) {
                    this.segmentsLost.set(segment);
                }
            }
        }
    }

    /**
     * Wraps the injected non-blocking executor in an RxJava scheduler.
     *
     * @param executorService the executor registered under the non-blocking component name
     */
    @Inject
    public void inject(@ComponentName("org.infinispan.executors.non-blocking") ExecutorService executorService) {
        Scheduler scheduler = Schedulers.from(executorService);
        this.nonBlockingScheduler = scheduler;
    }

    /**
     * Resolves the running cache, derives the locally scoped cache view and
     * reads the configured number of segments.
     */
    @Start
    public void start() {
        Cache<K, V> runningCache = this.cacheComponentRef.running();
        AdvancedCache<K, V> unwrapped = AbstractDelegatingCache.unwrapCache(runningCache).getAdvancedCache();
        this.remoteCache = unwrapped;
        this.cache = unwrapped.withFlags(Flag.CACHE_MODE_LOCAL, Flag.REMOTE_ITERATION);
        this.maxSegment = this.cache.getCacheConfiguration().clustering().hash().numSegments();
    }

    /**
     * Reduces the local keys to a single value.
     *
     * @param z                 whether the reduction may run in parallel
     * @param intSet            segments to process when no explicit keys are given
     * @param set               explicit keys to process, or {@code null} to process whole segments
     * @param set2              keys to exclude, or {@code null}
     * @param z2                whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee selecting the processing strategy
     * @param function          reduces a publisher of keys to an intermediate result
     * @param function2         combines intermediate results into the final result
     * @return a stage holding the reduced value together with any segments reported by the strategy
     * @throws UnsupportedOperationException for an unknown delivery guarantee
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> CompletionStage<PublisherResult<R>> keyReduction(boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        if (set != null) {
            return handleSpecificKeys(z, set, set2, z2, deliveryGuarantee, function, function2);
        }
        CacheSet<K> keys = getKeySet(z2);
        // The elements of a key set are the keys themselves.
        Function<K, K> toKey = Function.identity();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE:
                CompletionStage<R> reduced = atMostOnce(z, keys, set2, toKey, intSet, function, function2);
                return reduced.thenApply(ignoreSegmentsFunction());
            case AT_LEAST_ONCE:
                return atLeastOnce(z, keys, set2, toKey, intSet, function, function2);
            case EXACTLY_ONCE:
                return exactlyOnce(z, keys, set2, toKey, intSet, function, function2);
            default:
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
        }
    }

    /**
     * Reduces the local cache entries to a single value.
     *
     * @param z                 whether the reduction may run in parallel
     * @param intSet            segments to process when no explicit keys are given
     * @param set               explicit keys whose entries are processed, or {@code null} to process whole segments
     * @param set2              keys to exclude, or {@code null}
     * @param z2                whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee selecting the processing strategy
     * @param function          reduces a publisher of entries to an intermediate result
     * @param function2         combines intermediate results into the final result
     * @return a stage holding the reduced value together with any segments reported by the strategy
     * @throws UnsupportedOperationException for an unknown delivery guarantee
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> CompletionStage<PublisherResult<R>> entryReduction(boolean z, IntSet intSet, Set<K> set, Set<K> set2, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        if (set != null) {
            return handleSpecificEntries(z, set, set2, z2, deliveryGuarantee, function, function2);
        }
        CacheSet<CacheEntry<K, V>> entries = getEntrySet(z2);
        // The shared key extractor is narrowed to cache entries; the cast only changes the static type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Function<CacheEntry<K, V>, K> toKey = (Function) StreamMarshalling.entryToKeyFunction();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE:
                CompletionStage<R> reduced = atMostOnce(z, entries, set2, toKey, intSet, function, function2);
                return reduced.thenApply(ignoreSegmentsFunction());
            case AT_LEAST_ONCE:
                return atLeastOnce(z, entries, set2, toKey, intSet, function, function2);
            case EXACTLY_ONCE:
                return exactlyOnce(z, entries, set2, toKey, intSet, function, function2);
            default:
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
        }
    }

    /**
     * Creates a segment-aware publisher of keys.
     *
     * <p>With explicit keys, only those keys that the cache reports as present
     * are published. NOTE(review): {@code set2} is not applied on that path -
     * confirm callers never combine the two.
     *
     * @param intSet            segments to publish
     * @param set               explicit keys to publish, or {@code null} to publish whole segments
     * @param set2              keys to exclude when publishing whole segments, or {@code null}
     * @param z                 whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee for the publisher
     * @param function          transforms the key publisher into the published values
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> SegmentAwarePublisher<R> keyPublisher(IntSet intSet, Set<K> set, Set<K> set2, boolean z, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends Publisher<R>> function) {
        if (set != null) {
            AdvancedCache<K, V> lookupCache = getCache(deliveryGuarantee, z);
            return specificKeyPublisher(intSet, set, keys -> keys.filter(lookupCache::containsKey), function);
        }
        return new SegmentAwarePublisherImpl<>(intSet, getKeySet(z), Function.identity(), set2, deliveryGuarantee, function);
    }

    /**
     * Maps each key to its cache entry, looked up asynchronously and in order,
     * and drops keys that have no entry.
     *
     * @param advancedCache cache used for the lookups
     * @param flowable      keys to resolve
     * @return the entries found; {@link MVCCEntry} instances are copied into {@link ImmortalCacheEntry}
     */
    private Flowable<CacheEntry<K, V>> filterEntries(AdvancedCache<K, V> advancedCache, Flowable<K> flowable) {
        Flowable<CacheEntry<K, V>> resolved = flowable.concatMapMaybe(key -> {
            CompletionStage<CacheEntry<K, V>> lookup = advancedCache.getCacheEntryAsync(key).thenApply(entry -> {
                if (entry == null) {
                    // Stand-in for "missing"; removed again by the filter below.
                    return NullCacheEntry.getInstance();
                }
                if (entry instanceof MVCCEntry) {
                    CacheEntry<K, V> copy = new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                    return copy;
                }
                return entry;
            });
            return Maybe.fromCompletionStage(lookup);
        });
        return resolved.filter(entry -> entry != NullCacheEntry.getInstance());
    }

    /**
     * Creates a segment-aware publisher of cache entries.
     *
     * <p>With explicit keys, the entries of those keys are looked up one by one
     * and missing ones are skipped. NOTE(review): {@code set2} is not applied on
     * that path - confirm callers never combine the two.
     *
     * @param intSet            segments to publish
     * @param set               explicit keys to publish, or {@code null} to publish whole segments
     * @param set2              keys to exclude when publishing whole segments, or {@code null}
     * @param z                 whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee for the publisher
     * @param function          transforms the entry publisher into the published values
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public <R> SegmentAwarePublisher<R> entryPublisher(IntSet intSet, Set<K> set, Set<K> set2, boolean z, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> function) {
        if (set != null) {
            AdvancedCache<K, V> lookupCache = getCache(deliveryGuarantee, z);
            return specificKeyPublisher(intSet, set, keys -> filterEntries(lookupCache, keys), function);
        }
        return new SegmentAwarePublisherImpl<>(intSet, getEntrySet(z), StreamMarshalling.entryToKeyFunction(), set2, deliveryGuarantee, function);
    }

    /**
     * Builds a publisher over an explicit set of keys.
     *
     * <p>When the converted key stream completes, every segment in
     * {@code intSet} is reported as completed; the lost-segment consumer is
     * never invoked on this path.
     *
     * @param intSet           segments reported as completed once all keys were processed
     * @param set              keys to publish
     * @param flowableConverter turns the key stream into the stream of published inputs
     * @param function         transforms the inputs into the published values
     */
    private <I, R> SegmentAwarePublisher<R> specificKeyPublisher(IntSet intSet, Set<K> set, FlowableConverter<K, Flowable<I>> flowableConverter, Function<? super Publisher<I>, ? extends Publisher<R>> function) {
        return (subscriber, completedSegmentConsumer, lostSegmentConsumer) -> {
            Flowable<I> inputs = Flowable.fromIterable(set).to(flowableConverter);
            Flowable<I> inputsWithCompletion = inputs.doOnComplete(() -> {
                for (PrimitiveIterator.OfInt cursor = intSet.iterator(); cursor.hasNext(); ) {
                    completedSegmentConsumer.accept(cursor.nextInt());
                }
            });
            Publisher<R> transformed = function.apply(inputsWithCompletion);
            transformed.subscribe(subscriber);
        };
    }

    /**
     * Notifies every registered listener of each segment in the given set.
     *
     * @param intSet the segments that were lost; must not be {@code null}
     */
    @Override // org.infinispan.reactive.publisher.impl.LocalPublisherManager
    public void segmentsLost(IntSet intSet) {
        if (log.isTraceEnabled()) {
            log.tracef("Notifying listeners of lost segments %s", intSet);
        }
        Objects.requireNonNull(intSet);
        for (IntConsumer listener : this.changeListener) {
            intSet.forEach(listener);
        }
    }

    /**
     * Returns the shared function that wraps a value in a result with an empty
     * segment set, typed for the caller's value type.
     */
    public static <R> Function<R, PublisherResult<R>> ignoreSegmentsFunction() {
        // The shared instance never inspects the value's type, so one instance serves every R.
        @SuppressWarnings("unchecked")
        Function<R, PublisherResult<R>> shared = (Function<R, PublisherResult<R>>) ignoreSegmentsFunction;
        return shared;
    }

    /**
     * Worker loop for the parallel exactly-once path: processes segment
     * {@code i} first (when it is not -1) and then keeps pulling segments from
     * the shared iterator until it is exhausted, emitting each segment's
     * reduced value to {@code flowableProcessor}.
     *
     * @param ofInt             shared segment iterator (read through {@link #getNextSegment})
     * @param i                 first segment to process, or -1 to start from the iterator
     * @param cacheSet          source of the per-segment publishers
     * @param set               keys to exclude, or {@code null}
     * @param function          extracts the key of an element for the exclusion check
     * @param function2         reduces one segment's publisher to a value
     * @param flowableProcessor sink for per-segment results; completed when all started work is done
     * @param intSet            segments still outstanding; each finished segment is removed from it
     * @param segmentListener   listener whose {@code segmentsLost} decides whether a result is discarded
     */
    private <I, R> void handleParallelSegment(PrimitiveIterator.OfInt ofInt, int i, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, FlowableProcessor<R> flowableProcessor, IntSet intSet, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        int nextSegment;
        // Counts outstanding work; it starts at 1 for this loop itself so the processor
        // cannot complete while more segments may still be started.
        AtomicInteger atomicInteger = new AtomicInteger(1);
        // Whether the processor has been swapped for its serialized form yet.
        boolean z = false;
        while (true) {
            if (i != -1) {
                // Consume the segment handed in by the caller exactly once.
                nextSegment = i;
                i = -1;
            } else {
                nextSegment = getNextSegment(ofInt);
                if (nextSegment == -1) {
                    // Iterator exhausted: release the loop's own count, emitting no value.
                    completeTask(atomicInteger, null, flowableProcessor);
                    return;
                }
            }
            try {
                atomicInteger.getAndIncrement();
                Flowable fromPublisher = Flowable.fromPublisher(cacheSet.localPublisher(nextSegment));
                if (set != null) {
                    fromPublisher = fromPublisher.filter(obj -> {
                        return !set.contains(function.apply(obj));
                    });
                }
                CompletionStage<R> apply = function2.apply(fromPublisher);
                if (CompletionStages.isCompletedSuccessfully(apply)) {
                    // Already done: emit inline, dropping the value when the segment was lost meanwhile.
                    intSet.remove(nextSegment);
                    completeTask(atomicInteger, segmentListener.segmentsLost.contains(nextSegment) ? null : CompletionStages.join(apply), flowableProcessor);
                } else {
                    if (!z) {
                        // First asynchronous completion: from here on signals may come from
                        // another thread, so switch to the serialized processor.
                        z = true;
                        flowableProcessor = flowableProcessor.toSerialized();
                    }
                    FlowableProcessor<R> flowableProcessor2 = flowableProcessor;
                    int i2 = nextSegment;
                    apply.whenComplete((obj2, th) -> {
                        if (th != null) {
                            flowableProcessor2.onError(th);
                        } else {
                            intSet.remove(i2);
                            completeTask(atomicInteger, segmentListener.segmentsLost.contains(i2) ? null : obj2, flowableProcessor2);
                        }
                    });
                }
            } catch (Throwable th2) {
                // A synchronous failure ends this worker and is forwarded to the subscriber.
                flowableProcessor.onError(th2);
                return;
            }
        }
    }

    /**
     * Emits the value, if any, and completes the processor once the pending
     * counter reaches zero.
     *
     * @param atomicInteger     number of outstanding tasks, decremented by this call
     * @param v                 value to emit, or {@code null} to emit nothing
     * @param flowableProcessor processor receiving the value and the completion signal
     */
    private static <V> void completeTask(AtomicInteger atomicInteger, V v, FlowableProcessor<V> flowableProcessor) {
        if (v != null) {
            flowableProcessor.onNext(v);
        }
        int stillPending = atomicInteger.decrementAndGet();
        if (stillPending != 0) {
            return;
        }
        flowableProcessor.onComplete();
    }

    /**
     * Takes the next segment from the shared iterator while holding the
     * iterator's monitor.
     *
     * @param ofInt iterator shared between workers
     * @return the next segment, or -1 when the iterator is exhausted
     */
    private int getNextSegment(PrimitiveIterator.OfInt ofInt) {
        synchronized (ofInt) {
            return ofInt.hasNext() ? ofInt.nextInt() : -1;
        }
    }

    /**
     * Runs a reduction with per-segment loss tracking: a listener is registered
     * and checked against the current topology before any segment is read, and
     * the final result carries the segments recorded as lost.
     *
     * @param z         whether segments are processed in parallel
     * @param cacheSet  source of the per-segment publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  extracts the key of an element for the exclusion check
     * @param intSet    segments to process
     * @param function2 reduces one segment's publisher to a value
     * @param function3 combines the per-segment values into the final value
     */
    private <I, R> CompletionStage<PublisherResult<R>> exactlyOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        IntSet outstanding = IntSets.concurrentCopyFrom(intSet, this.maxSegment);
        LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener = new SegmentListener(outstanding);
        this.changeListener.add(segmentListener);
        // Register first, then verify, so a loss between the two steps is still observed.
        segmentListener.verifyTopology(this.distributionManager.getCacheTopology());
        Flowable<R> perSegmentResults;
        if (z) {
            perSegmentResults = exactlyOnceParallel(cacheSet, set, function, intSet, function2, segmentListener, outstanding);
        } else {
            perSegmentResults = exactlyOnceSequential(cacheSet, set, function, intSet, function2, segmentListener, outstanding);
        }
        CompletionStage<R> combined = function3.apply(perSegmentResults);
        return exactlyOnceHandleLostSegments(combined, segmentListener);
    }

    /**
     * Extension point for the exactly-once path; this implementation simply
     * delegates to {@link #handleLostSegments}.
     *
     * @param completionStage stage producing the reduced value
     * @param segmentListener listener that recorded the lost segments
     */
    protected <R> CompletionStage<PublisherResult<R>> exactlyOnceHandleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        CompletionStage<PublisherResult<R>> withLostSegments = handleLostSegments(completionStage, segmentListener);
        return withLostSegments;
    }

    /**
     * Processes the segments with up to {@code cpuCount} workers:
     * {@code cpuCount - 1} are scheduled on the non-blocking scheduler and the
     * last one runs on the calling thread. All workers pull from one shared
     * segment iterator.
     *
     * @param cacheSet        source of the per-segment publishers
     * @param set             keys to exclude, or {@code null}
     * @param function        extracts the key of an element for the exclusion check
     * @param intSet          segments to process
     * @param function2       reduces one segment's publisher to a value
     * @param segmentListener listener recording lost segments
     * @param intSet2         outstanding segments; workers remove each finished segment
     * @return the merged per-segment results of all workers
     */
    protected <I, R> Flowable<R> exactlyOnceParallel(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        // Number of workers handed to the scheduler; one more slot is kept for the calling thread.
        int i = this.cpuCount - 1;
        Flowable[] flowableArr = new Flowable[i + 1];
        PrimitiveIterator.OfInt it = intSet.iterator();
        for (int i2 = 0; i2 < i; i2++) {
            // Reserve a first segment up front so a worker is only scheduled when there is work for it.
            int nextSegment = getNextSegment(it);
            if (nextSegment == -1) {
                flowableArr[i2] = Flowable.empty();
            } else {
                UnicastProcessor create = UnicastProcessor.create();
                flowableArr[i2] = create;
                this.nonBlockingScheduler.scheduleDirect(() -> {
                    handleParallelSegment(it, nextSegment, cacheSet, set, function, function2, create, intSet2, segmentListener);
                });
            }
        }
        // The final slot is processed on the invoking thread.
        int nextSegment2 = getNextSegment(it);
        if (nextSegment2 != -1) {
            UnicastProcessor create2 = UnicastProcessor.create();
            flowableArr[i] = create2;
            handleParallelSegment(it, nextSegment2, cacheSet, set, function, function2, create2, intSet2, segmentListener);
        } else {
            flowableArr[i] = Flowable.empty();
        }
        return ParallelFlowable.fromArray(flowableArr).sequential();
    }

    /**
     * Processes the segments one after another, discarding the reduced value
     * of any segment that was recorded as lost by the time its reduction
     * finished.
     *
     * @param cacheSet        source of the per-segment publishers
     * @param set             keys to exclude, or {@code null}
     * @param function        extracts the key of an element for the exclusion check
     * @param intSet          segments to process
     * @param function2       reduces one segment's publisher to a value
     * @param segmentListener listener recording lost segments
     * @param intSet2         outstanding segments; a segment is removed when its publisher completes
     * @return the per-segment results, in segment iteration order
     */
    protected <I, R> Flowable<R> exactlyOnceSequential(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener, IntSet intSet2) {
        Flowable<CompletionStage<R>> stages = Flowable.fromStream(intSet.intStream().mapToObj(segment -> {
            Flowable<I> elements = Flowable.fromPublisher(cacheSet.localPublisher(segment))
                    .doOnComplete(() -> intSet2.remove(segment));
            if (set != null) {
                elements = elements.filter(element -> !set.contains(function.apply(element)));
            }
            CompletionStage<R> reduced = function2.apply(elements);
            if (!CompletionStages.isCompletedSuccessfully(reduced)) {
                // Still running: decide about the loss once the value is available.
                return reduced.thenCompose(value -> segmentListener.segmentsLost.contains(segment)
                        ? CompletableFutures.completedNull()
                        : CompletableFuture.completedFuture(value));
            }
            if (segmentListener.segmentsLost.contains(segment)) {
                return CompletableFutures.completedNull();
            }
            return reduced;
        }));
        return combineStages(stages, false);
    }

    /**
     * Chooses the cache view for explicit-key lookups: the local-mode view for
     * AT_MOST_ONCE, otherwise the unwrapped cache; adds SKIP_CACHE_LOAD when
     * the loader must not be consulted.
     *
     * @param deliveryGuarantee requested delivery guarantee
     * @param z                 whether the cache loader is consulted
     */
    private AdvancedCache<K, V> getCache(DeliveryGuarantee deliveryGuarantee, boolean z) {
        AdvancedCache<K, V> base;
        if (deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            base = this.cache;
        } else {
            base = this.remoteCache;
        }
        if (z) {
            return base;
        }
        return base.withFlags(Flag.SKIP_CACHE_LOAD);
    }

    /**
     * Reduces an explicit set of keys, keeping only the keys that the cache
     * reports as present (checked asynchronously, in order).
     *
     * @param z                 whether the reduction may run in parallel
     * @param set               keys to process
     * @param set2              keys to exclude, or {@code null}
     * @param z2                whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee selecting the cache view
     * @param function          reduces a publisher of keys to an intermediate result
     * @param function2         combines intermediate results into the final result
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificKeys(boolean z, Set<K> set, Set<K> set2, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        AdvancedCache<K, V> lookupCache = getCache(deliveryGuarantee, z2);
        return handleSpecificObjects(z, set, set2, keys -> keys.concatMapMaybe(key -> {
            // A null result turns into an empty Maybe, which drops the absent key.
            CompletionStage<K> presentKey = lookupCache.containsKeyAsync(key)
                    .thenApply(present -> present ? key : null);
            return Maybe.fromCompletionStage(presentKey);
        }), function, function2);
    }

    /**
     * Reduces the cache entries of an explicit set of keys; entries are looked
     * up asynchronously and in order.
     *
     * @param z                 whether the reduction may run in parallel
     * @param set               keys whose entries are processed
     * @param set2              keys to exclude, or {@code null}
     * @param z2                whether the cache loader is consulted
     * @param deliveryGuarantee delivery guarantee selecting the cache view
     * @param function          reduces a publisher of entries to an intermediate result
     * @param function2         combines intermediate results into the final result
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificEntries(boolean z, Set<K> set, Set<K> set2, boolean z2, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        AdvancedCache<K, V> lookupCache = getCache(deliveryGuarantee, z2);
        return handleSpecificObjects(z, set, set2, keys -> keys.concatMapMaybe(key -> {
            CompletionStage<CacheEntry<K, V>> lookup = lookupCache.getCacheEntryAsync(key).thenApply(entry -> {
                if (entry instanceof MVCCEntry) {
                    CacheEntry<K, V> copy = new ImmortalCacheEntry(entry.getKey(), entry.getValue());
                    return copy;
                }
                return entry;
            });
            return Maybe.fromCompletionStage(lookup);
        }).filter(entry -> entry != NullCacheEntry.getInstance()), function, function2);
    }

    /**
     * Shared implementation for reductions over an explicit set of keys.
     *
     * <p>In parallel mode the keys are split into windows of 16; each window is
     * resolved and reduced on the non-blocking scheduler and the partial
     * results are combined by {@code function3}. In sequential mode the whole
     * key stream is reduced by {@code function2} alone. The result never
     * reports segments.
     *
     * @param z         whether to process windows of keys in parallel
     * @param set       keys to process
     * @param set2      keys to exclude, or {@code null}
     * @param function  resolves the key stream into the stream of reduction inputs
     * @param function2 reduces a publisher of inputs to an intermediate result
     * @param function3 combines intermediate results into the final result (parallel mode only)
     */
    private <I, R> CompletionStage<PublisherResult<R>> handleSpecificObjects(boolean z, Set<K> set, Set<K> set2, Function<? super Flowable<K>, ? extends Flowable<I>> function, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        Flowable<K> keys = Flowable.fromIterable(set);
        if (set2 != null) {
            keys = keys.filter(key -> !set2.contains(key));
        }
        if (z) {
            // Maybe rather than Single: a window may reduce to null, which must be skipped
            // rather than fail with a NullPointerException (combineStages() skips null results too).
            Flowable<R> partialResults = keys.window(16L).flatMapMaybe(window -> {
                Flowable<I> inputs = function.apply(window.observeOn(this.nonBlockingScheduler));
                CompletionStage<R> partial = function2.apply(inputs);
                return Maybe.fromCompletionStage(partial);
            });
            CompletionStage<R> combined = function3.apply(partialResults);
            return combined.thenApply(ignoreSegmentsFunction());
        }
        Flowable<I> inputs = function.apply(keys);
        CompletionStage<R> reduced = function2.apply(inputs);
        return reduced.thenApply(ignoreSegmentsFunction());
    }

    /**
     * Reduces every segment on the non-blocking scheduler and combines the
     * per-segment results with the finalizer.
     *
     * @param cacheSet  source of the per-segment publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  extracts the key of an element for the exclusion check
     * @param intSet    segments to process
     * @param function2 reduces one segment's publisher to a value
     * @param function3 combines the per-segment values into the final value
     */
    private <I, R> CompletionStage<R> parallelAtMostOnce(CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        Flowable<CompletionStage<R>> stages = Flowable.fromIterable(intSet)
                .parallel()
                .runOn(this.nonBlockingScheduler)
                .map(segment -> {
                    Flowable<I> elements = Flowable.fromPublisher(cacheSet.localPublisher(segment.intValue()));
                    if (set != null) {
                        elements = elements.filter(element -> !set.contains(function.apply(element)));
                    }
                    CompletionStage<R> reduced = function2.apply(elements);
                    return reduced;
                })
                .sequential();
        return combineStages(stages, function3, true);
    }

    /**
     * Reduces the given segments without tracking segment loss. In sequential
     * mode all segments are read through one publisher and reduced by
     * {@code function2} alone; {@code function3} is only used in parallel mode.
     *
     * @param z         whether segments are processed in parallel
     * @param cacheSet  source of the publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  extracts the key of an element for the exclusion check
     * @param intSet    segments to process
     * @param function2 reduces a publisher of elements to a value
     * @param function3 combines per-segment values (parallel mode only)
     */
    private <I, R> CompletionStage<R> atMostOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        if (!z) {
            Flowable<I> elements = Flowable.fromPublisher(cacheSet.localPublisher(intSet));
            if (set != null) {
                elements = elements.filter(element -> !set.contains(function.apply(element)));
            }
            return function2.apply(elements);
        }
        return parallelAtMostOnce(cacheSet, set, function, intSet, function2, function3);
    }

    /**
     * Runs the at-most-once reduction while a listener records lost segments,
     * and attaches those segments to the result.
     *
     * @param z         whether segments are processed in parallel
     * @param cacheSet  source of the publishers
     * @param set       keys to exclude, or {@code null}
     * @param function  extracts the key of an element for the exclusion check
     * @param intSet    segments to process
     * @param function2 reduces a publisher of elements to a value
     * @param function3 combines per-segment values (parallel mode only)
     */
    private <I, R> CompletionStage<PublisherResult<R>> atLeastOnce(boolean z, CacheSet<I> cacheSet, Set<K> set, Function<I, K> function, IntSet intSet, Function<? super Publisher<I>, ? extends CompletionStage<R>> function2, Function<? super Publisher<R>, ? extends CompletionStage<R>> function3) {
        LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener = new SegmentListener(intSet);
        this.changeListener.add(segmentListener);
        // Register first, then verify, so a loss between the two steps is still observed.
        segmentListener.verifyTopology(this.distributionManager.getCacheTopology());
        CompletionStage<R> reduced = atMostOnce(z, cacheSet, set, function, intSet, function2, function3);
        return handleLostSegments(reduced, segmentListener);
    }

    /**
     * Wraps the reduced value in a {@link PublisherResult} that carries the
     * segments recorded as lost, and unregisters the listener once the stage
     * finishes, whether normally or exceptionally.
     *
     * @param completionStage stage producing the reduced value
     * @param segmentListener listener that recorded the lost segments
     */
    protected <R> CompletionStage<PublisherResult<R>> handleLostSegments(CompletionStage<R> completionStage, LocalPublisherManagerImpl<K, V>.SegmentListener segmentListener) {
        CompletionStage<PublisherResult<R>> wrapped = completionStage.thenApply(value -> {
            IntSet lost = segmentListener.segmentsLost;
            if (lost.isEmpty()) {
                return LocalPublisherManagerImpl.<R>ignoreSegmentsFunction().apply(value);
            }
            return new SegmentPublisherResult<R>(lost, value);
        });
        return wrapped.whenComplete((ignoredResult, ignoredFailure) -> this.changeListener.remove(segmentListener));
    }

    /**
     * Flattens the stages into their values and hands the resulting publisher
     * to the finalizer.
     *
     * @param flowable stages producing the individual values
     * @param function finalizer combining the values
     * @param z        whether several stages may be awaited concurrently
     */
    protected <R> CompletionStage<R> combineStages(Flowable<? extends CompletionStage<R>> flowable, Function<? super Publisher<R>, ? extends CompletionStage<R>> function, boolean z) {
        Flowable<R> values = combineStages(flowable, z);
        return function.apply(values);
    }

    /**
     * Flattens a stream of stages into the stream of their non-null values.
     * Stages that are already complete are unwrapped directly.
     *
     * @param flowable stages producing the individual values
     * @param z        when {@code true} up to {@code cpuCount} stages are awaited concurrently, otherwise one at a time
     */
    public <R> Flowable<R> combineStages(Flowable<? extends CompletionStage<R>> flowable, boolean z) {
        int maxConcurrency = z ? this.cpuCount : 1;
        return flowable.flatMapMaybe(stage -> {
            // The shared "completed with null" stage carries no value.
            if (stage == CompletableFutures.completedNull()) {
                return Maybe.empty();
            }
            if (CompletionStages.isCompletedSuccessfully(stage)) {
                R value = CompletionStages.join(stage);
                return value != null ? Maybe.just(value) : Maybe.empty();
            }
            return Maybe.fromCompletionStage(stage);
        }, false, maxConcurrency);
    }

    /**
     * Returns the key set view, creating and caching it on first use.
     *
     * @param z {@code true} for the view that consults the cache loader,
     *          {@code false} for the view created with SKIP_CACHE_LOAD
     */
    private CacheSet<K> getKeySet(boolean z) {
        if (!z) {
            if (this.keySetWithoutLoader == null) {
                this.keySetWithoutLoader = this.cache.withFlags(Flag.SKIP_CACHE_LOAD, Flag.IGNORE_TRANSACTION).keySet();
            }
            return this.keySetWithoutLoader;
        }
        if (this.keySet == null) {
            this.keySet = this.cache.withFlags(Flag.IGNORE_TRANSACTION).keySet();
        }
        return this.keySet;
    }

    /** Drops all cached key and entry set views so they are recreated on next use. */
    void resetKeyAndEntrySet() {
        this.keySet = this.keySetWithoutLoader = null;
        this.entrySet = this.entrySetWithoutLoader = null;
    }

    /**
     * Returns the cache entry set view, creating and caching it on first use.
     *
     * @param z {@code true} for the view that consults the cache loader,
     *          {@code false} for the view created with SKIP_CACHE_LOAD
     */
    private CacheSet<CacheEntry<K, V>> getEntrySet(boolean z) {
        if (!z) {
            if (this.entrySetWithoutLoader == null) {
                this.entrySetWithoutLoader = this.cache.withFlags(Flag.SKIP_CACHE_LOAD, Flag.IGNORE_TRANSACTION).cacheEntrySet();
            }
            return this.entrySetWithoutLoader;
        }
        if (this.entrySet == null) {
            this.entrySet = this.cache.withFlags(Flag.IGNORE_TRANSACTION).cacheEntrySet();
        }
        return this.entrySet;
    }
}
