/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.store;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.Cache;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;

public class GridCacheWriteBehindStore<K, V>
implements CacheStore<K, V>,
LifecycleAware {
    /** Default initial capacity; same value as the initial value of {@code initCap}. */
    public static final int DFLT_INITIAL_CAPACITY = 1024;
    /** Overflow ratio; {@code start()} multiplies {@code cacheMaxSize} by this same value to obtain {@code cacheCriticalSize}. */
    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
    /** Default concurrency level; same value as the initial value of {@code concurLvl}. */
    public static final int DFLT_CONCUR_LVL = 64;
    /** Initial capacity passed to the write cache map created in {@code start()}. */
    private int initCap = 1024;
    /** Concurrency level passed to the write cache map created in {@code start()}. */
    private int concurLvl = 64;
    /** Buffer size above which {@code updateCache} wakes the flusher threads (only checked when greater than zero). */
    private int cacheMaxSize = 10240;
    /** Buffer size above which the updating thread itself flushes one entry; computed in {@code start()}. */
    private int cacheCriticalSize;
    /** Number of flusher workers created in {@code start()}. */
    private int flushThreadCnt = 1;
    /** Timed wait, in milliseconds, used by flushers between flush passes; values {@code <= 0} make them wait untimed. */
    private long cacheFlushFreq = 5000L;
    /** Maximum number of entries a flusher collects into one batch. */
    private int batchSize = 512;
    /** Grid name handed to the flusher workers. */
    private String gridName;
    /** Cache name; used in debug log messages. */
    private String cacheName;
    /** Underlying store that buffered operations are eventually applied to. */
    private CacheStore<K, V> store;
    /** Buffer of not-yet-flushed operations keyed by cache key; created in {@code start()}, {@code null} before that. */
    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
    /** Flusher workers; created in {@code start()}. */
    private GridWorker[] flushThreads;
    /** {@code true} while the store is not running (initially, and again after {@code stop()}). */
    private AtomicBoolean stopping = new AtomicBoolean(true);
    /** Lock guarding {@code canFlush}. */
    private Lock flushLock = new ReentrantLock();
    /** Condition flushers wait on; signalled by {@code wakeUp()}. */
    private Condition canFlush = this.flushLock.newCondition();
    /** Incremented by {@code flushSingleValue()} after it applies a single-entry batch. */
    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
    /** Incremented on entry to {@code flushSingleValue()} and decremented on exit. */
    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
    /** Incremented when an entry is put into RETRY state; decremented when such an entry is updated or picked up for flushing again. */
    private AtomicInteger retryEntriesCnt = new AtomicInteger();
    /** Logger. */
    private IgniteLogger log;
    /** Store manager used for write-behind session init/end callbacks; {@code updateStore} checks it for {@code null} before use. */
    private CacheStoreManager storeMgr;

    /**
     * Creates a write-behind wrapper around the given store. The instance does nothing
     * until {@link #start()} is called.
     *
     * @param storeMgr Store manager used for session callbacks; {@code updateStore} checks it for {@code null} before use.
     * @param gridName Grid name passed to the flusher workers.
     * @param cacheName Cache name used in log messages.
     * @param log Logger.
     * @param store Underlying store that buffered operations are applied to.
     */
    public GridCacheWriteBehindStore(CacheStoreManager storeMgr, String gridName, String cacheName, IgniteLogger log, CacheStore<K, V> store) {
        this.storeMgr = storeMgr;
        this.gridName = gridName;
        this.cacheName = cacheName;
        this.log = log;
        this.store = store;
    }

    /**
     * Sets the initial capacity of the write cache map. Only read by {@link #start()}.
     *
     * @param initCap Initial capacity.
     */
    public void setInitialCapacity(int initCap) {
        this.initCap = initCap;
    }

    /**
     * Sets the concurrency level of the write cache map. Only read by {@link #start()}.
     *
     * @param concurLvl Concurrency level.
     */
    public void setConcurrencyLevel(int concurLvl) {
        this.concurLvl = concurLvl;
    }

    /**
     * Sets the buffer size above which flusher threads are woken up.
     * {@link #start()} derives the critical size from this value.
     *
     * @param cacheMaxSize Flush size.
     */
    public void setFlushSize(int cacheMaxSize) {
        this.cacheMaxSize = cacheMaxSize;
    }

    /**
     * @return Configured flush size.
     */
    public int getWriteBehindFlushSize() {
        return this.cacheMaxSize;
    }

    /**
     * Sets the number of flusher workers created by {@link #start()}.
     *
     * @param flushThreadCnt Flusher thread count.
     */
    public void setFlushThreadCount(int flushThreadCnt) {
        this.flushThreadCnt = flushThreadCnt;
    }

    /**
     * @return Configured flusher thread count.
     */
    public int getWriteBehindFlushThreadCount() {
        return this.flushThreadCnt;
    }

    /**
     * Sets the timed wait, in milliseconds, that flushers use between flush passes.
     *
     * @param cacheFlushFreq Flush frequency in milliseconds.
     */
    public void setFlushFrequency(long cacheFlushFreq) {
        this.cacheFlushFreq = cacheFlushFreq;
    }

    /**
     * @return Configured flush frequency in milliseconds.
     */
    public long getWriteBehindFlushFrequency() {
        return this.cacheFlushFreq;
    }

    /**
     * Sets the maximum number of entries a flusher puts into one batch.
     *
     * @param batchSize Batch size.
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @return Configured batch size.
     */
    public int getWriteBehindStoreBatchSize() {
        return this.batchSize;
    }

    /**
     * Returns the current number of buffered entries. The write cache is only created in
     * {@link #start()}, so calling this earlier dereferences {@code null}.
     *
     * @return Current write cache size as reported by {@code sizex()}.
     */
    public int getWriteBehindBufferSize() {
        return this.writeCache.sizex();
    }

    /**
     * @return Underlying store.
     */
    public CacheStore<K, V> store() {
        return this.store;
    }

    /**
     * Starts the write-behind store: computes the critical buffer size, creates the write
     * cache and launches the flusher threads. Does nothing if the store is already running.
     */
    @Override
    public void start() {
        assert (cacheFlushFreq != 0L || cacheMaxSize != 0);

        // Only the caller that flips 'stopping' from true to false performs the initialization.
        if (!stopping.compareAndSet(true, false)) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Starting write-behind store for cache '" + cacheName + '\'');
        }

        cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);

        // A zero flush size yields a zero critical size; fall back to a fixed limit in that case.
        if (cacheCriticalSize == 0) {
            cacheCriticalSize = 16384;
        }

        flushThreads = new GridWorker[flushThreadCnt];
        writeCache = new ConcurrentLinkedHashMap<K, StatefulValue<K, V>>(initCap, 0.75f, concurLvl);

        for (int idx = 0; idx < flushThreads.length; idx++) {
            GridWorker flusher = new Flusher(gridName, "flusher-" + idx, log);

            flushThreads[idx] = flusher;

            new IgniteThread(flusher).start();
        }
    }

    /**
     * @return Current value of the total overflow counter, which {@code flushSingleValue()}
     *      increments after applying a single-entry batch.
     */
    public int getWriteBehindTotalCriticalOverflowCount() {
        return this.cacheTotalOverflowCntr.get();
    }

    /**
     * @return Current value of the overflow counter, which is incremented on entry to
     *      {@code flushSingleValue()} and decremented on exit.
     */
    public int getWriteBehindCriticalOverflowCount() {
        return this.cacheOverflowCntr.get();
    }

    /**
     * @return Current value of the retry counter (entries put into RETRY state and not yet picked up again).
     */
    public int getWriteBehindErrorRetryCount() {
        return this.retryEntriesCnt.get();
    }

    /**
     * Stops the write-behind store: marks it as stopping, wakes the flusher threads and
     * joins every one of them. Does nothing if the store is not running.
     */
    @Override
    public void stop() {
        // Only the caller that flips 'stopping' from false to true performs the shutdown.
        if (!stopping.compareAndSet(false, true)) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
        }

        wakeUp();

        boolean allJoined = true;

        // Every worker is joined even if an earlier join already failed.
        for (GridWorker flusher : flushThreads) {
            if (!U.join(flusher, log)) {
                allJoined = false;
            }
        }

        if (!allJoined) {
            log.warning("Shutdown was aborted");
        }
    }

    /**
     * Signals the flusher threads via {@code wakeUp()}. This method only sends the signal;
     * it does not wait for any flush to happen.
     *
     * @throws IgniteCheckedException Declared by the signature; nothing in this body throws it.
     */
    public void forceFlush() throws IgniteCheckedException {
        this.wakeUp();
    }

    /**
     * Delegates directly to the underlying store; the write-behind buffer is not consulted.
     *
     * @param clo Closure forwarded to the underlying store.
     * @param args Arguments forwarded to the underlying store.
     */
    @Override
    public void loadCache(IgniteBiInClosure<K, V> clo, Object ... args) {
        this.store.loadCache(clo, args);
    }

    /**
     * Loads values for the given keys, preferring operations still buffered in the write cache.
     * <p>
     * A key with a buffered PUT yields the buffered value. A key with a buffered RMV is
     * omitted from the result and is not requested from the underlying store. All other keys
     * are loaded from the underlying store in a single {@code loadAll} call.
     *
     * @param keys Keys to load.
     * @return Map of loaded key-value pairs; never {@code null}.
     */
    @Override
    public Map<K, V> loadAll(Iterable<? extends K> keys) {
        if (log.isDebugEnabled()) {
            log.debug("Store load all [keys=" + keys + ']');
        }

        Map<K, V> loaded = new HashMap<K, V>();
        Collection<K> remaining = new LinkedList<K>();

        for (K key : keys) {
            StatefulValue<K, V> val = writeCache.get(key);

            if (val == null) {
                remaining.add(key);

                continue;
            }

            val.readLock().lock();

            try {
                if (val.operation() == StoreOperation.PUT) {
                    loaded.put(key, val.entry().getValue());
                }
                else {
                    assert val.operation() == StoreOperation.RMV : val.operation();
                }
            }
            finally {
                // Release only; no jump out of 'finally', so exceptions raised above still propagate.
                val.readLock().unlock();
            }
        }

        if (!remaining.isEmpty()) {
            Map<K, V> fromStore = store.loadAll(remaining);

            if (fromStore != null) {
                loaded.putAll(fromStore);
            }
        }

        return loaded;
    }

    /**
     * Loads the value for a key, preferring an operation still buffered in the write cache.
     * <p>
     * A buffered PUT returns the buffered value and a buffered RMV returns {@code null};
     * in both cases the underlying store is not queried. Otherwise the call is delegated
     * to the underlying store.
     *
     * @param key Key to load.
     * @return Loaded value, or {@code null} if the key has a buffered removal.
     */
    @Override
    public V load(K key) {
        if (log.isDebugEnabled()) {
            log.debug("Store load [key=" + key + ']');
        }

        StatefulValue<K, V> val = writeCache.get(key);

        if (val != null) {
            val.readLock().lock();

            try {
                switch (val.operation()) {
                    case PUT:
                        return val.entry().getValue();

                    case RMV:
                        return null;

                    default:
                        assert false : "Unexpected operation: " + val.operation();
                }
            }
            finally {
                val.readLock().unlock();
            }
        }

        return store.load(key);
    }

    /**
     * Buffers a PUT for every given entry by calling {@link #write(Cache.Entry)} in iteration order.
     *
     * @param entries Entries to write.
     */
    @Override
    public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
        // The element type is a wildcard entry; it cannot be narrowed to Cache.Entry<K, V>.
        for (Cache.Entry<? extends K, ? extends V> entry : entries) {
            write(entry);
        }
    }

    /**
     * Buffers a PUT operation for the given entry in the write cache.
     *
     * @param entry Entry to write.
     * @throws CacheWriterException If the buffer update was interrupted.
     */
    @Override
    public void write(Cache.Entry<? extends K, ? extends V> entry) {
        if (log.isDebugEnabled()) {
            log.debug("Store put [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
        }

        try {
            updateCache(entry.getKey(), entry, StoreOperation.PUT);
        }
        catch (IgniteInterruptedCheckedException interrupted) {
            throw new CacheWriterException(U.convertExceptionNoWrap(interrupted));
        }
    }

    /**
     * Buffers a removal for every given key by calling {@link #delete(Object)} in iteration order.
     *
     * @param keys Keys to remove.
     */
    @Override
    public void deleteAll(Collection<?> keys) {
        for (Object key : keys) {
            this.delete(key);
        }
    }

    /**
     * Buffers a removal (RMV operation with a {@code null} entry) for the given key in the write cache.
     *
     * @param key Key to remove; treated as a key of type {@code K}.
     * @throws CacheWriterException If the buffer update was interrupted.
     */
    @Override
    public void delete(Object key) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Store remove [key=" + key + ']');
            }

            // The interface passes keys as Object, while the write cache is keyed by K.
            @SuppressWarnings("unchecked")
            K typedKey = (K)key;

            updateCache(typedKey, null, StoreOperation.RMV);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new CacheWriterException(U.convertExceptionNoWrap(e));
        }
    }

    /**
     * No-op: this implementation does nothing on session end.
     *
     * @param commit Ignored.
     */
    @Override
    public void sessionEnd(boolean commit) {
    }

    /**
     * @return String representation produced by {@code S.toString} for this class.
     */
    public String toString() {
        return S.toString(GridCacheWriteBehindStore.class, this);
    }

    /**
     * Records an operation for a key in the write cache, then reacts to the resulting buffer size.
     * <p>
     * If the key has no buffered value, a new one is inserted. Otherwise the existing value is
     * updated in place under its write lock, unless it is currently being flushed (PENDING, in
     * which case this thread waits for the flush to finish and retries) or has already been
     * flushed (FLUSHED, in which case the insert is retried).
     * <p>
     * NOTE(review): the decompiler (CFR) flagged its try/finally reconstruction for this method
     * ("Removed try catching itself - possible behaviour change"); the control flow is kept as decompiled.
     *
     * @param key Key to update.
     * @param val Entry to store; {@code null} is passed for removals.
     * @param operation Operation to buffer.
     * @throws IgniteInterruptedCheckedException If waiting for an in-progress flush was interrupted.
     */
    private void updateCache(K key, @Nullable Cache.Entry<? extends K, ? extends V> val, StoreOperation operation) throws IgniteInterruptedCheckedException {
        StatefulValue prev;
        StatefulValue newVal = new StatefulValue(val, operation);
        // Loop until the new value is either inserted or merged into an existing, not-yet-flushed value.
        while ((prev = this.writeCache.putIfAbsent(key, newVal)) != null) {
            prev.writeLock().lock();
            try {
                if (prev.status() == ValueStatus.PENDING) {
                    // A flush of this value is in progress: wait for it to be signalled, then retry the insert.
                    prev.waitForFlush();
                    continue;
                }
                // Value was marked FLUSHED before this thread got the lock: retry the insert.
                if (prev.status() == ValueStatus.FLUSHED) continue;
                if (prev.status() == ValueStatus.RETRY) {
                    // The value is about to be overwritten and reset to NEW, so it no longer counts as a retry entry.
                    this.retryEntriesCnt.decrementAndGet();
                }
                assert (prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY);
                prev.update(val, operation, ValueStatus.NEW);
                break;
            }
            finally {
                prev.writeLock().unlock();
            }
        }
        if (this.writeCache.sizex() > this.cacheCriticalSize) {
            // Buffer is above the critical size: flush one entry in the calling thread.
            this.flushSingleValue();
        } else if (this.cacheMaxSize > 0 && this.writeCache.sizex() > this.cacheMaxSize) {
            // Buffer is above the flush size: wake the flusher threads.
            this.wakeUp();
        }
    }

    /**
     * Flushes a single buffered entry to the underlying store in the calling thread.
     * <p>
     * Called by {@code updateCache} when the write cache exceeds the critical size. Scans the
     * write cache for the first entry that is not already PENDING or FLUSHED, marks it PENDING
     * under its write lock and applies it as a one-element batch without store session
     * initialization. Returns after the first applied entry, or after a full scan if every
     * entry was already acquired by another thread.
     */
    private void flushSingleValue() {
        cacheOverflowCntr.incrementAndGet();

        try {
            for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
                StatefulValue<K, V> val = e.getValue();

                val.writeLock().lock();

                try {
                    // Another thread is already flushing (or has flushed) this entry; try the next one.
                    if (acquired(val.status())) {
                        continue;
                    }

                    if (val.status() == ValueStatus.RETRY) {
                        retryEntriesCnt.decrementAndGet();
                    }

                    assert retryEntriesCnt.get() >= 0;

                    val.status(ValueStatus.PENDING);
                }
                finally {
                    // Release only. A 'continue' here would skip the store update below for every entry.
                    val.writeLock().unlock();
                }

                Map<K, StatefulValue<K, V>> batch = Collections.singletonMap(e.getKey(), val);

                // The store update is performed outside the entry's write lock; applyBatch re-acquires it.
                applyBatch(batch, false);

                cacheTotalOverflowCntr.incrementAndGet();

                return;
            }
        }
        finally {
            cacheOverflowCntr.decrementAndGet();
        }
    }

    /**
     * Applies a batch of PENDING values, all with the same operation, to the underlying store
     * and updates their state according to the outcome.
     * <p>
     * If {@code updateStore} reports success, each value is marked FLUSHED, removed from the
     * write cache and its waiters are signalled. Otherwise each value is marked RETRY, the
     * retry counter is incremented and its waiters are signalled.
     *
     * @param valMap Batch to apply; must not exceed {@code batchSize}.
     * @param initSes Whether store session callbacks should be invoked around the store update.
     */
    private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) {
        assert valMap.size() <= batchSize;

        StoreOperation operation = null;

        // Typed to match updateStore's parameter; entries are null for removals.
        Map<K, Cache.Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());

        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
            StatefulValue<K, V> val = e.getValue();

            if (operation == null) {
                operation = val.operation();
            }

            assert operation == val.operation();
            assert val.status() == ValueStatus.PENDING;

            batch.put(e.getKey(), val.entry());
        }

        if (updateStore(operation, batch, initSes)) {
            for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
                StatefulValue<K, V> val = e.getValue();

                val.writeLock().lock();

                try {
                    val.status(ValueStatus.FLUSHED);

                    StatefulValue<K, V> prev = writeCache.remove(e.getKey());

                    assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";

                    // Wake threads blocked in updateCache waiting for this flush.
                    val.signalFlushed();
                }
                finally {
                    val.writeLock().unlock();
                }
            }
        }
        else {
            for (StatefulValue<K, V> val : valMap.values()) {
                val.writeLock().lock();

                try {
                    val.status(ValueStatus.RETRY);

                    retryEntriesCnt.incrementAndGet();

                    val.signalFlushed();
                }
                finally {
                    val.writeLock().unlock();
                }
            }
        }
    }

    /**
     * Applies one batch to the underlying store.
     * <p>
     * When {@code initSes} is set and a store manager is present, the write-behind session is
     * initialized before the update and ended afterwards on both the success and the failure
     * path. Any {@link Exception} from the store update or from ending the session is logged
     * and handled here instead of being propagated.
     *
     * @param operation Operation to perform for every entry of the batch.
     * @param vals Batch: keys mapped to the entries to write (entries are {@code null} for removals).
     * @param initSes Whether to invoke the store manager's session callbacks.
     * @return {@code true} if the batch should be dropped from the write cache: the update
     *      succeeded, or it failed while the buffer is above the critical size or the store is
     *      stopping (the values are then lost and a warning is logged per entry). {@code false}
     *      if the update failed and the values should be retried.
     */
    private boolean updateStore(StoreOperation operation, Map<K, Cache.Entry<? extends K, ? extends V>> vals, boolean initSes) {
        boolean useSes = initSes && storeMgr != null;

        if (useSes) {
            storeMgr.writeBehindSessionInit();
        }

        try {
            boolean threwEx = true;

            try {
                switch (operation) {
                    case PUT:
                        store.writeAll(vals.values());

                        break;

                    case RMV:
                        store.deleteAll(vals.keySet());

                        break;

                    default:
                        assert false : "Unexpected operation: " + operation;
                }

                threwEx = false;

                return true;
            }
            finally {
                // Runs on success and on failure; a failure raised here is handled by the catch below.
                if (useSes) {
                    storeMgr.writeBehindSessionEnd(threwEx);
                }
            }
        }
        catch (Exception e) {
            LT.error(log, e, "Unable to update underlying store: " + store);

            if (writeCache.sizex() > cacheCriticalSize || stopping.get()) {
                for (Map.Entry<K, Cache.Entry<? extends K, ? extends V>> entry : vals.entrySet()) {
                    Object val = entry.getValue() != null ? entry.getValue().getValue() : null;

                    log.warning("Failed to update store (value will be lost as current buffer size is greater " +
                        "than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
                        entry.getKey() + ", val=" + val + ", op=" + operation + "]");
                }

                return true;
            }

            return false;
        }
    }

    /**
     * Signals every thread waiting on the {@code canFlush} condition.
     * The signal is sent while holding {@code flushLock}, the lock the condition was created from.
     */
    private void wakeUp() {
        this.flushLock.lock();
        try {
            this.canFlush.signalAll();
        }
        finally {
            this.flushLock.unlock();
        }
    }

    /**
     * Package-private accessor for the internal write cache (the live map, not a copy).
     *
     * @return Write cache; {@code null} until {@code start()} has created it.
     */
    Map<K, StatefulValue<K, V>> writeCache() {
        return this.writeCache;
    }

    /**
     * Tells whether a value in the given state must be skipped by a flushing thread.
     *
     * @param status Value status to check.
     * @return {@code true} if the status is PENDING or FLUSHED.
     */
    private boolean acquired(ValueStatus status) {
        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
    }

    /**
     * A buffered store operation for a single key: the entry to write (or {@code null} for
     * removals), the operation and the flush status. The object is its own read-write lock;
     * the enclosing store acquires the read or write lock before calling its accessors.
     */
    private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
        private static final long serialVersionUID = 0L;

        /** Buffered entry. */
        @GridToStringInclude(sensitive=true)
        private Cache.Entry<? extends K, ? extends V> val;

        /** Buffered operation. */
        private StoreOperation storeOperation;

        /** Current flush status. */
        private ValueStatus valStatus;

        /** Condition signalled after a flush attempt; bound to this object's write lock. */
        private Condition flushCond = writeLock().newCondition();

        /**
         * Creates a value in NEW status.
         *
         * @param val Entry to store.
         * @param storeOperation Operation; must be PUT or RMV.
         */
        private StatefulValue(Cache.Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
            assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;

            update(val, storeOperation, ValueStatus.NEW);
        }

        /**
         * @return Buffered entry.
         */
        private Cache.Entry<? extends K, ? extends V> entry() {
            return val;
        }

        /**
         * @return Buffered operation.
         */
        private StoreOperation operation() {
            return storeOperation;
        }

        /**
         * @return Current status.
         */
        private ValueStatus status() {
            return valStatus;
        }

        /**
         * @param valStatus New status.
         */
        private void status(ValueStatus valStatus) {
            this.valStatus = valStatus;
        }

        /**
         * Replaces entry, operation and status in one call.
         *
         * @param val New entry.
         * @param storeOperation New operation.
         * @param valStatus New status.
         */
        private void update(@Nullable Cache.Entry<? extends K, ? extends V> val, StoreOperation storeOperation, ValueStatus valStatus) {
            this.val = val;
            this.storeOperation = storeOperation;
            this.valStatus = valStatus;
        }

        /**
         * Waits on the flush condition.
         *
         * @throws IgniteInterruptedCheckedException If the wait was interrupted.
         */
        private void waitForFlush() throws IgniteInterruptedCheckedException {
            U.await(flushCond);
        }

        /**
         * Signals all threads waiting on the flush condition.
         */
        private void signalFlushed() {
            flushCond.signalAll();
        }

        /**
         * Compares entry and status only; the operation is not part of equality.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }

            if (o instanceof StatefulValue) {
                StatefulValue<?, ?> that = (StatefulValue<?, ?>)o;

                return F.eq(val, that.val) && F.eq(valStatus, that.valStatus);
            }

            return false;
        }

        /**
         * Hash code built from entry and status, matching {@code equals}.
         */
        @Override
        public int hashCode() {
            int valHash = val == null ? 0 : val.hashCode();

            return 31 * valHash + valStatus.hashCode();
        }

        @Override
        public String toString() {
            return S.toString(StatefulValue.class, this);
        }
    }

    /**
     * Flush status of a buffered value.
     */
    private static enum ValueStatus {
        /** Set on creation and whenever the value is overwritten by a newer operation. */
        NEW,
        /** Set by a flushing thread that picked the value up for a store update. */
        PENDING,
        /** Set when the store update failed and the value is kept for another attempt. */
        RETRY,
        /** Set when the store update was reported successful, just before the value is removed from the write cache. */
        FLUSHED;

    }

    /**
     * Kind of operation buffered for a key.
     */
    private static enum StoreOperation {
        /** Write; applied through the underlying store's {@code writeAll}. */
        PUT,
        /** Removal; applied through the underlying store's {@code deleteAll}. */
        RMV;

    }

    /**
     * Worker that repeatedly waits until there is something to flush and then flushes the
     * write cache to the underlying store in batches. Keeps running after the store has been
     * asked to stop for as long as the write cache is not empty.
     */
    private class Flusher
    extends GridWorker {
        /**
         * @param gridName Grid name.
         * @param name Worker name.
         * @param log Logger.
         */
        protected Flusher(String gridName, String name, IgniteLogger log) {
            super(gridName, name, log);
        }

        /**
         * Main loop: wait, then run one flush pass over the write cache.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!GridCacheWriteBehindStore.this.stopping.get() || GridCacheWriteBehindStore.this.writeCache.sizex() > 0) {
                awaitOperationsAvailable();

                flushCache(GridCacheWriteBehindStore.this.writeCache.entrySet().iterator());
            }
        }

        /**
         * Blocks until a flush pass should run.
         * <p>
         * If the write cache is above the flush size (and the flush size is non-zero) no wait
         * is performed. Otherwise the thread waits on {@code canFlush}, timed when the flush
         * frequency is positive and untimed otherwise. The wait is repeated while the write
         * cache is empty and the store is not stopping.
         *
         * @throws InterruptedException If the wait was interrupted.
         */
        private void awaitOperationsAvailable() throws InterruptedException {
            GridCacheWriteBehindStore.this.flushLock.lock();

            try {
                do {
                    int maxSize = GridCacheWriteBehindStore.this.cacheMaxSize;

                    boolean overFlushSize = GridCacheWriteBehindStore.this.writeCache.sizex() > maxSize && maxSize != 0;

                    if (!overFlushSize) {
                        long freq = GridCacheWriteBehindStore.this.cacheFlushFreq;

                        if (freq > 0L) {
                            GridCacheWriteBehindStore.this.canFlush.await(freq, TimeUnit.MILLISECONDS);
                        }
                        else {
                            GridCacheWriteBehindStore.this.canFlush.await();
                        }
                    }
                }
                while (GridCacheWriteBehindStore.this.writeCache.sizex() == 0 && !GridCacheWriteBehindStore.this.stopping.get());
            }
            finally {
                GridCacheWriteBehindStore.this.flushLock.unlock();
            }
        }

        /**
         * Runs one flush pass: walks the given entries, marks every value that is not already
         * acquired by another thread as PENDING and groups consecutive values with the same
         * operation into batches of at most {@code batchSize}. A batch is applied as soon as
         * it is complete (size reached or operation changed); the remainder is applied at the end.
         *
         * @param it Iterator over write cache entries.
         */
        private void flushCache(Iterator<Map.Entry<K, StatefulValue<K, V>>> it) {
            int maxBatch = GridCacheWriteBehindStore.this.batchSize;

            // Operation of the batch being collected; null means the next value starts a new batch.
            StoreOperation operation = null;

            // Completed batch waiting to be applied, if any.
            Map<K, StatefulValue<K, V>> batch = null;

            // Batch currently being collected.
            Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(maxBatch);

            while (it.hasNext()) {
                Map.Entry<K, StatefulValue<K, V>> e = it.next();

                StatefulValue<K, V> val = e.getValue();

                val.writeLock().lock();

                try {
                    ValueStatus status = val.status();

                    // Another thread is already flushing (or has flushed) this value.
                    if (GridCacheWriteBehindStore.this.acquired(status)) {
                        continue;
                    }

                    if (status == ValueStatus.RETRY) {
                        GridCacheWriteBehindStore.this.retryEntriesCnt.decrementAndGet();
                    }

                    assert GridCacheWriteBehindStore.this.retryEntriesCnt.get() >= 0;

                    val.status(ValueStatus.PENDING);

                    if (operation == null) {
                        operation = val.operation();
                    }

                    if (operation != val.operation()) {
                        // Operation changed: what was collected so far becomes a complete batch.
                        batch = pending;
                        pending = U.newLinkedHashMap(maxBatch);

                        operation = val.operation();
                    }

                    pending.put(e.getKey(), val);

                    if (pending.size() == maxBatch) {
                        // Size limit reached: hand the batch over and start a fresh one.
                        batch = pending;
                        pending = U.newLinkedHashMap(maxBatch);

                        operation = null;
                    }
                }
                finally {
                    // Release only. A 'continue' here would skip the batch application below.
                    val.writeLock().unlock();
                }

                // The store update is performed outside the value's write lock.
                if (batch != null && !batch.isEmpty()) {
                    GridCacheWriteBehindStore.this.applyBatch(batch, true);

                    batch = null;
                }
            }

            // Apply whatever is left after the scan.
            if (!pending.isEmpty()) {
                GridCacheWriteBehindStore.this.applyBatch(pending, true);
            }
        }
    }
}

