package org.apache.camel.processor.idempotent.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedResource(description = "Kafka IdempotentRepository")
/* loaded from: input_file:org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.class */
public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository, CamelContextAware {
    // Cache size used by the constructors that do not take an explicit size.
    private static final int DEFAULT_MAXIMUM_CACHE_SIZE = 1000;
    // Poll duration in milliseconds used by the constructors that do not take an explicit value.
    private static final int DEFAULT_POLL_DURATION_MS = 100;
    private final Logger log;
    // Kafka topic that cache actions are published to and replayed from.
    private String topic;
    // Used as "bootstrap.servers" when no explicit consumer/producer Properties are supplied.
    private String bootstrapServers;
    private Properties producerConfig;
    private Properties consumerConfig;
    // Size passed to LRUCacheFactory.newLRUCache when the repository starts.
    private int maxCacheSize;
    // Duration of each consumer poll in the background poller, in milliseconds.
    private int pollDurationMs;
    // Local view of the known keys; each key is stored mapped to itself.
    private Map<String, Object> cache;
    private Consumer<String, String> consumer;
    private Producer<String, String> producer;
    // Background task that applies records read from the topic to the cache.
    private TopicPoller topicPoller;
    private CamelContext camelContext;
    // Single-thread executor that runs the topic poller.
    private ExecutorService executorService;
    // Counted down by the poller on its first empty poll; awaited in doStart.
    private CountDownLatch cacheReadyLatch;

    /* loaded from: input_file:org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository$CacheAction.class */
    /**
     * Actions broadcast on the topic. The constant name is sent as the record value
     * (via toString) and parsed back with valueOf by the poller, so the names are part
     * of the on-topic format.
     */
    enum CacheAction {
        // Put the record key into the cache.
        add,
        // Remove the record key from the cache.
        remove,
        // Empty the whole cache; the record key is not used.
        clear
    }

    /* loaded from: input_file:org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository$TopicPoller.class */
    private class TopicPoller implements Runnable {
        private final Consumer<String, String> consumer;
        private final CountDownLatch cacheReadyLatch;
        private final int pollDurationMs;
        private final Logger log = LoggerFactory.getLogger(getClass());
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private final AtomicBoolean running = new AtomicBoolean(true);

        TopicPoller(Consumer<String, String> consumer, CountDownLatch countDownLatch, int i) {
            this.consumer = consumer;
            this.cacheReadyLatch = countDownLatch;
            this.pollDurationMs = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.log.debug("Subscribing consumer to {}", KafkaIdempotentRepository.this.topic);
            this.consumer.subscribe(Collections.singleton(KafkaIdempotentRepository.this.topic), new ConsumerRebalanceListener() { // from class: org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository.TopicPoller.1
                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    TopicPoller.this.log.debug("Seeking to beginning");
                    TopicPoller.this.consumer.seekToBeginning(collection);
                }
            });
            this.log.debug("Forcing rebalance to get partitions assigned");
            if (!this.consumer.poll(Duration.ofMillis(0L)).isEmpty()) {
                throw new IllegalStateException("First call to Kafka consumer.poll(0) should never return any record");
            }
            while (this.running.get()) {
                this.log.trace("Polling");
                ConsumerRecords<String, String> poll = this.consumer.poll(Duration.ofMillis(this.pollDurationMs));
                if (poll.isEmpty()) {
                    this.log.trace("0 messages fetched on poll");
                    if (this.cacheReadyLatch.getCount() > 0) {
                        this.log.debug("Cache warmed up");
                        this.cacheReadyLatch.countDown();
                    }
                }
                Iterator<ConsumerRecord<String, String>> it = poll.iterator();
                while (true) {
                    if (it.hasNext()) {
                        ConsumerRecord<String, String> next = it.next();
                        try {
                            CacheAction valueOf = CacheAction.valueOf(next.value());
                            String key = next.key();
                            if (valueOf != CacheAction.add) {
                                if (valueOf != CacheAction.remove) {
                                    if (valueOf != CacheAction.clear) {
                                        this.log.warn("No idea how to {} a record. Shutting down.", valueOf);
                                        setRunning(false);
                                        break;
                                    }
                                    KafkaIdempotentRepository.this.cache.clear();
                                } else {
                                    this.log.debug("Removing from cache messageId:{}", key);
                                    KafkaIdempotentRepository.this.cache.remove(key);
                                }
                            } else {
                                this.log.debug("Adding to cache messageId:{}", key);
                                KafkaIdempotentRepository.this.cache.put(key, key);
                            }
                        } catch (IllegalArgumentException e) {
                            this.log.error("Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.", new Object[]{next.key(), next.topic(), Integer.valueOf(next.partition()), Long.valueOf(next.offset())});
                            setRunning(false);
                        }
                    }
                }
            }
            this.log.debug("TopicPoller finished - triggering shutdown latch");
            this.shutdownLatch.countDown();
        }

        CountDownLatch getShutdownLatch() {
            return this.shutdownLatch;
        }

        void setRunning(boolean z) {
            this.running.set(z);
        }

        boolean isRunning() {
            return this.running.get();
        }

        public String toString() {
            return "TopicPoller[" + KafkaIdempotentRepository.this.topic + "]";
        }
    }

    /**
     * Creates a repository with the default cache size and poll duration. The topic and the
     * Kafka connection settings must be supplied through the setters before it is started.
     */
    public KafkaIdempotentRepository() {
        log = LoggerFactory.getLogger(getClass());
        pollDurationMs = DEFAULT_POLL_DURATION_MS;
        maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
    }

    /**
     * Creates a repository with the default cache size and poll duration.
     *
     * @param str  topic name
     * @param str2 bootstrap servers
     */
    public KafkaIdempotentRepository(String str, String str2) {
        this(str, str2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    /**
     * Creates a repository that connects using a bootstrap server list.
     *
     * @param str  topic name
     * @param str2 bootstrap servers
     * @param i    maximum cache size
     * @param i2   poll duration in milliseconds
     */
    public KafkaIdempotentRepository(String str, String str2, int i, int i2) {
        log = LoggerFactory.getLogger(getClass());
        topic = str;
        bootstrapServers = str2;
        maxCacheSize = i;
        pollDurationMs = i2;
    }

    /**
     * Creates a repository with the default cache size and poll duration.
     *
     * @param str         topic name
     * @param properties  consumer configuration
     * @param properties2 producer configuration
     */
    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2) {
        this(str, properties, properties2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    /**
     * Creates a repository that connects using explicit consumer and producer configuration.
     *
     * @param str         topic name
     * @param properties  consumer configuration
     * @param properties2 producer configuration
     * @param i           maximum cache size
     * @param i2          poll duration in milliseconds
     */
    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2, int i, int i2) {
        log = LoggerFactory.getLogger(getClass());
        topic = str;
        consumerConfig = properties;
        producerConfig = properties2;
        maxCacheSize = i;
        pollDurationMs = i2;
    }

    /** Returns the name of the topic that cache actions are published to and read from. */
    public String getTopic() {
        return topic;
    }

    /**
     * Sets the topic name. It is validated as non-empty when the repository starts.
     *
     * @param str topic name
     */
    public void setTopic(String str) {
        topic = str;
    }

    /** Returns the bootstrap server list used when no explicit Kafka configuration is set. */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /**
     * Sets the bootstrap server list. It is only consulted at start-up for a consumer or
     * producer whose configuration has not been provided explicitly.
     *
     * @param str bootstrap servers
     */
    public void setBootstrapServers(String str) {
        bootstrapServers = str;
    }

    /** Returns the producer configuration, or {@code null} if none has been set or created yet. */
    public Properties getProducerConfig() {
        return producerConfig;
    }

    /**
     * Sets the producer configuration. The instance is stored as-is and has serializer and
     * default settings written into it when the repository starts.
     *
     * @param properties producer configuration
     */
    public void setProducerConfig(Properties properties) {
        producerConfig = properties;
    }

    /** Returns the consumer configuration, or {@code null} if none has been set or created yet. */
    public Properties getConsumerConfig() {
        return consumerConfig;
    }

    /**
     * Sets the consumer configuration. The instance is stored as-is and has the group id,
     * auto-commit and deserializer settings written into it when the repository starts.
     *
     * @param properties consumer configuration
     */
    public void setConsumerConfig(Properties properties) {
        consumerConfig = properties;
    }

    /** Returns the size used to create the local cache at start-up. */
    public int getMaxCacheSize() {
        return maxCacheSize;
    }

    /**
     * Sets the size used to create the local cache at start-up.
     *
     * @param i maximum cache size
     */
    public void setMaxCacheSize(int i) {
        maxCacheSize = i;
    }

    /** Returns the duration of each consumer poll, in milliseconds. */
    public int getPollDurationMs() {
        return pollDurationMs;
    }

    /**
     * Sets the duration of each consumer poll. The value is handed to the poller when the
     * repository starts.
     *
     * @param i poll duration in milliseconds
     */
    public void setPollDurationMs(int i) {
        pollDurationMs = i;
    }

    /**
     * Stores the Camel context. It is required at start-up, where it supplies the executor
     * that runs the topic poller.
     *
     * @param camelContext the owning context
     */
    @Override // org.apache.camel.CamelContextAware
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /** Returns the Camel context previously set, or {@code null} if none was set. */
    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the repository: creates the local cache, the Kafka consumer and producer, submits
     * the topic poller to a single-thread executor and waits up to 30 seconds for the poller to
     * report that the cache has been warmed up from the topic.
     *
     * <p>Consumer or producer configuration that was not supplied is built from
     * {@code bootstrapServers}. Supplied {@link Properties} instances are modified in place: a
     * random group id, auto-commit and String (de)serializers are always set, while producer
     * {@code acks} and {@code batch.size} are only set when absent.
     *
     * @throws IllegalArgumentException presumably raised by the Camel validation helpers when
     *         camelContext, topic or a required bootstrapServers value is missing; confirm
     *         against ObjectHelper/StringHelper
     */
    @Override
    public void doStart() throws Exception {
        ObjectHelper.notNull(this.camelContext, "camelContext");
        StringHelper.notEmpty(this.topic, "topic");
        this.cache = LRUCacheFactory.newLRUCache(this.maxCacheSize);

        // Validate before assigning the field, so a failed start does not leave behind an
        // empty Properties object that would skip this validation on the next attempt.
        if (this.consumerConfig == null) {
            StringHelper.notEmpty(this.bootstrapServers, "bootstrapServers");
            Properties consumerDefaults = new Properties();
            consumerDefaults.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            this.consumerConfig = consumerDefaults;
        }
        if (this.producerConfig == null) {
            StringHelper.notEmpty(this.bootstrapServers, "bootstrapServers");
            Properties producerDefaults = new Properties();
            producerDefaults.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            this.producerConfig = producerDefaults;
        }

        // A fresh random group id per start means no committed offsets are shared with anyone.
        String groupId = UUID.randomUUID().toString();
        this.log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
        this.consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        this.consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
        this.consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(this.consumerConfig);

        this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
        this.producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
        this.producer = new KafkaProducer<>(this.producerConfig);

        this.cacheReadyLatch = new CountDownLatch(1);
        this.topicPoller = new TopicPoller(this.consumer, this.cacheReadyLatch, this.pollDurationMs);
        this.executorService = this.camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepository");
        this.executorService.submit(this.topicPoller);

        this.log.info("Warming up cache from topic {}", this.topic);
        try {
            if (this.cacheReadyLatch.await(30L, TimeUnit.SECONDS)) {
                this.log.info("Cache OK");
            } else {
                this.log.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. Duplicate records may not be detected.", this.topic);
            }
        } catch (InterruptedException e) {
            this.log.warn("Interrupted while warming up cache. This exception is ignored.", e);
            // Start-up still proceeds, but the interrupt is preserved for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the repository: asks the poller to finish, waits up to 30 seconds for it, shuts
     * down the executor and closes the Kafka consumer and producer.
     *
     * <p>Every step is skipped when its resource was never created, so stopping after a
     * failed or never-executed start still releases whatever does exist instead of failing
     * with a NullPointerException part-way through.
     */
    @Override
    public void doStop() {
        boolean interrupted = false;
        try {
            if (this.topicPoller != null) {
                this.topicPoller.setRunning(false);
                try {
                    if (this.topicPoller.getShutdownLatch().await(30L, TimeUnit.SECONDS)) {
                        this.log.info("Cache from topic {} shutdown successfully", this.topic);
                    } else {
                        this.log.warn("Timeout waiting for cache to shutdown from topic {}. Proceeding anyway.", this.topic);
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    this.log.warn("Interrupted waiting on shutting down cache due {}. This exception is ignored.", e.getMessage());
                }
            }
            if (this.executorService != null && this.camelContext != null) {
                this.camelContext.getExecutorServiceManager().shutdown(this.executorService);
            }
            if (this.consumer != null) {
                IOHelper.close(this.consumer, "consumer", this.log);
            }
            if (this.producer != null) {
                IOHelper.close(this.producer, "producer", this.log);
            }
        } finally {
            // Restore the flag only after clean-up, so the close calls above are not disturbed.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Records a key if it is not already in the local cache and broadcasts the addition on
     * the topic.
     *
     * @param str the key to add
     * @return {@code true} if the key was added, {@code false} if it was already cached
     */
    @Override
    public boolean add(String str) {
        boolean alreadyKnown = cache.containsKey(str);
        if (!alreadyKnown) {
            cache.put(str, str);
            broadcastAction(str, CacheAction.add);
        }
        return !alreadyKnown;
    }

    /**
     * Publishes an action for a key on the topic and blocks until the send has completed.
     *
     * @param str         record key; {@code null} is passed for {@link CacheAction#clear}
     * @param cacheAction action whose name becomes the record value
     * @throws RuntimeCamelException if the send fails or the waiting thread is interrupted
     */
    private void broadcastAction(String str, CacheAction cacheAction) {
        this.log.debug("Broadcasting action:{} for key:{}", cacheAction, str);
        try {
            this.producer.send(new ProducerRecord<>(this.topic, str, cacheAction.toString())).get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeCamelException(e);
        } catch (ExecutionException e) {
            throw new RuntimeCamelException(e);
        }
    }

    /**
     * Checks the local cache for a key. No Kafka call is made.
     *
     * @param str the key to look up
     * @return {@code true} if the key is currently cached
     */
    @Override
    @ManagedOperation(description = "Does the store contain the given key")
    public boolean contains(String str) {
        log.debug("Checking cache for key:{}", str);
        boolean present = cache.containsKey(str);
        return present;
    }

    /**
     * Removes a key from the local cache and broadcasts the removal on the topic.
     *
     * @param str the key to remove
     * @return always {@code true}, whether or not the key was cached
     */
    @Override
    @ManagedOperation(description = "Remove the key from the store")
    public boolean remove(String str) {
        // Entries are stored as key -> key, so the two-argument form matches the stored mapping.
        cache.remove(str, str);
        broadcastAction(str, CacheAction.remove);
        return true;
    }

    /**
     * No-op confirmation: nothing is written or broadcast.
     *
     * @param str the key being confirmed (unused)
     * @return always {@code true}
     */
    @Override // org.apache.camel.spi.IdempotentRepository
    public boolean confirm(String str) {
        return true;
    }

    /**
     * Broadcasts a clear action with a {@code null} key. The local cache is not touched here;
     * it is emptied when the topic poller reads the clear record back from the topic.
     */
    @Override // org.apache.camel.spi.IdempotentRepository
    public void clear() {
        broadcastAction(null, CacheAction.clear);
    }

    /**
     * Reports whether the topic poller's running flag is set.
     *
     * @return {@code false} if the poller has not been created yet or has been told to stop,
     *         otherwise {@code true}
     */
    @ManagedOperation(description = "Is the topic poller running")
    public boolean isPollerRunning() {
        // Read the field once; it is null until the repository has been started.
        TopicPoller poller = this.topicPoller;
        return poller != null && poller.isRunning();
    }
}
