package org.apache.camel.component.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;
    private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$AutoCommitConsumerTask.class */
    class AutoCommitConsumerTask implements Runnable {
        private final ConsumerConnector consumer;
        private KafkaStream<byte[], byte[]> stream;

        public AutoCommitConsumerTask(ConsumerConnector consumerConnector, KafkaStream<byte[], byte[]> kafkaStream) {
            this.consumer = consumerConnector;
            this.stream = kafkaStream;
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsumerIterator<byte[], byte[]> it = this.stream.iterator();
            while (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isSuspendingOrSuspended() && it.hasNext()) {
                Exchange createKafkaExchange = KafkaConsumer.this.endpoint.createKafkaExchange(it.next());
                try {
                    KafkaConsumer.this.processor.process(createKafkaExchange);
                } catch (Exception e) {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", createKafkaExchange, e);
                }
            }
            KafkaConsumer.LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(this.consumer));
            this.consumer.commitOffsets();
        }
    }

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$BatchingConsumerTask.class */
    class BatchingConsumerTask implements Runnable {
        private KafkaStream<byte[], byte[]> stream;
        private CyclicBarrier barrier;

        public BatchingConsumerTask(KafkaStream<byte[], byte[]> kafkaStream, CyclicBarrier cyclicBarrier) {
            this.stream = kafkaStream;
            this.barrier = cyclicBarrier;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z;
            int i = 0;
            ConsumerIterator<byte[], byte[]> it = this.stream.iterator();
            boolean z2 = true;
            while (z2) {
                try {
                    z = false;
                    if (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isSuspendingOrSuspended() && it.hasNext()) {
                        try {
                            KafkaConsumer.this.processor.process(KafkaConsumer.this.endpoint.createKafkaExchange(it.next()));
                        } catch (Exception e) {
                            KafkaConsumer.LOG.error(e.getMessage(), e);
                        }
                        i++;
                    } else {
                        z2 = false;
                    }
                } catch (ConsumerTimeoutException e2) {
                    KafkaConsumer.LOG.debug("Consumer timeout occurred due " + e2.getMessage(), e2);
                    z = true;
                }
                if (i >= KafkaConsumer.this.endpoint.getBatchSize() || z || (i > 0 && !z2)) {
                    try {
                        this.barrier.await(KafkaConsumer.this.endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                        if (!z) {
                            i = 0;
                        }
                    } catch (Exception e3) {
                        KafkaConsumer.this.getExceptionHandler().handleException("Error waiting for batch to complete", e3);
                        return;
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$CommitOffsetTask.class */
    class CommitOffsetTask implements Runnable {
        private final ConsumerConnector consumer;

        public CommitOffsetTask(ConsumerConnector consumerConnector) {
            this.consumer = consumerConnector;
        }

        @Override // java.lang.Runnable
        public void run() {
            KafkaConsumer.LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(this.consumer));
            this.consumer.commitOffsets();
        }
    }

    public KafkaConsumer(KafkaEndpoint kafkaEndpoint, Processor processor) {
        super(kafkaEndpoint, processor);
        this.endpoint = kafkaEndpoint;
        this.processor = processor;
        this.consumerBarriers = new HashMap();
        if (kafkaEndpoint.getZookeeperConnect() == null) {
            throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
        }
        if (kafkaEndpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties createConsumerProperties = this.endpoint.getConfiguration().createConsumerProperties();
        createConsumerProperties.put("zookeeper.connect", this.endpoint.getZookeeperConnect());
        createConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, this.endpoint.getGroupId());
        return createConsumerProperties;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.log.info("Starting Kafka consumer");
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConsumersCount(); i++) {
            ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(getProps()));
            HashMap hashMap = new HashMap();
            hashMap.put(this.endpoint.getTopic(), Integer.valueOf(this.endpoint.getConsumerStreams()));
            List<KafkaStream<byte[], byte[]>> list = createJavaConsumerConnector.createMessageStreams(hashMap).get(this.endpoint.getTopic());
            if (this.endpoint.isAutoCommitEnable() == null || this.endpoint.isAutoCommitEnable().booleanValue()) {
                Iterator<KafkaStream<byte[], byte[]>> it = list.iterator();
                while (it.hasNext()) {
                    this.executor.submit(new AutoCommitConsumerTask(createJavaConsumerConnector, it.next()));
                }
                this.consumerBarriers.put(createJavaConsumerConnector, null);
            } else {
                if ((this.endpoint.getConsumerTimeoutMs() == null || this.endpoint.getConsumerTimeoutMs().intValue() < 0) && this.endpoint.getConsumerStreams() > 1) {
                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
                }
                CyclicBarrier cyclicBarrier = new CyclicBarrier(this.endpoint.getConsumerStreams(), new CommitOffsetTask(createJavaConsumerConnector));
                Iterator<KafkaStream<byte[], byte[]>> it2 = list.iterator();
                while (it2.hasNext()) {
                    this.executor.submit(new BatchingConsumerTask(it2.next(), cyclicBarrier));
                }
                this.consumerBarriers.put(createJavaConsumerConnector, cyclicBarrier);
            }
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.log.info("Stopping Kafka consumer");
        for (ConsumerConnector consumerConnector : this.consumerBarriers.keySet()) {
            if (consumerConnector != null) {
                consumerConnector.shutdown();
            }
        }
        this.consumerBarriers.clear();
        if (this.executor != null) {
            if (getEndpoint() == null || getEndpoint().getCamelContext() == null) {
                this.executor.shutdownNow();
            } else {
                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            }
        }
        this.executor = null;
    }
}
