package org.apache.camel.component.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;
    private ConsumerConnector consumer;

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$ConsumerTask.class */
    class ConsumerTask implements Runnable {
        private KafkaStream stream;

        public ConsumerTask(KafkaStream kafkaStream) {
            this.stream = kafkaStream;
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsumerIterator it = this.stream.iterator();
            while (Boolean.valueOf(it.hasNext()).booleanValue()) {
                try {
                    KafkaConsumer.this.processor.process(KafkaConsumer.this.endpoint.createKafkaExchange(it.next()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public KafkaConsumer(KafkaEndpoint kafkaEndpoint, Processor processor) {
        super(kafkaEndpoint, processor);
        this.endpoint = kafkaEndpoint;
        this.processor = processor;
        if (kafkaEndpoint.getZookeeperHost() == null) {
            throw new IllegalArgumentException("zookeeper host must be specified");
        }
        if (kafkaEndpoint.getZookeeperPort() == 0) {
            throw new IllegalArgumentException("zookeeper port must be specified");
        }
        if (kafkaEndpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.endpoint.getZookeeperHost() + ":" + this.endpoint.getZookeeperPort());
        properties.put("group.id", this.endpoint.getGroupId());
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return properties;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.log.info("Starting Kafka consumer");
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
        HashMap hashMap = new HashMap();
        hashMap.put(this.endpoint.getTopic(), Integer.valueOf(this.endpoint.getConsumerStreams()));
        List<KafkaStream<byte[], byte[]>> list = this.consumer.createMessageStreams(hashMap).get(this.endpoint.getTopic());
        this.executor = this.endpoint.createExecutor();
        Iterator<KafkaStream<byte[], byte[]>> it = list.iterator();
        while (it.hasNext()) {
            this.executor.submit(new ConsumerTask(it.next()));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.log.info("Stopping Kafka consumer");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            if (getEndpoint() == null || getEndpoint().getCamelContext() == null) {
                this.executor.shutdownNow();
            } else {
                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            }
        }
        this.executor = null;
    }
}
