/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.redis.processor.aggregate;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.StringHelper;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
import org.redisson.api.TransactionOptions;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregation repository that keeps in-flight exchanges in a Redisson map and,
 * when recovery is enabled, moves completed exchanges into a second
 * ("persistence") map until they are confirmed.
 *
 * <p>The repository runs in one of two mutually exclusive modes selected by
 * {@link #setOptimistic(boolean)}:
 * <ul>
 *   <li><b>optimistic</b> - {@link #add(CamelContext, String, Exchange, Exchange)} relies on
 *       {@code putIfAbsent}/{@code replace} and signals conflicts with an
 *       {@link OptimisticLockingAggregationRepository.OptimisticLockingException};</li>
 *   <li><b>non-optimistic</b> - {@link #add(CamelContext, String, Exchange)} serializes writers
 *       through a Redisson lock, and {@link #remove(CamelContext, String, Exchange)} uses a
 *       Redisson transaction when recovery is enabled.</li>
 * </ul>
 * Calling the {@code add} variant that belongs to the other mode throws
 * {@link UnsupportedOperationException}.
 */
public class RedisAggregationRepository
extends ServiceSupport
implements RecoverableAggregationRepository,
OptimisticLockingAggregationRepository {
    private static final Logger LOG = LoggerFactory.getLogger(RedisAggregationRepository.class);
    /** Suffix appended to the map name to derive the default persistence map name. */
    private static final String COMPLETED_SUFFIX = "-completed";
    /** Name of the Redisson lock guarding non-optimistic {@code add} calls. */
    private static final String AGGREGATION_LOCK_NAME = "aggregationLock";

    private boolean optimistic;
    private boolean useRecovery = true;
    /** Map of in-flight exchanges, keyed by correlation key; assigned in {@link #doStart()}. */
    private Map<String, DefaultExchangeHolder> cache;
    /** Map of completed-but-unconfirmed exchanges, keyed by exchange ID; assigned in {@link #doStart()}. */
    private Map<String, DefaultExchangeHolder> persistedCache;
    private String endpoint;
    private String mapName;
    private String persistenceMapName;
    private RedissonClient redisson;
    /** True when this repository created the client itself and therefore must shut it down. */
    private boolean shutdownRedisson;
    private String deadLetterChannel;
    private long recoveryInterval = 5000L;
    private int maximumRedeliveries = 3;
    private boolean allowSerializedHeaders;

    /**
     * Creates an unconfigured repository; at least the map name must be set
     * before the service is initialized.
     */
    public RedisAggregationRepository() {
    }

    /**
     * Creates a non-optimistic repository whose persistence map is named
     * {@code mapName + "-completed"}.
     *
     * @param mapName  name of the map holding in-flight exchanges
     * @param endpoint Redis address in {@code host:port} form (prefixed with {@code redis://} on start)
     */
    public RedisAggregationRepository(String mapName, String endpoint) {
        this(mapName, mapName + COMPLETED_SUFFIX, endpoint);
    }

    /**
     * Creates a non-optimistic repository with an explicit persistence map name.
     *
     * @param mapName            name of the map holding in-flight exchanges
     * @param persistenceMapName name of the map holding completed exchanges awaiting confirmation
     * @param endpoint           Redis address in {@code host:port} form
     */
    public RedisAggregationRepository(String mapName, String persistenceMapName, String endpoint) {
        this.mapName = mapName;
        this.persistenceMapName = persistenceMapName;
        this.optimistic = false;
        this.endpoint = endpoint;
    }

    /**
     * Creates a repository with the default persistence map name and the given locking mode.
     *
     * @param mapName    name of the map holding in-flight exchanges
     * @param endpoint   Redis address in {@code host:port} form
     * @param optimistic {@code true} to use optimistic locking
     */
    public RedisAggregationRepository(String mapName, String endpoint, boolean optimistic) {
        this(mapName, endpoint);
        this.optimistic = optimistic;
    }

    /**
     * Creates a fully specified repository.
     *
     * @param mapName            name of the map holding in-flight exchanges
     * @param persistenceMapName name of the map holding completed exchanges awaiting confirmation
     * @param endpoint           Redis address in {@code host:port} form
     * @param optimistic         {@code true} to use optimistic locking
     */
    public RedisAggregationRepository(String mapName, String persistenceMapName, String endpoint, boolean optimistic) {
        this(mapName, persistenceMapName, endpoint);
        this.optimistic = optimistic;
    }

    /**
     * Optimistic add: stores {@code newExchange} only if the map still holds
     * what the caller last saw ({@code oldExchange}, or nothing when it is {@code null}).
     *
     * @return {@code oldExchange}, unchanged
     * @throws UnsupportedOperationException if the repository is not in optimistic mode
     * @throws OptimisticLockingAggregationRepository.OptimisticLockingException
     *         if the stored value did not match the expectation
     */
    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingAggregationRepository.OptimisticLockingException {
        if (!optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key);
        DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
        if (oldExchange == null) {
            // Nothing was expected to be stored yet: any existing value means somebody raced us.
            DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, newHolder);
            if (misbehaviorHolder != null) {
                Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder);
                LOG.warn("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>");
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        } else {
            // Compare-and-swap against the marshalled form of the exchange the caller last saw.
            DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders);
            if (!cache.replace(key, oldHolder, newHolder)) {
                LOG.warn("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", key);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        }
        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key);
        return oldExchange;
    }

    /**
     * Non-optimistic add: stores {@code exchange} under {@code key} while holding
     * the repository-wide Redisson lock.
     *
     * @return the exchange previously stored under {@code key}, or {@code null} if there was none
     * @throws UnsupportedOperationException if the repository is in optimistic mode
     */
    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
        if (optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
        // Marshal before locking so the distributed lock is held only for the map write.
        DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
        RLock lock = redisson.getLock(AGGREGATION_LOCK_NAME);
        DefaultExchangeHolder oldHolder;
        // Acquire outside the try: if lock() fails there is nothing to unlock, and an
        // unlock attempt from finally would mask the original failure.
        lock.lock();
        try {
            oldHolder = cache.put(key, newHolder);
        } finally {
            lock.unlock();
        }
        // Only reported on success (the put above did not throw).
        LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
        return unmarshallExchange(camelContext, oldHolder);
    }

    /**
     * Lists the IDs of completed exchanges that are still awaiting confirmation.
     *
     * @return an unmodifiable view of the persistence map keys, or an empty set when recovery is disabled
     */
    @Override
    public Set<String> scan(CamelContext camelContext) {
        if (!useRecovery) {
            LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", camelContext.getName(), mapName);
            return Collections.emptySet();
        }
        LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
        Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet());
        LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName());
        return scanned;
    }

    /**
     * Loads a completed exchange from the persistence map.
     *
     * @return the unmarshalled exchange, or {@code null} when recovery is disabled or the ID is unknown
     */
    @Override
    public Exchange recover(CamelContext camelContext, String exchangeId) {
        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
        if (!useRecovery) {
            return null;
        }
        return unmarshallExchange(camelContext, persistedCache.get(exchangeId));
    }

    public boolean isOptimistic() {
        return optimistic;
    }

    public void setOptimistic(boolean optimistic) {
        this.optimistic = optimistic;
    }

    public String getEndpoint() {
        return endpoint;
    }

    public void setEndpoint(String endpoint) {
        this.endpoint = endpoint;
    }

    public String getMapName() {
        return mapName;
    }

    public void setMapName(String mapName) {
        this.mapName = mapName;
    }

    public String getPersistenceMapName() {
        return persistenceMapName;
    }

    public void setPersistenceMapName(String persistenceMapName) {
        this.persistenceMapName = persistenceMapName;
    }

    public RedissonClient getRedisson() {
        return redisson;
    }

    /**
     * Supplies an externally managed client. A client set here is used as-is on
     * start and is not shut down by this repository on stop.
     */
    public void setRedisson(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Override
    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
        this.recoveryInterval = timeUnit.toMillis(interval);
    }

    /** Sets the recovery interval; the value is stored as milliseconds. */
    @Override
    public void setRecoveryInterval(long interval) {
        this.recoveryInterval = interval;
    }

    @Override
    public long getRecoveryIntervalInMillis() {
        return recoveryInterval;
    }

    @Override
    public void setUseRecovery(boolean useRecovery) {
        this.useRecovery = useRecovery;
    }

    @Override
    public boolean isUseRecovery() {
        return useRecovery;
    }

    @Override
    public void setDeadLetterUri(String deadLetterUri) {
        this.deadLetterChannel = deadLetterUri;
    }

    @Override
    public String getDeadLetterUri() {
        return deadLetterChannel;
    }

    @Override
    public void setMaximumRedeliveries(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    @Override
    public int getMaximumRedeliveries() {
        return maximumRedeliveries;
    }

    /**
     * Reads the in-flight exchange stored under {@code key}.
     *
     * @return the unmarshalled exchange, or {@code null} if nothing is stored
     */
    @Override
    public Exchange get(CamelContext camelContext, String key) {
        return unmarshallExchange(camelContext, cache.get(key));
    }

    /**
     * Tells whether an in-flight exchange is stored under {@code key}.
     *
     * @return {@code false} when the repository has not been started yet
     */
    public boolean containsKey(Object key) {
        return cache != null && cache.containsKey(key);
    }

    public boolean isAllowSerializedHeaders() {
        return allowSerializedHeaders;
    }

    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
        this.allowSerializedHeaders = allowSerializedHeaders;
    }

    /**
     * Removes the in-flight exchange stored under {@code key} and, when recovery
     * is enabled, records it in the persistence map under the exchange ID.
     *
     * @throws OptimisticLockingAggregationRepository.OptimisticLockingException
     *         in optimistic mode, if the stored value no longer matches {@code exchange}
     * @throws RuntimeException in non-optimistic mode with recovery, if the transaction fails
     */
    @Override
    public void remove(CamelContext camelContext, String key, Exchange exchange) {
        if (optimistic) {
            removeOptimistically(key, exchange);
        } else if (useRecovery) {
            removeTransactionally(key, exchange);
        } else {
            cache.remove(key);
        }
    }

    /** Conditional remove: succeeds only if the stored value equals the marshalled {@code exchange}. */
    private void removeOptimistically(String key, Exchange exchange) {
        DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
        LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
        if (!cache.remove(key, holder)) {
            LOG.warn("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", key);
            throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
        }
        LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
        if (useRecovery) {
            LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), key);
            persistedCache.put(exchange.getExchangeId(), holder);
            LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), key);
        }
    }

    /** Moves the entry from the live map to the persistence map inside one Redisson transaction. */
    private void removeTransactionally(String key, Exchange exchange) {
        LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
        RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
        try {
            RMap<String, DefaultExchangeHolder> tCache = transaction.getMap(mapName);
            RMap<String, DefaultExchangeHolder> tPersistentCache = transaction.getMap(persistenceMapName);
            DefaultExchangeHolder removedHolder = tCache.remove(key);
            if (removedHolder == null) {
                // The key was not in the live map. Persist the exchange being completed instead of
                // handing a null value to the map, mirroring what the optimistic path stores.
                removedHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
            }
            LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), key);
            tPersistentCache.put(exchange.getExchangeId(), removedHolder);
            transaction.commit();
            LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
            LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), key);
        } catch (Exception failure) {
            try {
                transaction.rollback();
            } catch (Exception rollbackFailure) {
                // Keep the original cause primary; a failed rollback must not hide it.
                failure.addSuppressed(rollbackFailure);
            }
            String msg = String.format("Transaction was rolled back for remove operation with a key %s and an Exchange ID %s.", key, exchange.getExchangeId());
            LOG.warn(msg, failure);
            throw new RuntimeException(msg, failure);
        }
    }

    /**
     * Drops a completed exchange from the persistence map once it has been
     * processed; does nothing beyond logging when recovery is disabled.
     */
    @Override
    public void confirm(CamelContext camelContext, String exchangeId) {
        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
        if (useRecovery) {
            persistedCache.remove(exchangeId);
        }
    }

    /**
     * @return an unmodifiable view of the correlation keys of all in-flight exchanges
     */
    @Override
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(cache.keySet());
    }

    public String getPersistentRepositoryName() {
        return persistenceMapName;
    }

    /**
     * Validates configuration: the map name must be non-empty, and the
     * redelivery count and recovery interval must be non-negative.
     */
    @Override
    protected void doInit() throws Exception {
        StringHelper.notEmpty(mapName, "repositoryName");
        if (maximumRedeliveries < 0) {
            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
        }
        if (recoveryInterval < 0L) {
            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
        }
    }

    /**
     * Creates a single-server Redisson client when none was supplied, then
     * resolves the live map and (with recovery enabled) the persistence map.
     */
    @Override
    protected void doStart() throws Exception {
        if (redisson == null) {
            // Fail fast instead of building the address "redis://null".
            StringHelper.notEmpty(endpoint, "endpoint");
            Config config = new Config();
            config.useSingleServer().setAddress(String.format("redis://%s", endpoint));
            redisson = Redisson.create(config);
            shutdownRedisson = true;
        }
        cache = redisson.getMap(mapName);
        if (useRecovery) {
            if (persistenceMapName == null || persistenceMapName.isEmpty()) {
                // Reached when configured via the no-arg constructor and setMapName only;
                // apply the same default the two-argument constructor uses.
                persistenceMapName = mapName + COMPLETED_SUFFIX;
            }
            persistedCache = redisson.getMap(persistenceMapName);
        }
    }

    /** Shuts the Redisson client down, but only if this repository created it. */
    @Override
    protected void doStop() throws Exception {
        if (redisson != null && shutdownRedisson) {
            redisson.shutdown();
            redisson = null;
        }
    }

    /**
     * Rebuilds an exchange from its marshalled holder.
     *
     * @return a new exchange bound to {@code camelContext}, or {@code null} when {@code holder} is {@code null}
     */
    protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
        if (holder == null) {
            return null;
        }
        DefaultExchange exchange = new DefaultExchange(camelContext);
        DefaultExchangeHolder.unmarshal(exchange, holder);
        return exchange;
    }
}

