package org.apache.camel.processor.aggregate.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.hazelcast.transaction.TransactionContext;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.transaction.TransactionalMap;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.hazelcast.HazelcastConstants;
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Configurer(metadataOnly = true)
@Metadata(label = "bean", description = "Aggregation repository that uses Hazelcast Cache to store exchanges.", annotations = {"interfaceName=org.apache.camel.AggregationStrategy"})
/* loaded from: input_file:org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.class */
public class HazelcastAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
    protected static final String COMPLETED_SUFFIX = "-completed";
    private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
    protected boolean useLocalHzInstance;
    protected IMap<String, DefaultExchangeHolder> cache;
    protected IMap<String, DefaultExchangeHolder> persistedCache;

    @Metadata(description = "Name of cache to use", required = true)
    protected String mapName;

    @Metadata(description = "To use an existing Hazelcast instance instead of local")
    protected HazelcastInstance hazelcastInstance;

    @Metadata(label = "advanced", description = "Name of cache to use for completed exchanges")
    protected String persistenceMapName;

    @Metadata(description = "Whether to use optimistic locking")
    protected boolean optimistic;

    @Metadata(description = "Whether or not recovery is enabled", defaultValue = "true")
    protected boolean useRecovery;

    @Metadata(description = "Sets the interval between recovery scans", defaultValue = "5000")
    protected long recoveryInterval;

    @Metadata(description = "Sets an optional dead letter channel which exhausted recovered Exchange should be send to.")
    protected String deadLetterUri;

    @Metadata(description = "Sets an optional limit of the number of redelivery attempt of recovered Exchange should be attempted, before its exhausted. When this limit is hit, then the Exchange is moved to the dead letter channel.", defaultValue = "3")
    protected int maximumRedeliveries;

    @Metadata(label = "advanced", description = "Whether headers on the Exchange that are Java objects and Serializable should be included and saved to the repository")
    protected boolean allowSerializedHeaders;

    public HazelcastAggregationRepository() {
        this.useRecovery = true;
        this.recoveryInterval = 5000L;
        this.maximumRedeliveries = 3;
    }

    public HazelcastAggregationRepository(String str) {
        this.useRecovery = true;
        this.recoveryInterval = 5000L;
        this.maximumRedeliveries = 3;
        this.mapName = str;
        this.persistenceMapName = String.format("%s%s", this.mapName, COMPLETED_SUFFIX);
    }

    public HazelcastAggregationRepository(String str, String str2) {
        this.useRecovery = true;
        this.recoveryInterval = 5000L;
        this.maximumRedeliveries = 3;
        this.mapName = str;
        this.persistenceMapName = str2;
    }

    public HazelcastAggregationRepository(String str, boolean z) {
        this(str);
        this.optimistic = z;
    }

    public HazelcastAggregationRepository(String str, String str2, boolean z) {
        this(str, str2);
        this.optimistic = z;
    }

    public HazelcastAggregationRepository(String str, HazelcastInstance hazelcastInstance) {
        this(str, false);
        this.hazelcastInstance = hazelcastInstance;
    }

    public HazelcastAggregationRepository(String str, String str2, HazelcastInstance hazelcastInstance) {
        this(str, str2, false);
        this.hazelcastInstance = hazelcastInstance;
    }

    public HazelcastAggregationRepository(String str, boolean z, HazelcastInstance hazelcastInstance) {
        this(str, z);
        this.hazelcastInstance = hazelcastInstance;
    }

    public HazelcastAggregationRepository(String str, String str2, boolean z, HazelcastInstance hazelcastInstance) {
        this(str, str2, z);
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override // org.apache.camel.spi.OptimisticLockingAggregationRepository
    public Exchange add(CamelContext camelContext, String str, Exchange exchange, Exchange exchange2) throws OptimisticLockingAggregationRepository.OptimisticLockingException {
        if (!this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", exchange2.getExchangeId(), str);
        if (exchange == null) {
            DefaultExchangeHolder putIfAbsent = this.cache.putIfAbsent(str, DefaultExchangeHolder.marshal(exchange2, true, this.allowSerializedHeaders));
            if (putIfAbsent != null) {
                Exchange unmarshallExchange = unmarshallExchange(camelContext, putIfAbsent);
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", str, unmarshallExchange != null ? unmarshallExchange.getExchangeId() : "<null>");
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        } else {
            if (!this.cache.replace(str, DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders), DefaultExchangeHolder.marshal(exchange2, true, this.allowSerializedHeaders))) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", str);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        }
        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", exchange2.getExchangeId(), str);
        return exchange;
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Exchange add(CamelContext camelContext, String str, Exchange exchange) {
        if (this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
        FencedLock lock = this.hazelcastInstance.getCPSubsystem().getLock(this.mapName);
        try {
            lock.lock();
            Exchange unmarshallExchange = unmarshallExchange(camelContext, this.cache.put(str, DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders)));
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            lock.unlock();
            return unmarshallExchange;
        } catch (Throwable th) {
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public Set<String> scan(CamelContext camelContext) {
        if (!this.useRecovery) {
            LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", camelContext.getName(), this.mapName);
            return Collections.emptySet();
        }
        LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
        Set<String> unmodifiableSet = Collections.unmodifiableSet(this.persistedCache.keySet());
        LOG.trace("Found {} keys for exchanges to recover in {} context", Integer.valueOf(unmodifiableSet.size()), camelContext.getName());
        return unmodifiableSet;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public Exchange recover(CamelContext camelContext, String str) {
        LOG.trace("Recovering an Exchange with ID {}.", str);
        if (this.useRecovery) {
            return unmarshallExchange(camelContext, this.persistedCache.get(str));
        }
        return null;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setRecoveryInterval(long j, TimeUnit timeUnit) {
        this.recoveryInterval = timeUnit.toMillis(j);
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setRecoveryInterval(long j) {
        this.recoveryInterval = j;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public long getRecoveryInterval() {
        return this.recoveryInterval;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setUseRecovery(boolean z) {
        this.useRecovery = z;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public boolean isUseRecovery() {
        return this.useRecovery;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setDeadLetterUri(String str) {
        this.deadLetterUri = str;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public String getDeadLetterUri() {
        return this.deadLetterUri;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setMaximumRedeliveries(int i) {
        this.maximumRedeliveries = i;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public int getMaximumRedeliveries() {
        return this.maximumRedeliveries;
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Exchange get(CamelContext camelContext, String str) {
        return unmarshallExchange(camelContext, this.cache.get(str));
    }

    public boolean containsKey(Object obj) {
        if (this.cache != null) {
            return this.cache.containsKey(obj);
        }
        return false;
    }

    public boolean isAllowSerializedHeaders() {
        return this.allowSerializedHeaders;
    }

    public void setAllowSerializedHeaders(boolean z) {
        this.allowSerializedHeaders = z;
    }

    public HazelcastInstance getHazelcastInstance() {
        return this.hazelcastInstance;
    }

    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override // org.apache.camel.spi.AggregationRepository, org.apache.camel.spi.OptimisticLockingAggregationRepository
    public void remove(CamelContext camelContext, String str, Exchange exchange) {
        DefaultExchangeHolder marshal = DefaultExchangeHolder.marshal(exchange, true, this.allowSerializedHeaders);
        if (this.optimistic) {
            LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), str);
            if (!this.cache.remove(str, marshal)) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", str);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
            LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), str);
            if (this.useRecovery) {
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), str);
                this.persistedCache.put(exchange.getExchangeId(), marshal);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", exchange.getExchangeId(), str);
                return;
            }
            return;
        }
        if (!this.useRecovery) {
            this.cache.remove(str);
            return;
        }
        LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
        TransactionOptions transactionOptions = new TransactionOptions();
        transactionOptions.setTransactionType(TransactionOptions.TransactionType.ONE_PHASE);
        TransactionContext newTransactionContext = this.hazelcastInstance.newTransactionContext(transactionOptions);
        try {
            newTransactionContext.beginTransaction();
            TransactionalMap map = newTransactionContext.getMap(this.cache.getName());
            TransactionalMap map2 = newTransactionContext.getMap(this.persistedCache.getName());
            DefaultExchangeHolder defaultExchangeHolder = (DefaultExchangeHolder) map.remove(str);
            LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), str);
            map2.put(exchange.getExchangeId(), defaultExchangeHolder);
            newTransactionContext.commitTransaction();
            LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), str);
            LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), str);
        } catch (Exception e) {
            newTransactionContext.rollbackTransaction();
            String format = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.", newTransactionContext.getTxnId(), str, exchange.getExchangeId());
            LOG.warn(format, e);
            throw new RuntimeCamelException(format, e);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public void confirm(CamelContext camelContext, String str) {
        LOG.trace("Confirming an exchange with ID {}.", str);
        if (this.useRecovery) {
            this.persistedCache.remove(str);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(this.cache.keySet());
    }

    public String getPersistentRepositoryName() {
        return this.persistenceMapName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        StringHelper.notEmpty(this.mapName, "repositoryName");
        if (this.maximumRedeliveries < 0) {
            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
        }
        if (this.recoveryInterval < 0) {
            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
        }
        if (this.persistenceMapName == null) {
            this.persistenceMapName = String.format("%s%s", this.mapName, COMPLETED_SUFFIX);
        }
        if (this.hazelcastInstance == null) {
            this.useLocalHzInstance = true;
            Config build = new XmlConfigBuilder().build();
            build.setProperty("hazelcast.version.check.enabled", "false");
            this.hazelcastInstance = Hazelcast.newHazelcastInstance(build);
        } else {
            ObjectHelper.notNull(this.hazelcastInstance, HazelcastConstants.HAZELCAST_INSTANCE_PARAM);
        }
        this.cache = this.hazelcastInstance.getMap(this.mapName);
        if (this.useRecovery) {
            this.persistedCache = this.hazelcastInstance.getMap(this.persistenceMapName);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doStop() throws Exception {
        if (this.useLocalHzInstance) {
            this.hazelcastInstance.getLifecycleService().shutdown();
            this.hazelcastInstance = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder defaultExchangeHolder) {
        DefaultExchange defaultExchange = null;
        if (defaultExchangeHolder != null) {
            defaultExchange = new DefaultExchange(camelContext);
            DefaultExchangeHolder.unmarshal(defaultExchange, defaultExchangeHolder);
        }
        return defaultExchange;
    }
}
