/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.support;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.SingletonStoreConfiguration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.Event;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.spi.CacheWriter;
import org.infinispan.persistence.support.DelegatingCacheWriter;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Cache writer decorator that forwards {@link #write(MarshalledEntry)} and
 * {@link #delete(Object)} to the wrapped writer only while this instance is
 * flagged as {@link #active}.
 *
 * <p>The {@code active} flag is maintained by a {@link SingletonStoreListener}
 * registered with the cache manager in {@link #start()}: it is set from
 * {@code EmbeddedCacheManager.isCoordinator()} when a cache-started event is
 * received, and re-evaluated on every view change by checking whether the local
 * address is the first member of the new view. When the instance becomes active
 * and the configuration asks for it, the in-memory contents of the cache are
 * pushed to the wrapped writer on a dedicated single-thread executor.
 */
public class SingletonCacheWriter extends DelegatingCacheWriter {
    private static final Log log = LogFactory.getLog(SingletonCacheWriter.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final String THREAD_NAME = "SingletonStorePusherThread";

    /** Singleton-store settings (push-on-coordinator flag and push timeout). */
    private final SingletonStoreConfiguration singletonConfiguration;

    /** Single-thread executor on which the state push task is run. */
    protected final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, SingletonCacheWriter.THREAD_NAME);
        }
    });

    /** Handle on the most recently submitted state push, or {@code null} if none was submitted yet. */
    Future<?> pushStateFuture;

    /** Address of the local node, as last reported by the cache manager or a view change event. */
    private Address localAddress;

    /** Whether writes and deletes are currently forwarded to the wrapped writer. */
    protected volatile boolean active;

    /**
     * Listener registered in {@link #start()}; kept so that {@link #stop()} can
     * unregister it. {@code null} while no listener is registered.
     */
    private SingletonStoreListener listener;

    /**
     * @param actual                 the writer that operations are delegated to while active
     * @param singletonConfiguration singleton-store settings consulted when the active state changes
     */
    public SingletonCacheWriter(CacheWriter actual, SingletonStoreConfiguration singletonConfiguration) {
        super(actual);
        this.singletonConfiguration = singletonConfiguration;
    }

    /**
     * Registers a {@link SingletonStoreListener} with the cache manager so that this
     * writer is notified of cache-started and view-changed events.
     */
    @Override
    public void start() {
        SingletonStoreListener newListener = new SingletonStoreListener();
        this.ctx.getCache().getCacheManager().addListener(newListener);
        this.listener = newListener;
    }

    /**
     * Unregisters the listener added by {@link #start()} (if any) and shuts the
     * push executor down.
     *
     * <p>The listener is removed first: otherwise a view change arriving after the
     * executor has been shut down would still reach {@link #doPushState()} and try
     * to submit a task to the terminated executor.
     */
    @Override
    public void stop() {
        SingletonStoreListener registered = this.listener;
        try {
            if (registered != null) {
                this.listener = null;
                this.ctx.getCache().getCacheManager().removeListener(registered);
            }
        } finally {
            // Always release the pusher thread, even if unregistering failed.
            this.executor.shutdownNow();
        }
    }

    /**
     * Forwards the entry to the wrapped writer if this instance is active;
     * otherwise the entry is dropped (and the fact is traced).
     *
     * @param entry the entry to store
     */
    @Override
    public void write(MarshalledEntry entry) {
        if (this.active) {
            if (trace) {
                log.tracef("Storing key %s.  Instance: %s", entry.getKey(), (Object)this);
            }
            super.write(entry);
        } else if (trace) {
            log.tracef("Not storing key %s.  Instance: %s", entry.getKey(), (Object)this);
        }
    }

    /**
     * Forwards the delete to the wrapped writer if this instance is active.
     *
     * @param key the key to remove
     * @return the wrapped writer's result when active, {@code false} otherwise
     */
    @Override
    public boolean delete(Object key) {
        return this.active && super.delete(key);
    }

    /**
     * Creates the task that pushes the in-memory state via {@link #pushState(Cache)}.
     *
     * @return a callable that performs the push and returns {@code null}
     */
    protected Callable<?> createPushStateTask() {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                boolean debugEnabled = log.isDebugEnabled();
                if (debugEnabled) {
                    log.debug("start pushing in-memory state to cache cacheLoader");
                }
                SingletonCacheWriter.this.pushState(SingletonCacheWriter.this.ctx.getCache());
                if (debugEnabled) {
                    log.debug("in-memory state passed to cache cacheLoader successfully");
                }
                return null;
            }
        };
    }

    /**
     * Writes every entry currently held in the cache's data container through
     * {@link #write(MarshalledEntry)}.
     *
     * @param cache the cache whose data container is iterated
     * @throws Exception if building or writing an entry fails
     */
    protected void pushState(Cache cache) throws Exception {
        DataContainer dc = cache.getAdvancedCache().getDataContainer();
        Set<Object> keys = dc.keySet();
        for (Object k : keys) {
            InternalCacheEntry entry = dc.get(k);
            // The lookup can come back empty for a key taken from the key set; skip it.
            if (entry == null) {
                continue;
            }
            MarshalledEntry me = this.ctx.getMarshalledEntryFactory().newMarshalledEntry(entry.getKey(), entry.getValue(), PersistenceUtil.internalMetadata(entry));
            this.write(me);
        }
    }

    /**
     * Waits for an already running push to complete, without propagating failures:
     * timeouts and execution errors are only logged at debug level, and an
     * interruption restores the thread's interrupt flag.
     *
     * @param future  the push to wait for
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     */
    protected void awaitForPushToFinish(Future<?> future, long timeout, TimeUnit unit) {
        boolean debugEnabled = log.isDebugEnabled();
        try {
            if (debugEnabled) {
                log.debug("wait for state push to cache loader to finish");
            }
            future.get(timeout, unit);
        } catch (TimeoutException e) {
            if (debugEnabled) {
                log.debug("timed out waiting for state push to cache loader to finish");
            }
        } catch (ExecutionException e) {
            if (debugEnabled) {
                log.debug("exception reported waiting for state push to cache loader to finish");
            }
        } catch (InterruptedException ie) {
            // Re-assert the interrupt so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            if (trace) {
                log.trace("wait for state push to cache loader to finish was interrupted");
            }
        }
    }

    /**
     * Records the new active state and, when the instance has just become active
     * and the configuration requests it, pushes the in-memory state to the wrapped
     * writer.
     *
     * @param newActiveState the state to switch to
     * @throws PushStateException if the state push could not be completed
     */
    protected void activeStatusChanged(boolean newActiveState) throws PushStateException {
        this.active = newActiveState;
        log.debugf("changed mode %s", (Object)this);
        if (this.active && this.singletonConfiguration.pushStateWhenCoordinator()) {
            this.doPushState();
        }
    }

    /**
     * Refreshes {@link #localAddress} from {@code currentAddress} and reports whether
     * the local node is the first member of {@code newView}.
     *
     * @param newView        members of the new view
     * @param currentAddress address of the local node as reported by the event
     * @return {@code true} if the view is non-empty and its first member is the local address
     */
    private boolean isCoordinator(List<Address> newView, Address currentAddress) {
        if (!currentAddress.equals(this.localAddress)) {
            this.localAddress = currentAddress;
        }
        return !newView.isEmpty() && this.localAddress.equals(newView.get(0));
    }

    /**
     * Starts a state push if none is in flight and waits for it; if a previous push
     * is still running, waits for that one instead.
     *
     * @throws PushStateException if a newly started push cannot be submitted, fails
     *                            or times out
     */
    private void doPushState() throws PushStateException {
        if (this.pushStateFuture == null || this.pushStateFuture.isDone()) {
            Callable<?> task = this.createPushStateTask();
            try {
                // Submitting inside the try means a rejection by an executor that was
                // shut down concurrently is reported as a PushStateException instead of
                // escaping as an unchecked exception.
                this.pushStateFuture = this.executor.submit(task);
                this.waitForTaskToFinish(this.pushStateFuture, this.singletonConfiguration.pushStateTimeout(), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new PushStateException("unable to complete in memory state push to cache loader", e);
            }
        } else {
            this.awaitForPushToFinish(this.pushStateFuture, this.singletonConfiguration.pushStateTimeout(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Waits for {@code future} to complete and always cancels it afterwards.
     *
     * <p>A timeout is reported as an exception; an execution failure propagates as
     * thrown by {@code Future.get}; an interruption restores the thread's interrupt
     * flag and returns normally.
     *
     * @param future  the task to wait for
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @throws Exception if the task timed out or failed
     */
    private void waitForTaskToFinish(Future<?> future, long timeout, TimeUnit unit) throws Exception {
        try {
            future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new Exception("task timed out", e);
        } catch (InterruptedException e) {
            // Re-assert the interrupt so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            if (trace) {
                log.trace("task was interrupted");
            }
        } finally {
            // No effect if the task already completed; interrupts it if still running.
            future.cancel(true);
        }
    }

    @Override
    public String toString() {
        return "SingletonStore: localAddress=" + this.localAddress + ", active=" + this.active;
    }

    /**
     * Signals that pushing the in-memory state to the wrapped writer did not complete.
     */
    public static class PushStateException extends Exception {
        private static final long serialVersionUID = 5542893943730200886L;

        /**
         * @param message description of the failure
         * @param cause   the underlying exception
         */
        public PushStateException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Cache manager listener that keeps the enclosing writer's local address and
     * active flag up to date.
     */
    @Listener
    public class SingletonStoreListener {
        /**
         * Initialises the local address and active flag from the cache manager.
         *
         * @param e the cache-started event (not inspected)
         */
        @CacheStarted
        public void cacheStarted(Event e) {
            EmbeddedCacheManager cm = SingletonCacheWriter.this.ctx.getCache().getCacheManager();
            SingletonCacheWriter.this.localAddress = cm.getAddress();
            SingletonCacheWriter.this.active = cm.isCoordinator();
        }

        /**
         * Re-evaluates whether the local node heads the new view and, if that differs
         * from the current active flag, switches state. A failed state push is logged
         * rather than propagated.
         *
         * @param event the view change notification
         */
        @ViewChanged
        public void viewChange(ViewChangedEvent event) {
            boolean coordinatorNow = SingletonCacheWriter.this.isCoordinator(event.getNewMembers(), event.getLocalAddress());
            if (SingletonCacheWriter.this.active != coordinatorNow) {
                try {
                    SingletonCacheWriter.this.activeStatusChanged(coordinatorNow);
                } catch (PushStateException e) {
                    log.errorChangingSingletonStoreStatus(e);
                }
            }
        }
    }
}

