/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commons.CacheException;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.RemoteValueRetrievedListener;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.interceptors.impl.ClusteringInterceptor;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.AllOwnersLostException;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public abstract class BaseDistributionInterceptor
extends ClusteringInterceptor {
    private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final Object LOST_PLACEHOLDER = new Object();
    protected DistributionManager dm;
    protected RemoteValueRetrievedListener rvrl;
    protected KeyPartitioner keyPartitioner;
    protected boolean isL1Enabled;
    protected boolean isReplicated;
    private final ReadOnlyManyHelper readOnlyManyHelper = new ReadOnlyManyHelper();
    private final InvocationSuccessFunction primaryReturnHandler = this::primaryReturnHandler;

    @Override
    protected Log getLog() {
        return log;
    }

    @Inject
    public void injectDependencies(DistributionManager distributionManager, RemoteValueRetrievedListener rvrl, KeyPartitioner keyPartitioner) {
        this.dm = distributionManager;
        this.rvrl = rvrl;
        this.keyPartitioner = keyPartitioner;
    }

    @Start
    public void configure() {
        this.isL1Enabled = this.cacheConfiguration.clustering().l1().enabled();
        this.isReplicated = this.cacheConfiguration.clustering().cacheMode().isReplicated();
    }

    @Override
    public final Object visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInGroupCommand command) throws Throwable {
        if (command.isGroupOwner()) {
            return this.invokeNext(ctx, command);
        }
        Address primaryOwner = this.dm.getCacheTopology().getDistribution(command.getGroupName()).primary();
        CompletableFuture<Map<Address, Response>> future = this.rpcManager.invokeRemotelyAsync(Collections.singleton(primaryOwner), command, this.defaultSyncOptions);
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, (CompletableFuture<?>)future.thenAccept(responses -> {
            Response response;
            if (!responses.isEmpty() && (response = (Response)responses.values().iterator().next()) instanceof SuccessfulResponse) {
                List cacheEntries = (List)((SuccessfulResponse)response).getResponseValue();
                for (CacheEntry entry : cacheEntries) {
                    this.wrapRemoteEntry(ctx, entry.getKey(), entry, false);
                }
            }
        }));
    }

    @Override
    public final Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
        if (ctx.isOriginLocal() && !this.isLocalModeForced(command)) {
            RpcOptions rpcOptions = this.rpcManager.getRpcOptionsBuilder(this.isSynchronous(command) ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.ASYNCHRONOUS).build();
            return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.rpcManager.invokeRemotelyAsync(null, command, rpcOptions));
        }
        return this.invokeNext(ctx, command);
    }

    protected <C extends FlagAffectedCommand & TopologyAffectedCommand> CompletableFuture<Void> remoteGet(InvocationContext ctx, C command, Object key, boolean isWrite) {
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        int topologyId = cacheTopology.getTopologyId();
        DistributionInfo info = cacheTopology.getDistribution(key);
        if (info.isReadOwner()) {
            if (trace) {
                log.tracef("Key %s became local after wrapping, retrying command. Command topology is %d, current topology is %d", key, (Object)((TopologyAffectedCommand)command).getTopologyId(), (Object)topologyId);
            }
            if (((TopologyAffectedCommand)command).getTopologyId() == topologyId) {
                throw new IllegalStateException();
            }
            throw new OutdatedTopologyException(topologyId);
        }
        if (trace) {
            log.tracef("Perform remote get for key %s. currentTopologyId=%s, owners=%s", key, (Object)topologyId, (Object)info.readOwners());
        }
        ClusteredGetCommand getCommand = this.cf.buildClusteredGetCommand(key, command.getFlagsBitSet());
        getCommand.setTopologyId(topologyId);
        getCommand.setWrite(isWrite);
        return this.rpcManager.invokeRemotelyAsync(info.readOwners(), getCommand, this.getStaggeredOptions(info.readOwners().size())).thenAccept(responses -> {
            for (Response r : responses.values()) {
                if (!(r instanceof SuccessfulResponse)) continue;
                SuccessfulResponse response = (SuccessfulResponse)r;
                Object responseValue = response.getResponseValue();
                if (responseValue == null) {
                    if (this.rvrl != null) {
                        this.rvrl.remoteValueNotFound(key);
                    }
                    this.wrapRemoteEntry(ctx, key, NullCacheEntry.getInstance(), isWrite);
                    return;
                }
                InternalCacheEntry ice = ((InternalCacheValue)responseValue).toInternalCacheEntry(key);
                if (this.rvrl != null) {
                    this.rvrl.remoteValueFound(ice);
                }
                this.wrapRemoteEntry(ctx, key, ice, isWrite);
                return;
            }
            throw BaseDistributionInterceptor.handleMissingSuccessfulResponse(responses);
        });
    }

    protected static CacheException handleMissingSuccessfulResponse(Map<Address, Response> responses) {
        if (responses.values().stream().anyMatch(UnsureResponse.class::isInstance)) {
            return OutdatedTopologyException.INSTANCE;
        }
        return AllOwnersLostException.INSTANCE;
    }

    protected void wrapRemoteEntry(InvocationContext ctx, Object key, CacheEntry ice, boolean isWrite) {
        this.entryFactory.wrapExternalEntry(ctx, key, ice, true, isWrite);
    }

    protected final Object handleNonTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command) throws Throwable {
        Object key = command.getKey();
        CacheEntry entry = ctx.lookupEntry(key);
        if (this.isLocalModeForced(command)) {
            if (entry == null) {
                this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
            }
            return this.invokeNext(ctx, command);
        }
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        DistributionInfo info = cacheTopology.getDistribution(key);
        if (entry == null) {
            boolean load = this.shouldLoad(ctx, command, info);
            if (info.isPrimary()) {
                throw new IllegalStateException("Primary owner in writeCH should always be an owner in readCH as well.");
            }
            if (ctx.isOriginLocal()) {
                return this.invokeRemotely(command, info.primary());
            }
            if (load) {
                CompletableFuture<Void> getFuture = this.remoteGet(ctx, command, command.getKey(), true);
                return this.asyncInvokeNext(ctx, (VisitableCommand)command, getFuture);
            }
            this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
            return this.invokeNext(ctx, command);
        }
        if (info.isPrimary()) {
            return this.invokeNextThenApply(ctx, command, this.primaryReturnHandler);
        }
        if (ctx.isOriginLocal()) {
            return this.invokeRemotely(command, info.primary());
        }
        return this.invokeNext(ctx, command);
    }

    private boolean shouldLoad(InvocationContext ctx, AbstractDataWriteCommand command, DistributionInfo info) {
        if (!command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP)) {
            VisitableCommand.LoadType loadType = command.loadType();
            switch (loadType) {
                case DONT_LOAD: {
                    return false;
                }
                case OWNER: {
                    return info.isPrimary() || info.isWriteOwner() && !ctx.isOriginLocal();
                }
                case PRIMARY: {
                    return info.isPrimary();
                }
            }
            throw new IllegalStateException();
        }
        return false;
    }

    private Object invokeRemotely(DataWriteCommand command, Address primaryOwner) {
        CompletableFuture<Map<Address, Response>> remoteInvocation;
        if (trace) {
            log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", (Object)primaryOwner);
        }
        boolean isSyncForwarding = this.isSynchronous(command) || command.isReturnValueExpected();
        try {
            remoteInvocation = this.rpcManager.invokeRemotelyAsync(Collections.singletonList(primaryOwner), command, isSyncForwarding ? this.defaultSyncOptions : this.defaultAsyncOptions);
        }
        catch (Throwable t2) {
            command.setValueMatcher(command.getValueMatcher().matcherForRetry());
            throw t2;
        }
        if (isSyncForwarding) {
            return BaseDistributionInterceptor.asyncValue(remoteInvocation.handle((responses, t) -> {
                command.setValueMatcher(command.getValueMatcher().matcherForRetry());
                CompletableFutures.rethrowException(t);
                Response response = BaseDistributionInterceptor.getSingleResponse(responses);
                if (!response.isSuccessful()) {
                    command.fail();
                } else if (!(response instanceof ValidResponse)) {
                    throw BaseDistributionInterceptor.unexpected(response);
                }
                return ((ValidResponse)response).getResponseValue();
            }));
        }
        return null;
    }

    private Object primaryReturnHandler(InvocationContext ctx, VisitableCommand visitableCommand, Object localResult) {
        DataWriteCommand command = (DataWriteCommand)visitableCommand;
        if (!command.isSuccessful()) {
            if (trace) {
                log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", (Object)command);
            }
            return localResult;
        }
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        DistributionInfo distributionInfo = cacheTopology.getDistribution(command.getKey());
        List<Address> owners = distributionInfo.writeOwners();
        if (owners.size() == 1) {
            return localResult;
        }
        List<Address> recipients = this.isReplicated ? null : owners;
        ValueMatcher originalMatcher = command.getValueMatcher();
        command.setValueMatcher(ValueMatcher.MATCH_ALWAYS);
        RpcOptions rpcOptions = this.determineRpcOptionsForBackupReplication(this.rpcManager, this.isSynchronous(command), recipients);
        CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, rpcOptions);
        return BaseDistributionInterceptor.asyncValue(remoteInvocation.handle((responses, t) -> {
            command.setValueMatcher(originalMatcher.matcherForRetry());
            CompletableFutures.rethrowException(t instanceof RemoteException ? t.getCause() : t);
            return localResult;
        }));
    }

    private RpcOptions determineRpcOptionsForBackupReplication(RpcManager rpc, boolean isSync, Collection<Address> recipients) {
        if (isSync) {
            return recipients == null ? rpc.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS).build() : this.defaultSyncOptions;
        }
        return this.defaultAsyncOptions;
    }

    @Override
    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP)) {
            return this.invokeNext(ctx, command);
        }
        if (!ctx.isOriginLocal()) {
            for (Object key : command.getKeys()) {
                if (ctx.lookupEntry(key) != null) continue;
                return UnsureResponse.INSTANCE;
            }
            return this.invokeNext(ctx, command);
        }
        GetAllSuccessHandler getAllSuccessHandler = new GetAllSuccessHandler(command);
        CompletableFuture<Void> allFuture = this.remoteGetAll(ctx, command, command.getKeys(), getAllSuccessHandler);
        return BaseDistributionInterceptor.asyncValue(allFuture).thenApply(ctx, command, getAllSuccessHandler);
    }

    protected <C extends FlagAffectedCommand & TopologyAffectedCommand> CompletableFuture<Void> remoteGetAll(InvocationContext ctx, C command, Collection<?> keys, RemoteGetAllHandler remoteGetAllHandler) {
        Map<Address, List<Object>> requestedKeys = this.getKeysByOwner(ctx, keys, this.checkTopologyId(command), null, null);
        if (requestedKeys.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        GlobalTransaction gtx = ctx.isInTxScope() ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
        ClusteringInterceptor.ClusteredGetAllFuture allFuture = new ClusteringInterceptor.ClusteredGetAllFuture(this, requestedKeys.size());
        for (Map.Entry<Address, List<Object>> pair : requestedKeys.entrySet()) {
            ClusteredGetAllCommand clusteredGetAllCommand = this.cf.buildClusteredGetAllCommand(pair.getValue(), command.getFlagsBitSet(), gtx);
            clusteredGetAllCommand.setTopologyId(((TopologyAffectedCommand)command).getTopologyId());
            this.rpcManager.invokeRemotelyAsync(Collections.singleton(pair.getKey()), clusteredGetAllCommand, this.syncIgnoreLeavers).whenComplete((BiConsumer)new ClusteredGetAllHandler(this, pair.getKey(), allFuture, ctx, command, pair.getValue(), null, remoteGetAllHandler));
        }
        return allFuture;
    }

    protected void handleRemotelyRetrievedKeys(InvocationContext ctx, List<?> remoteKeys) {
    }

    @Override
    public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        return this.handleFunctionalReadManyCommand(ctx, command, this.readOnlyManyHelper);
    }

    protected <C extends TopologyAffectedCommand & FlagAffectedCommand> Object handleFunctionalReadManyCommand(InvocationContext ctx, C command, ReadManyCommandHelper<C> helper) {
        if (((FlagAffectedCommand)command).hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP)) {
            return this.handleLocalOnlyReadManyCommand(ctx, command, helper.keys(command));
        }
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        Collection<?> keys = helper.keys(command);
        if (!ctx.isOriginLocal()) {
            return this.handleRemoteReadManyCommand(ctx, command, keys, helper);
        }
        if (keys.isEmpty()) {
            return Stream.empty();
        }
        ConsistentHash ch = cacheTopology.getReadConsistentHash();
        int estimateForOneNode = 2 * keys.size() / ch.getMembers().size();
        ArrayList<Object> availableKeys = new ArrayList<Object>(estimateForOneNode);
        Map<Address, List<Object>> requestedKeys = this.getKeysByOwner(ctx, keys, cacheTopology, availableKeys, null);
        MergingCompletableFuture<Object> allFuture = new MergingCompletableFuture<Object>(ctx, requestedKeys.size() + (availableKeys.isEmpty() ? 0 : 1), new Object[keys.size()], helper::transformResult);
        this.handleLocallyAvailableKeys(ctx, command, availableKeys, allFuture, helper);
        int pos = availableKeys.size();
        for (Map.Entry<Address, List<Object>> addressKeys : requestedKeys.entrySet()) {
            List<Object> keysForAddress = addressKeys.getValue();
            ReplicableCommand remoteCommand = helper.copyForRemote(command, keysForAddress, ctx);
            Set<Address> target = Collections.singleton(addressKeys.getKey());
            this.rpcManager.invokeRemotelyAsync(target, remoteCommand, this.syncIgnoreLeavers).whenComplete((BiConsumer)new ReadManyHandler(this, addressKeys.getKey(), allFuture, ctx, command, keysForAddress, null, pos, helper));
            pos += keysForAddress.size();
        }
        return BaseDistributionInterceptor.asyncValue(allFuture);
    }

    private Object handleLocalOnlyReadManyCommand(InvocationContext ctx, VisitableCommand command, Collection<?> keys) {
        for (Object key : keys) {
            if (ctx.lookupEntry(key) != null) continue;
            this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), true, false);
        }
        return this.invokeNext(ctx, command);
    }

    private <C extends TopologyAffectedCommand & VisitableCommand> Object handleRemoteReadManyCommand(InvocationContext ctx, C command, Collection<?> keys, InvocationSuccessFunction remoteReturnHandler) {
        for (Object key : keys) {
            if (ctx.lookupEntry(key) != null) continue;
            return UnsureResponse.INSTANCE;
        }
        return this.invokeNextThenApply(ctx, command, remoteReturnHandler);
    }

    private <C extends VisitableCommand> void handleLocallyAvailableKeys(InvocationContext ctx, C command, List<Object> availableKeys, MergingCompletableFuture<Object> allFuture, ReadManyCommandHelper<C> helper) {
        if (availableKeys.isEmpty()) {
            return;
        }
        VisitableCommand localCommand = (VisitableCommand)helper.copyForLocal(command, availableKeys);
        this.invokeNextAndHandle(ctx, localCommand, (rCtx, rCommand, rv, throwable) -> {
            if (throwable != null) {
                allFuture.completeExceptionally(throwable);
            } else {
                try {
                    helper.applyLocalResult(allFuture, rv);
                    allFuture.countDown();
                }
                catch (Throwable t) {
                    allFuture.completeExceptionally(t);
                }
            }
            return BaseDistributionInterceptor.asyncValue(allFuture);
        });
    }

    private Map<Address, List<Object>> getKeysByOwner(InvocationContext ctx, Collection<?> keys, LocalizedCacheTopology cacheTopology, List<Object> availableKeys, Map<Object, Collection<Address>> ignoredOwners) {
        int capacity = cacheTopology.getMembers().size();
        HashMap<Address, List<Object>> requestedKeys = new HashMap<Address, List<Object>>(capacity);
        int estimateForOneNode = 2 * keys.size() / capacity;
        for (Object key : keys) {
            CacheEntry entry = ctx.lookupEntry(key);
            if (entry == null) {
                DistributionInfo distributionInfo = cacheTopology.getDistribution(key);
                boolean foundExisting = false;
                Collection<Address> ignoreForKey = null;
                for (Address address : distributionInfo.readOwners()) {
                    List list;
                    if (address.equals(this.rpcManager.getAddress())) {
                        throw new IllegalStateException("Entry should be always wrapped!");
                    }
                    if (ignoredOwners != null) {
                        if (ignoreForKey == null) {
                            ignoreForKey = ignoredOwners.get(key);
                        }
                        if (ignoreForKey != null && ignoreForKey.contains(address)) continue;
                    }
                    if ((list = (List)requestedKeys.get(address)) == null) continue;
                    list.add(key);
                    foundExisting = true;
                    break;
                }
                if (foundExisting) continue;
                Address target = null;
                if (ignoredOwners == null) {
                    target = distributionInfo.primary();
                } else {
                    for (Address address2 : distributionInfo.readOwners()) {
                        if (ignoreForKey == null) {
                            ignoreForKey = ignoredOwners.get(key);
                        }
                        if (ignoreForKey != null && ignoreForKey.contains(address2)) continue;
                        target = address2;
                        break;
                    }
                }
                if (target == null) continue;
                ArrayList arrayList = new ArrayList(estimateForOneNode);
                arrayList.add(key);
                requestedKeys.put(target, arrayList);
                continue;
            }
            if (availableKeys == null) continue;
            availableKeys.add(key);
        }
        return requestedKeys;
    }

    protected Object wrapFunctionalManyResultOnNonOrigin(InvocationContext rCtx, Collection<?> keys, Object[] values) {
        return values;
    }

    protected Object[] unwrapFunctionalManyResultOnOrigin(InvocationContext ctx, List<Object> keys, Object responseValue) {
        return responseValue instanceof Object[] ? (Object[])responseValue : null;
    }

    private Object visitGetCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable {
        return ctx.lookupEntry(command.getKey()) == null ? this.onEntryMiss(ctx, command) : this.invokeNext(ctx, command);
    }

    private Object onEntryMiss(InvocationContext ctx, AbstractDataCommand command) {
        return ctx.isOriginLocal() ? this.handleMissingEntryOnLocalRead(ctx, command) : UnsureResponse.INSTANCE;
    }

    private Object handleMissingEntryOnLocalRead(InvocationContext ctx, AbstractDataCommand command) {
        return this.readNeedsRemoteValue(command) ? this.asyncInvokeNext(ctx, (VisitableCommand)command, this.remoteGet(ctx, command, command.getKey(), false)) : null;
    }

    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return this.visitGetCommand(ctx, command);
    }

    @Override
    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return this.visitGetCommand(ctx, command);
    }

    @Override
    public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
        Object key = command.getKey();
        CacheEntry entry = ctx.lookupEntry(key);
        if (entry != null) {
            if (ctx.isOriginLocal()) {
                return this.invokeNext(ctx, command);
            }
            return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> this.wrapFunctionalResultOnNonOriginOnReturn(rv, entry));
        }
        if (!ctx.isOriginLocal()) {
            return UnsureResponse.INSTANCE;
        }
        if (this.readNeedsRemoteValue(command)) {
            LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
            List<Address> owners = cacheTopology.getDistribution(key).readOwners();
            if (trace) {
                log.tracef("Doing a remote get for key %s in topology %d to %s", key, (Object)cacheTopology.getTopologyId(), (Object)owners);
            }
            ReadOnlyKeyCommand remoteCommand = this.remoteReadOnlyCommand(ctx, command);
            remoteCommand.setTopologyId(cacheTopology.getTopologyId());
            CompletableFuture<Map<Address, Response>> rpc = this.rpcManager.invokeRemotelyAsync(owners, remoteCommand, this.getStaggeredOptions(owners.size()));
            return BaseDistributionInterceptor.asyncValue(rpc.thenApply(responses -> {
                for (Response rsp : responses.values()) {
                    if (!rsp.isSuccessful()) continue;
                    return this.unwrapFunctionalResultOnOrigin(ctx, key, ((SuccessfulResponse)rsp).getResponseValue());
                }
                throw BaseDistributionInterceptor.handleMissingSuccessfulResponse(responses);
            }));
        }
        this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), true, false);
        return this.invokeNext(ctx, command);
    }

    protected ReadOnlyKeyCommand remoteReadOnlyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) {
        return command;
    }

    protected Object wrapFunctionalResultOnNonOriginOnReturn(Object rv, CacheEntry entry) {
        return rv;
    }

    protected Object unwrapFunctionalResultOnOrigin(InvocationContext ctx, Object key, Object responseValue) {
        return responseValue;
    }

    protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) {
        LocalizedCacheTopology cacheTopology = this.dm.getCacheTopology();
        int currentTopologyId = cacheTopology.getTopologyId();
        int cmdTopology = command.getTopologyId();
        if (command instanceof FlagAffectedCommand && ((FlagAffectedCommand)((Object)command)).hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL)) {
            log.tracef("Skipping topology check for command %s", (Object)command);
            return cacheTopology;
        }
        if (cmdTopology >= 0 && currentTopologyId != cmdTopology) {
            throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + cmdTopology + ", got " + currentTopologyId);
        }
        if (trace) {
            log.tracef("Current topology %d, command topology %d", currentTopologyId, cmdTopology);
        }
        return cacheTopology;
    }

    protected boolean readNeedsRemoteValue(AbstractDataCommand command) {
        return !command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP);
    }

    protected class ReadOnlyManyHelper
    implements ReadManyCommandHelper<ReadOnlyManyCommand> {
        protected ReadOnlyManyHelper() {
        }

        @Override
        public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv) throws Throwable {
            return BaseDistributionInterceptor.this.wrapFunctionalManyResultOnNonOrigin(rCtx, ((ReadOnlyManyCommand)rCommand).getKeys(), ((Stream)rv).toArray());
        }

        @Override
        public Collection<?> keys(ReadOnlyManyCommand command) {
            return command.getKeys();
        }

        @Override
        public ReadOnlyManyCommand copyForLocal(ReadOnlyManyCommand command, List<Object> keys) {
            return new ReadOnlyManyCommand(command).withKeys(keys);
        }

        @Override
        public ReplicableCommand copyForRemote(ReadOnlyManyCommand command, List<Object> keys, InvocationContext ctx) {
            return new ReadOnlyManyCommand(command).withKeys(keys);
        }

        @Override
        public void applyLocalResult(MergingCompletableFuture allFuture, Object rv) {
            Supplier<ArrayIterator> supplier = () -> new ArrayIterator(allFuture.results);
            BiConsumer<ArrayIterator, Object> consumer = ArrayIterator::add;
            BiConsumer<ArrayIterator, ArrayIterator> combiner = ArrayIterator::combine;
            ((Stream)rv).collect(supplier, consumer, combiner);
        }

        @Override
        public Object transformResult(Object[] results) {
            return Arrays.stream(results).filter(o -> o != LOST_PLACEHOLDER);
        }
    }

    protected static interface ReadManyCommandHelper<C>
    extends InvocationSuccessFunction {
        public Collection<?> keys(C var1);

        public C copyForLocal(C var1, List<Object> var2);

        public ReplicableCommand copyForRemote(C var1, List<Object> var2, InvocationContext var3);

        public void applyLocalResult(MergingCompletableFuture var1, Object var2);

        public Object transformResult(Object[] var1);
    }

    protected static class MergingCompletableFuture<T>
    extends CountDownCompletableFuture {
        private final Function<T[], Object> transform;
        protected final T[] results;
        protected volatile boolean hasUnsureResponse;
        protected volatile boolean lostData;

        public MergingCompletableFuture(InvocationContext ctx, int participants, T[] results, Function<T[], Object> transform) {
            super(ctx, participants);
            this.results = results;
            this.transform = transform;
        }

        @Override
        protected Object result() {
            if (this.hasUnsureResponse && this.lostData) {
                throw OutdatedTopologyException.INSTANCE;
            }
            return this.transform == null || this.results == null ? null : this.transform.apply(this.results);
        }
    }

    protected static class CountDownCompletableFuture
    extends CompletableFuture<Object> {
        protected final InvocationContext ctx;
        protected final AtomicInteger counter;

        public CountDownCompletableFuture(InvocationContext ctx, int participants) {
            if (trace) {
                log.tracef("Creating shortcut countdown with %d participants", participants);
            }
            this.ctx = ctx;
            this.counter = new AtomicInteger(participants);
        }

        public void countDown() {
            if (this.counter.decrementAndGet() == 0) {
                Object result = null;
                try {
                    result = this.result();
                }
                catch (Throwable t) {
                    this.completeExceptionally(t);
                }
                finally {
                    this.complete(result);
                }
            }
        }

        public void increment() {
            int preValue = this.counter.getAndIncrement();
            if (preValue == 0) {
                throw new IllegalStateException();
            }
        }

        protected Object result() {
            return null;
        }
    }

    protected static class ArrayIterator {
        private final Object[] array;
        private int pos = 0;

        public ArrayIterator(Object[] array) {
            this.array = array;
        }

        public void add(Object item) {
            this.array[this.pos] = item;
            ++this.pos;
        }

        public void combine(ArrayIterator other) {
            throw new UnsupportedOperationException("The stream is not supposed to be parallel");
        }
    }

    private static class ReadManyHandler<C extends FlagAffectedCommand & TopologyAffectedCommand>
    implements BiConsumer<Map<Address, Response>, Throwable> {
        private final Address target;
        private final MergingCompletableFuture<Object> allFuture;
        private final InvocationContext ctx;
        private final C command;
        private final List<Object> keys;
        private final int destinationIndex;
        private final Map<Object, Collection<Address>> contactedNodes;
        private final ReadManyCommandHelper<C> helper;
        final /* synthetic */ BaseDistributionInterceptor this$0;

        private ReadManyHandler(Address target, MergingCompletableFuture<Object> allFuture, InvocationContext ctx, C command, List<Object> keys, Map<Object, Collection<Address>> contactedNodes, int destinationIndex, ReadManyCommandHelper<C> helper) {
            this.this$0 = var1_1;
            this.target = target;
            this.allFuture = allFuture;
            this.ctx = ctx;
            this.command = command;
            this.keys = keys;
            this.destinationIndex = destinationIndex;
            this.contactedNodes = contactedNodes;
            this.helper = helper;
        }

        @Override
        public void accept(Map<Address, Response> responseMap, Throwable throwable) {
            if (throwable != null) {
                this.allFuture.completeExceptionally(throwable);
                return;
            }
            SuccessfulResponse response = BaseDistributionInterceptor.getSuccessfulResponseOrFail(responseMap, this.allFuture, this::handleMissingResponse);
            if (response == null) {
                return;
            }
            try {
                Object responseValue = response.getResponseValue();
                Object[] values = this.this$0.unwrapFunctionalManyResultOnOrigin(this.ctx, this.keys, responseValue);
                if (values != null) {
                    System.arraycopy(values, 0, this.allFuture.results, this.destinationIndex, values.length);
                    this.allFuture.countDown();
                } else {
                    this.allFuture.completeExceptionally(new IllegalStateException("Unexpected response value " + responseValue));
                }
            }
            catch (Throwable t) {
                this.allFuture.completeExceptionally(t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleMissingResponse(Response response) {
            Map requestedKeys;
            Map<Object, Object> contactedNodes;
            if (response instanceof UnsureResponse) {
                this.allFuture.hasUnsureResponse = true;
            }
            Map<Object, Object> map = contactedNodes = this.contactedNodes == null ? new HashMap() : this.contactedNodes;
            synchronized (map) {
                for (Object object : this.keys) {
                    contactedNodes.computeIfAbsent(object, k -> new ArrayList(4)).add(this.target);
                }
                requestedKeys = this.this$0.getKeysByOwner(this.ctx, this.keys, this.this$0.checkTopologyId((TopologyAffectedCommand)this.command), null, contactedNodes);
            }
            int pos = this.destinationIndex;
            for (Map.Entry entry : requestedKeys.entrySet()) {
                this.allFuture.increment();
                List keysForAddress = (List)entry.getValue();
                ReplicableCommand remoteCommand = this.helper.copyForRemote(this.command, keysForAddress, this.ctx);
                Set<Address> target = Collections.singleton(entry.getKey());
                this.this$0.rpcManager.invokeRemotelyAsync(target, remoteCommand, this.this$0.syncIgnoreLeavers).whenComplete((BiConsumer)new ReadManyHandler(this.this$0, (Address)entry.getKey(), this.allFuture, this.ctx, this.command, keysForAddress, contactedNodes, pos, this.helper));
                pos += keysForAddress.size();
            }
            Arrays.fill(this.allFuture.results, pos, this.destinationIndex + this.keys.size(), LOST_PLACEHOLDER);
            this.allFuture.lostData = true;
            this.allFuture.countDown();
        }
    }

    private class GetAllSuccessHandler
    implements RemoteGetAllHandler,
    InvocationSuccessFunction {
        private GetAllCommand localCommand;
        private boolean lostData;
        private boolean hasUnsureResponse;

        public GetAllSuccessHandler(GetAllCommand localCommand) {
            this.localCommand = localCommand;
        }

        @Override
        public void onUnsureResponse() {
            this.hasUnsureResponse = true;
        }

        @Override
        @GuardedBy(value="allFuture")
        public void onKeysLost(Collection<?> lostKeys) {
            this.lostData = true;
            HashSet strippedKeys = new HashSet(this.localCommand.getKeys());
            strippedKeys.removeAll(lostKeys);
            this.localCommand = BaseDistributionInterceptor.this.cf.buildGetAllCommand(strippedKeys, this.localCommand.getFlagsBitSet(), this.localCommand.isReturnEntries());
        }

        @Override
        public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv) throws Throwable {
            assert (rv == null);
            if (this.hasUnsureResponse && this.lostData) {
                throw OutdatedTopologyException.INSTANCE;
            }
            return BaseDistributionInterceptor.this.invokeNext(rCtx, this.localCommand);
        }
    }

    protected static interface RemoteGetAllHandler {
        public void onUnsureResponse();

        public void onKeysLost(Collection<?> var1);
    }

    private static class ClusteredGetAllHandler<C extends FlagAffectedCommand & TopologyAffectedCommand>
    implements BiConsumer<Map<Address, Response>, Throwable> {
        private final Address target;
        private final ClusteringInterceptor.ClusteredGetAllFuture allFuture;
        private final InvocationContext ctx;
        private final C command;
        private final List<?> keys;
        private final Map<Object, Collection<Address>> contactedNodes;
        private final RemoteGetAllHandler remoteGetAllHandler;
        final /* synthetic */ BaseDistributionInterceptor this$0;

        private ClusteredGetAllHandler(Address target, ClusteringInterceptor.ClusteredGetAllFuture allFuture, InvocationContext ctx, C command, List<?> keys, Map<Object, Collection<Address>> contactedNodes, RemoteGetAllHandler remoteGetAllHandler) {
            this.this$0 = var1_1;
            this.target = target;
            this.allFuture = allFuture;
            this.keys = keys;
            this.ctx = ctx;
            this.command = command;
            this.contactedNodes = contactedNodes;
            this.remoteGetAllHandler = remoteGetAllHandler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void accept(Map<Address, Response> responseMap, Throwable throwable) {
            if (throwable != null) {
                this.allFuture.completeExceptionally(throwable);
                return;
            }
            SuccessfulResponse response = BaseDistributionInterceptor.getSuccessfulResponseOrFail(responseMap, this.allFuture, this::handleMissingResponse);
            if (response == null) {
                return;
            }
            Object responseValue = response.getResponseValue();
            if (!(responseValue instanceof InternalCacheValue[])) {
                this.allFuture.completeExceptionally(new IllegalStateException("Unexpected response value: " + responseValue));
                return;
            }
            InternalCacheValue[] values = (InternalCacheValue[])responseValue;
            if (this.allFuture.isDone()) {
                return;
            }
            ClusteringInterceptor.ClusteredGetAllFuture clusteredGetAllFuture = this.allFuture;
            synchronized (clusteredGetAllFuture) {
                if (this.allFuture.isDone()) {
                    return;
                }
                for (int i = 0; i < this.keys.size(); ++i) {
                    Object key = this.keys.get(i);
                    InternalCacheValue value = values[i];
                    NullCacheEntry entry = value == null ? NullCacheEntry.getInstance() : value.toInternalCacheEntry(key);
                    this.this$0.wrapRemoteEntry(this.ctx, key, entry, false);
                }
                this.this$0.handleRemotelyRetrievedKeys(this.ctx, this.keys);
                if (--this.allFuture.counter == 0) {
                    this.allFuture.complete(null);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleMissingResponse(Response response) {
            Map requestedKeys;
            if (response instanceof UnsureResponse) {
                this.remoteGetAllHandler.onUnsureResponse();
            }
            GlobalTransaction gtx = this.ctx.isInTxScope() ? ((TxInvocationContext)this.ctx).getGlobalTransaction() : null;
            Map<Object, Object> contactedNodes = this.contactedNodes == null ? new HashMap() : this.contactedNodes;
            Object object = contactedNodes;
            synchronized (object) {
                for (Object key : this.keys) {
                    contactedNodes.computeIfAbsent(key, k -> new ArrayList(4)).add(this.target);
                }
                requestedKeys = this.this$0.getKeysByOwner(this.ctx, this.keys, this.this$0.checkTopologyId((TopologyAffectedCommand)this.command), null, contactedNodes);
            }
            object = this.allFuture;
            synchronized (object) {
                this.allFuture.counter += requestedKeys.size();
            }
            for (Map.Entry pair : requestedKeys.entrySet()) {
                ClusteredGetAllCommand clusteredGetAllCommand = this.this$0.cf.buildClusteredGetAllCommand((List)pair.getValue(), this.command.getFlagsBitSet(), gtx);
                clusteredGetAllCommand.setTopologyId(((TopologyAffectedCommand)this.command).getTopologyId());
                this.keys.removeAll((Collection)pair.getValue());
                this.this$0.rpcManager.invokeRemotelyAsync(Collections.singleton(pair.getKey()), clusteredGetAllCommand, this.this$0.syncIgnoreLeavers).whenComplete((BiConsumer)new ClusteredGetAllHandler(this.this$0, (Address)pair.getKey(), this.allFuture, this.ctx, this.command, (List)pair.getValue(), contactedNodes, this.remoteGetAllHandler));
            }
            if (!this.keys.isEmpty()) {
                object = this.allFuture;
                synchronized (object) {
                    try {
                        this.remoteGetAllHandler.onKeysLost(this.keys);
                    }
                    catch (Throwable t) {
                        this.allFuture.completeExceptionally(t);
                    }
                }
            }
            object = this.allFuture;
            synchronized (object) {
                if (--this.allFuture.counter == 0) {
                    this.allFuture.complete(null);
                }
            }
        }
    }
}

