/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.aggregate.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.hazelcast.transaction.TransactionContext;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.transaction.TransactionalMap;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metadata(label="bean", description="Aggregation repository that uses Hazelcast Cache to store exchanges.", annotations={"interfaceName=org.apache.camel.spi.AggregationRepository"})
@Configurer(metadataOnly=true)
public class HazelcastAggregationRepository
extends ServiceSupport
implements RecoverableAggregationRepository,
OptimisticLockingAggregationRepository {
    protected static final String COMPLETED_SUFFIX = "-completed";
    private static final Logger LOG = LoggerFactory.getLogger((String)HazelcastAggregationRepository.class.getName());
    protected boolean useLocalHzInstance;
    protected IMap<String, DefaultExchangeHolder> cache;
    protected IMap<String, DefaultExchangeHolder> persistedCache;
    @Metadata(description="Name of cache to use", required=true)
    protected String mapName;
    @Metadata(description="To use an existing Hazelcast instance instead of local")
    protected HazelcastInstance hazelcastInstance;
    @Metadata(label="advanced", description="Name of cache to use for completed exchanges")
    protected String persistenceMapName;
    @Metadata(description="Whether to use optimistic locking")
    protected boolean optimistic;
    @Metadata(description="Whether or not recovery is enabled", defaultValue="true")
    protected boolean useRecovery = true;
    @Metadata(description="Sets the interval between recovery scans", defaultValue="5000")
    protected long recoveryInterval = 5000L;
    @Metadata(description="Sets an optional dead letter channel which exhausted recovered Exchange should be send to.")
    protected String deadLetterUri;
    @Metadata(description="Sets an optional limit of the number of redelivery attempt of recovered Exchange should be attempted, before its exhausted. When this limit is hit, then the Exchange is moved to the dead letter channel.", defaultValue="3")
    protected int maximumRedeliveries = 3;
    @Metadata(label="advanced", description="Whether headers on the Exchange that are Java objects and Serializable should be included and saved to the repository")
    protected boolean allowSerializedHeaders;

    public HazelcastAggregationRepository() {
    }

    public HazelcastAggregationRepository(String repositoryName) {
        this.mapName = repositoryName;
        this.persistenceMapName = String.format("%s%s", this.mapName, COMPLETED_SUFFIX);
    }

    public HazelcastAggregationRepository(String repositoryName, String persistentRepositoryName) {
        this.mapName = repositoryName;
        this.persistenceMapName = persistentRepositoryName;
    }

    public HazelcastAggregationRepository(String repositoryName, boolean optimistic) {
        this(repositoryName);
        this.optimistic = optimistic;
    }

    public HazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, boolean optimistic) {
        this(repositoryName, persistentRepositoryName);
        this.optimistic = optimistic;
    }

    public HazelcastAggregationRepository(String repositoryName, HazelcastInstance hzInstanse) {
        this(repositoryName, false);
        this.hazelcastInstance = hzInstanse;
    }

    public HazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, HazelcastInstance hzInstanse) {
        this(repositoryName, persistentRepositoryName, false);
        this.hazelcastInstance = hzInstanse;
    }

    public HazelcastAggregationRepository(String repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
        this(repositoryName, optimistic);
        this.hazelcastInstance = hzInstance;
    }

    public HazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) {
        this(repositoryName, persistentRepositoryName, optimistic);
        this.hazelcastInstance = hzInstance;
    }

    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingAggregationRepository.OptimisticLockingException {
        if (!this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", (Object)newExchange.getExchangeId(), (Object)key);
        if (oldExchange == null) {
            DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, this.allowSerializedHeaders);
            DefaultExchangeHolder misbehaviorHolder = this.cache.putIfAbsent(key, holder);
            if (misbehaviorHolder != null) {
                Exchange misbehaviorEx = this.unmarshallExchange(camelContext, misbehaviorHolder);
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", (Object)key, (Object)(misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"));
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        } else {
            DefaultExchangeHolder newHolder;
            DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, this.allowSerializedHeaders);
            if (!this.cache.replace(key, oldHolder, newHolder = DefaultExchangeHolder.marshal(newExchange, true, this.allowSerializedHeaders))) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", (Object)key);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        }
        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", (Object)newExchange.getExchangeId(), (Object)key);
        return oldExchange;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
        if (this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
        FencedLock l = this.hazelcastInstance.getCPSubsystem().getLock(this.mapName);
        try {
            l.lock();
            DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders);
            DefaultExchangeHolder oldHolder = this.cache.put(key, newHolder);
            Exchange exchange2 = this.unmarshallExchange(camelContext, oldHolder);
            return exchange2;
        }
        finally {
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            l.unlock();
        }
    }

    @Override
    public Set<String> scan(CamelContext camelContext) {
        if (this.useRecovery) {
            LOG.trace("Scanning for exchanges to recover in {} context", (Object)camelContext.getName());
            Set<String> scanned = Collections.unmodifiableSet(this.persistedCache.keySet());
            LOG.trace("Found {} keys for exchanges to recover in {} context", (Object)scanned.size(), (Object)camelContext.getName());
            return scanned;
        }
        LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", (Object)camelContext.getName(), (Object)this.mapName);
        return Collections.emptySet();
    }

    @Override
    public Exchange recover(CamelContext camelContext, String exchangeId) {
        LOG.trace("Recovering an Exchange with ID {}.", (Object)exchangeId);
        return this.useRecovery ? this.unmarshallExchange(camelContext, this.persistedCache.get(exchangeId)) : null;
    }

    @Override
    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
        this.recoveryInterval = timeUnit.toMillis(interval);
    }

    @Override
    public void setRecoveryInterval(long interval) {
        this.recoveryInterval = interval;
    }

    @Override
    public long getRecoveryInterval() {
        return this.recoveryInterval;
    }

    @Override
    public void setUseRecovery(boolean useRecovery) {
        this.useRecovery = useRecovery;
    }

    @Override
    public boolean isUseRecovery() {
        return this.useRecovery;
    }

    @Override
    public void setDeadLetterUri(String deadLetterUri) {
        this.deadLetterUri = deadLetterUri;
    }

    @Override
    public String getDeadLetterUri() {
        return this.deadLetterUri;
    }

    @Override
    public void setMaximumRedeliveries(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    @Override
    public int getMaximumRedeliveries() {
        return this.maximumRedeliveries;
    }

    @Override
    public Exchange get(CamelContext camelContext, String key) {
        return this.unmarshallExchange(camelContext, this.cache.get(key));
    }

    public boolean containsKey(Object key) {
        if (this.cache != null) {
            return this.cache.containsKey(key);
        }
        return false;
    }

    public boolean isAllowSerializedHeaders() {
        return this.allowSerializedHeaders;
    }

    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
        this.allowSerializedHeaders = allowSerializedHeaders;
    }

    public HazelcastInstance getHazelcastInstance() {
        return this.hazelcastInstance;
    }

    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override
    public void remove(CamelContext camelContext, String key, Exchange exchange) {
        DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders);
        if (this.optimistic) {
            LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            if (!this.cache.remove(key, holder)) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", (Object)key);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
            LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            if (this.useRecovery) {
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
                this.persistedCache.put(exchange.getExchangeId(), holder);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            }
        } else if (this.useRecovery) {
            LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            TransactionOptions tOpts = new TransactionOptions();
            tOpts.setTransactionType(TransactionOptions.TransactionType.ONE_PHASE);
            TransactionContext tCtx = this.hazelcastInstance.newTransactionContext(tOpts);
            try {
                tCtx.beginTransaction();
                TransactionalMap tCache = tCtx.getMap(this.cache.getName());
                TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(this.persistedCache.getName());
                DefaultExchangeHolder removedHolder = (DefaultExchangeHolder)tCache.remove(key);
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
                tPersistentCache.put(exchange.getExchangeId(), removedHolder);
                tCtx.commitTransaction();
                LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            }
            catch (Exception exception) {
                tCtx.rollbackTransaction();
                String msg = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.", tCtx.getTxnId(), key, exchange.getExchangeId());
                LOG.warn(msg, (Throwable)exception);
                throw new RuntimeCamelException(msg, exception);
            }
        } else {
            this.cache.remove(key);
        }
    }

    @Override
    public void confirm(CamelContext camelContext, String exchangeId) {
        LOG.trace("Confirming an exchange with ID {}.", (Object)exchangeId);
        if (this.useRecovery) {
            this.persistedCache.remove(exchangeId);
        }
    }

    @Override
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(this.cache.keySet());
    }

    public String getPersistentRepositoryName() {
        return this.persistenceMapName;
    }

    @Override
    protected void doStart() throws Exception {
        StringHelper.notEmpty(this.mapName, "repositoryName");
        if (this.maximumRedeliveries < 0) {
            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
        }
        if (this.recoveryInterval < 0L) {
            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
        }
        if (this.persistenceMapName == null) {
            this.persistenceMapName = String.format("%s%s", this.mapName, COMPLETED_SUFFIX);
        }
        if (this.hazelcastInstance == null) {
            this.useLocalHzInstance = true;
            Config cfg = new XmlConfigBuilder().build();
            cfg.setProperty("hazelcast.version.check.enabled", "false");
            this.hazelcastInstance = Hazelcast.newHazelcastInstance(cfg);
        } else {
            ObjectHelper.notNull(this.hazelcastInstance, "hazelcastInstance");
        }
        this.cache = this.hazelcastInstance.getMap(this.mapName);
        if (this.useRecovery) {
            this.persistedCache = this.hazelcastInstance.getMap(this.persistenceMapName);
        }
    }

    @Override
    protected void doStop() throws Exception {
        if (this.useLocalHzInstance) {
            this.hazelcastInstance.getLifecycleService().shutdown();
            this.hazelcastInstance = null;
        }
    }

    protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
        DefaultExchange exchange = null;
        if (holder != null) {
            exchange = new DefaultExchange(camelContext);
            DefaultExchangeHolder.unmarshal(exchange, holder);
        }
        return exchange;
    }
}

