/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.Util;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.interceptors.impl.CacheWriterInterceptor;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.manager.OrderedUpdatesManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link CacheWriterInterceptor} subclass whose read visitors consult
 * {@link OrderedUpdatesManager#waitFuture} for the keys being read and whose write
 * visitors persist versioned entries through
 * {@link OrderedUpdatesManager#checkLockAndStore}, bounded by the configured lock
 * acquisition timeout.
 *
 * <p>NOTE(review): judging by the class name this is the variant used for scattered
 * caches -- confirm against the interceptor chain wiring.
 */
public class ScatteredCacheWriterInterceptor
extends CacheWriterInterceptor {
    private static final Log log = LogFactory.getLog(ScatteredCacheWriterInterceptor.class);
    // Used by skipSharedStores to find out whether this node is primary for a key.
    @Inject
    private DistributionManager dm;
    // Used to compute the deadline of a store attempt and the time remaining until it.
    @Inject
    private TimeService timeService;
    // Executor on which the lock-timeout tasks are scheduled.
    @Inject
    @ComponentName(value="org.infinispan.executors.timeout")
    private ScheduledExecutorService timeoutExecutor;
    // Supplies the per-key wait futures and performs the version-checked store.
    @Inject
    private OrderedUpdatesManager orderedUpdatesManager;
    // Lock acquisition timeout in nanoseconds; assigned in start().
    private long lockTimeout;
    // Method references created once and reused by the write visitors.
    private final InvocationSuccessFunction handleDataWriteReturn = this::handleDataWriteReturn;
    private final InvocationSuccessFunction handleManyWriteReturn = this::handleManyWriteReturn;

    /**
     * Exposes this class's own static logger.
     *
     * @return the logger created for {@code ScatteredCacheWriterInterceptor}
     */
    @Override
    protected Log getLog() {
        return ScatteredCacheWriterInterceptor.log;
    }

    /**
     * Runs the superclass start logic, then caches the configured lock acquisition
     * timeout, converted from milliseconds to nanoseconds.
     */
    @Override
    public void start() {
        super.start();
        long timeoutMillis = cacheConfiguration.locking().lockAcquisitionTimeout();
        lockTimeout = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    /**
     * Shared handler for single-key reads. Asks the ordered-updates manager for a
     * wait future for the command's key; when one exists the command is forwarded
     * with {@code asyncInvokeNext} together with that future, otherwise it is
     * forwarded directly with {@code invokeNext}.
     *
     * @param ctx     invocation context of the read
     * @param command the single-key read command
     * @return whatever the chosen {@code invokeNext} variant returns
     */
    private Object handleReadCommand(InvocationContext ctx, DataCommand command) {
        CompletableFuture<?> pendingUpdate = orderedUpdatesManager.waitFuture(command.getKey());
        return pendingUpdate == null
                ? invokeNext(ctx, command)
                : asyncInvokeNext(ctx, (VisitableCommand) command, pendingUpdate);
    }

    /**
     * Routes a get-value read through {@code handleReadCommand}.
     */
    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    /**
     * Routes a get-entry read through {@code handleReadCommand}.
     */
    @Override
    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return handleReadCommand(ctx, command);
    }

    /**
     * Multi-key read: collects the wait future of every requested key that has
     * one and forwards the command together with those futures.
     *
     * <p>The list is allocated lazily, so {@code asyncInvokeNext} receives
     * {@code null} when no key has a wait future (exactly as before).
     *
     * @param ctx     invocation context of the read
     * @param command the get-all command
     * @return whatever {@code asyncInvokeNext} returns
     */
    @Override
    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        List<CompletableFuture<?>> waitFutures = null;
        for (Object key : command.getKeys()) {
            CompletableFuture<?> waitFuture = orderedUpdatesManager.waitFuture(key);
            if (waitFuture == null) {
                continue;
            }
            if (waitFutures == null) {
                waitFutures = new ArrayList<>();
            }
            waitFutures.add(waitFuture);
        }
        return asyncInvokeNext(ctx, (VisitableCommand) command, waitFutures);
    }

    /**
     * Callback applied after a single-key write has passed through the rest of
     * the chain; persists the written entry.
     *
     * <ul>
     *   <li>Nothing is stored when the store is not enabled for the command or the
     *       command was not successful.</li>
     *   <li>An entry without a version is stored directly.</li>
     *   <li>An entry with a version is stored through
     *       {@code orderedUpdatesManager.checkLockAndStore}; any wait future that
     *       call hands out is wrapped by {@code scheduleTimeout} with a deadline
     *       derived from the lock timeout.</li>
     * </ul>
     *
     * @param ctx     invocation context holding the written entry
     * @param command the executed command; must be a {@link DataWriteCommand}
     * @param rv      return value produced by the rest of the chain
     * @return {@code rv}, either directly or as an async value that yields
     *         {@code rv} once the store future completes
     * @throws IllegalStateException if the context holds no entry for the key
     */
    private Object handleDataWriteReturn(InvocationContext ctx, VisitableCommand command, Object rv) {
        DataWriteCommand dataWriteCommand = (DataWriteCommand) command;
        Object key = dataWriteCommand.getKey();
        if (!isStoreEnabled(dataWriteCommand) || !dataWriteCommand.isSuccessful()) {
            return rv;
        }
        CacheEntry cacheEntry = ctx.lookupEntry(key);
        if (cacheEntry == null) {
            throw new IllegalStateException("No entry in the invocation context for key " + Util.toStr(key));
        }
        Metadata metadata = cacheEntry.getMetadata();
        EntryVersion version = metadata == null ? null : metadata.version();
        if (version == null) {
            // Unversioned entry: no ordering check needed, store right away.
            storeAndUpdateStats(ctx, key, dataWriteCommand);
            return rv;
        }
        long deadline = timeService.expectedEndTime(lockTimeout, TimeUnit.NANOSECONDS);
        CompletableFuture<?> future = orderedUpdatesManager.checkLockAndStore(
                key,
                version,
                wf -> scheduleTimeout(wf, deadline, key),
                k -> storeAndUpdateStats(ctx, k, dataWriteCommand));
        if (future == null) {
            return rv;
        }
        return asyncValue(future.thenApply(nil -> rv));
    }

    /**
     * Stores the entry for {@code key} and, when statistics are enabled, bumps the
     * {@code cacheStores} counter by one.
     *
     * @param ctx     invocation context of the write
     * @param key     key of the entry to store
     * @param command the write command being processed
     */
    private void storeAndUpdateStats(InvocationContext ctx, Object key, WriteCommand command) {
        storeEntry(ctx, key, command);
        if (!getStatisticsEnabled()) {
            return;
        }
        cacheStores.incrementAndGet();
    }

    /**
     * Forwards the put via {@code invokeNextThenApply}, registering
     * {@code handleDataWriteReturn} as the success function.
     */
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the replace via {@code invokeNextThenApply}, registering
     * {@code handleDataWriteReturn} as the success function.
     */
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the remove via {@code invokeNextThenApply}, registering
     * {@code handleDataWriteReturn} as the success function.
     */
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the compute via {@code invokeNextThenApply}, registering
     * {@code handleDataWriteReturn} as the success function.
     */
    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the compute-if-absent via {@code invokeNextThenApply}, registering
     * {@code handleDataWriteReturn} as the success function.
     */
    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Callback applied after a multi-key write has passed through the rest of the
     * chain; persists every affected entry.
     *
     * <p>Unversioned entries are stored directly. Versioned entries go through
     * {@code orderedUpdatesManager.checkLockAndStore}; wait futures handed out by
     * that call are registered with a {@link WaitFutures} instance so that one
     * scheduled task can fail them all when the lock timeout elapses. When any
     * store is still pending after the loop, the return value is deferred until
     * all pending stores have completed.
     *
     * <p>When statistics are enabled, {@code cacheStores} is increased by the number
     * of affected keys once all stores are done.
     *
     * @param ctx      invocation context holding the written entries
     * @param rcommand the executed command; must be a {@link WriteCommand}
     * @param rv       return value produced by the rest of the chain
     * @return {@code rv}, either directly or as an async value
     * @throws IllegalStateException if the context holds no entry for an affected
     *         key (previously surfaced as a bare {@code NullPointerException})
     */
    private Object handleManyWriteReturn(InvocationContext ctx, VisitableCommand rcommand, Object rv) {
        WriteCommand command = (WriteCommand) rcommand;
        if (!isStoreEnabled(command)) {
            return rv;
        }
        Collection<?> keys = command.getAffectedKeys();
        long deadline = timeService.expectedEndTime(lockTimeout, TimeUnit.NANOSECONDS);
        WaitFutures waitFutures = new WaitFutures(lockTimeout, keys);
        List<CompletableFuture<?>> futures = null;
        for (Object key : keys) {
            CacheEntry cacheEntry = ctx.lookupEntry(key);
            if (cacheEntry == null) {
                // Same failure mode as the single-key path instead of an anonymous NPE.
                throw new IllegalStateException("No entry in the invocation context for key " + Util.toStr(key));
            }
            Metadata metadata = cacheEntry.getMetadata();
            EntryVersion version = metadata == null ? null : metadata.version();
            if (version == null) {
                storeEntry(ctx, cacheEntry.getKey(), command);
                continue;
            }
            CompletableFuture<?> future = orderedUpdatesManager.checkLockAndStore(key, version, wf -> {
                // Derive a separate future so the timeout fails only our view of the wait.
                CompletableFuture<?> cf = wf.thenAccept(nil -> {});
                waitFutures.add(cf);
                return cf;
            }, k -> storeEntry(ctx, k, command));
            // NOTE(review): a future that is already done is skipped even if it
            // completed exceptionally -- confirm that dropping such a failure is intended.
            if (future == null || future.isDone()) {
                continue;
            }
            if (futures == null) {
                futures = new ArrayList<>();
            }
            futures.add(future);
        }
        if (futures == null) {
            if (getStatisticsEnabled()) {
                cacheStores.getAndAdd(keys.size());
            }
            return rv;
        }
        ScheduledFuture<?> schedule = timeoutExecutor.schedule(waitFutures::cancel,
                timeService.remainingTime(deadline, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return asyncValue(allFuture
                // Cancel the timeout task on failure as well, so it does not linger
                // in the executor until the deadline.
                .whenComplete((ignored, throwable) -> schedule.cancel(false))
                .thenApply(nil -> {
                    if (getStatisticsEnabled()) {
                        cacheStores.getAndAdd(keys.size());
                    }
                    return rv;
                }));
    }

    /**
     * Forwards the put-map via {@code invokeNextThenApply}, registering
     * {@code handleManyWriteReturn} as the success function.
     */
    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleManyWriteReturn);
    }

    /**
     * Forwards the functional read-write key command via
     * {@code invokeNextThenApply}, registering {@code handleDataWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the functional read-write key/value command via
     * {@code invokeNextThenApply}, registering {@code handleDataWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the functional write-only key command via
     * {@code invokeNextThenApply}, registering {@code handleDataWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the functional write-only key/value command via
     * {@code invokeNextThenApply}, registering {@code handleDataWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleDataWriteReturn);
    }

    /**
     * Forwards the functional write-only multi-key command via
     * {@code invokeNextThenApply}, registering {@code handleManyWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleManyWriteReturn);
    }

    /**
     * Forwards the functional write-only multi-entry command via
     * {@code invokeNextThenApply}, registering {@code handleManyWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleManyWriteReturn);
    }

    /**
     * Forwards the functional read-write multi-key command via
     * {@code invokeNextThenApply}, registering {@code handleManyWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleManyWriteReturn);
    }

    /**
     * Forwards the functional read-write multi-entry command via
     * {@code invokeNextThenApply}, registering {@code handleManyWriteReturn} as
     * the success function.
     */
    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        return invokeNextThenApply(ctx, command, handleManyWriteReturn);
    }

    /**
     * Decides whether shared stores are skipped for {@code key}.
     *
     * @param ctx     invocation context (not consulted here)
     * @param key     key being written
     * @param command command whose flags are inspected
     * @return {@code true} when the distribution info for the key reports this node
     *         is not primary, or when the command carries
     *         {@code SKIP_SHARED_CACHE_STORE}
     */
    @Override
    protected boolean skipSharedStores(InvocationContext ctx, Object key, FlagAffectedCommand command) {
        DistributionInfo distribution = dm.getCacheTopology().getDistribution(key);
        if (!distribution.isPrimary()) {
            return true;
        }
        return command.hasAnyFlag(FlagBitSets.SKIP_SHARED_CACHE_STORE);
    }

    /**
     * Wraps {@code future} in a {@link LockTimeoutFuture} that fails with a
     * lock-acquisition error if {@code deadline} passes first. A future that is
     * already done is returned untouched.
     *
     * @param future   the wait future to guard
     * @param deadline end time of the wait, as produced by the time service
     * @param key      key being written; reported in the timeout error
     * @return {@code future} itself when already done, otherwise the guarding future
     */
    private CompletableFuture<?> scheduleTimeout(CompletableFuture<?> future, long deadline, Object key) {
        if (!future.isDone()) {
            LockTimeoutFuture guard = new LockTimeoutFuture(lockTimeout, key);
            long delayNanos = timeService.remainingTime(deadline, TimeUnit.NANOSECONDS);
            ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(guard, delayNanos, TimeUnit.NANOSECONDS);
            // The task handle must be set before the guard can be notified of completion.
            guard.setCancellation(timeoutTask);
            future.whenComplete(guard);
            return guard;
        }
        return future;
    }

    /**
     * Collects the wait futures created during one multi-key write so that a
     * single timeout task can fail all of them. Both methods are synchronized on
     * the instance.
     */
    private static class WaitFutures {
        private final long lockTimeout;
        private final Collection<?> keys;
        // Allocated lazily on the first add(); stays null if nothing registers.
        private List<CompletableFuture<?>> futures;
        private boolean cancelled;

        /**
         * @param lockTimeout lock timeout in nanoseconds, used in the error message
         * @param keys        keys of the write, reported in the error message
         */
        private WaitFutures(long lockTimeout, Collection<?> keys) {
            this.lockTimeout = lockTimeout;
            this.keys = keys;
        }

        /**
         * Registers {@code cf}. If {@link #cancel()} has already run, the future is
         * failed immediately instead of being stored.
         */
        public synchronized void add(CompletableFuture<?> cf) {
            if (this.cancelled) {
                cf.completeExceptionally(log.unableToAcquireLock(Util.prettyPrintTime(this.lockTimeout, TimeUnit.NANOSECONDS), this.keys, null, null));
                return;
            }
            if (this.futures == null) {
                this.futures = new ArrayList<>();
            }
            this.futures.add(cf);
        }

        /**
         * Fails every registered future with a lock-acquisition error and marks
         * this instance as cancelled so later {@link #add} calls fail at once.
         * Safe to call when no future was ever registered.
         */
        public synchronized void cancel() {
            this.cancelled = true;
            if (this.futures == null) {
                // Nothing registered yet; iterating would have thrown a NullPointerException.
                return;
            }
            for (CompletableFuture<?> cf : this.futures) {
                cf.completeExceptionally(log.unableToAcquireLock(Util.prettyPrintTime(this.lockTimeout, TimeUnit.NANOSECONDS), this.keys, null, null));
            }
        }
    }

    /**
     * Future guarding a wait for another future. It plays two roles: as a
     * {@link Runnable} it is the timeout task that fails it with a lock-acquisition
     * error, and as a {@link BiConsumer} it is the completion callback of the
     * guarded future, cancelling the timeout task and mirroring the outcome.
     */
    public class LockTimeoutFuture extends CompletableFuture<Void>
            implements Runnable, BiConsumer<Object, Throwable> {
        private final long lockTimeout;
        private final Object key;
        // Handle of the scheduled timeout task; set through setCancellation().
        private ScheduledFuture<?> cancellation;

        /**
         * @param lockTimeout lock timeout in nanoseconds, used in the error message
         * @param key         key being waited for, reported in the error message
         */
        private LockTimeoutFuture(long lockTimeout, Object key) {
            this.key = key;
            this.lockTimeout = lockTimeout;
        }

        /** Timeout path: fails this future with the lock-acquisition error. */
        @Override
        public void run() {
            String prettyTimeout = Util.prettyPrintTime(lockTimeout, TimeUnit.NANOSECONDS);
            completeExceptionally(log.unableToAcquireLock(prettyTimeout, key, null, null));
        }

        /**
         * Completion path: cancels the timeout task without interrupting it, then
         * completes this future the same way the guarded future completed.
         */
        @Override
        public void accept(Object o, Throwable throwable) {
            cancellation.cancel(false);
            if (throwable == null) {
                complete(null);
                return;
            }
            completeExceptionally(throwable);
        }

        /** Stores the handle of the scheduled timeout task. */
        public void setCancellation(ScheduledFuture<?> cancellation) {
            this.cancellation = cancellation;
        }
    }
}

