package org.apache.camel.component.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer {
    /** Class-wide SLF4J logger; the nested fetch task logs through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    /** Worker pool obtained from the endpoint in doStart() and reset to null in doStop(). */
    protected ExecutorService executor;
    /** The owning endpoint, as passed to the constructor. */
    private final KafkaEndpoint endpoint;
    /** Processor every consumed record is handed to. */
    private final Processor processor;
    /** Poll timeout in milliseconds, read once from the endpoint configuration in the constructor. */
    private final Long pollTimeoutMs;
    /** Fetch tasks created by doStart(); iterated and cleared by doStop(). */
    private final List<KafkaFetchRecords> tasks;
    /** True when doStart() started the offset repository itself, so doStop() must stop it again. */
    private volatile boolean stopOffsetRepo;
    /** Receives poll exceptions when the poll strategy answers ERROR_HANDLER. */
    private final BridgeExceptionHandlerToErrorHandler bridge;
    /** Decides how a poll failure is handled; assigned in doBuild(). */
    private PollExceptionStrategy pollExceptionStrategy;

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$KafkaFetchRecords.class */
    class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
        // Kafka client (raw type); assigned by doInit() through the endpoint's client factory.
        private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
        // Configured topic value; split on "," when subscribing by name, and used in log output.
        private final String topicName;
        // When non-null the task subscribes by pattern instead of by topic name.
        private final Pattern topicPattern;
        // Label "<topic>-Thread <id>" built in the constructor; identifies this task in log output.
        private final String threadId;
        // Properties handed to the client factory when the Kafka client is created.
        private final Properties kafkaProps;
        // Last successfully processed offset per partition, keyed by serializeOffsetKey(partition);
        // entries are read and removed again in onPartitionsRevoked().
        private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap();

        /**
         * Creates a fetch task for one consumer thread.
         *
         * @param topicName    configured topic value (used for subscription and logging)
         * @param topicPattern compiled topic pattern, or {@code null} to subscribe by name
         * @param id           suffix appended to the topic name to form the task's thread id
         * @param kafkaProps   properties used to create the underlying Kafka client
         */
        KafkaFetchRecords(String topicName, Pattern topicPattern, String id, Properties kafkaProps) {
            this.kafkaProps = kafkaProps;
            this.topicPattern = topicPattern;
            this.topicName = topicName;
            this.threadId = topicName + "-Thread " + id;
        }

        /**
         * Task entry point invoked by the executor (implements {@link Runnable#run()}).
         *
         * <p><b>WARNING - decompiler stub.</b> The real body was not recovered: JADX reported
         * "Code restructure failed: missing block: B:45:0x002a, code lost:
         * if (r0.get() != false) goto L11;" and skipped a dump of 262 instructions
         * (re-run JADX with '--show-bad-code' or '--comments-level debug' to inspect it).
         * As written here, every invocation throws {@link UnsupportedOperationException}.
         *
         * <p>NOTE(review): presumably the original drives doInit() and doRun(...) in a loop
         * controlled by two AtomicBoolean flags - confirm against the bytecode or the upstream
         * source before relying on this class.
         */
        @Override
        public void run() {
            // Placeholder emitted by the decompiler; not the original logic.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.run():void");
        }

        /**
         * Eagerly creates the Kafka client by delegating to {@link #doInit()}; doStart()
         * calls this before the task is submitted to the executor.
         */
        void preInit() {
            this.doInit();
        }

        /**
         * Creates the Kafka client through the endpoint's client factory.
         *
         * <p>While the client is being created the thread context class loader is switched to
         * the one that loaded the Kafka client class; the previous loader is always restored,
         * whether creation succeeds or fails.
         */
        protected void doInit() {
            final Thread current = Thread.currentThread();
            final ClassLoader previousLoader = current.getContextClassLoader();
            try {
                current.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                this.consumer = KafkaConsumer.this.endpoint.getKafkaClientFactory().getConsumer(this.kafkaProps);
            } finally {
                current.setContextClassLoader(previousLoader);
            }
        }

        /**
         * Runs one consume cycle: (re)subscribes first when a re-connect was requested, then polls.
         *
         * @param retry     flag passed through to {@link #doPollRun}; polling continues while it is true
         * @param reconnect when true on entry, {@link #doReconnectRun()} is executed and the flag is
         *                  cleared before polling starts
         */
        protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) {
            final boolean reconnectRequested = reconnect.get();
            if (reconnectRequested) {
                // Clear the flag only after the subscription succeeded.
                doReconnectRun();
                reconnect.set(false);
            }

            doPollRun(retry, reconnect);
        }

        /**
         * Subscribes the Kafka client and positions it before regular polling starts.
         *
         * <ul>
         * <li>Subscribes by pattern when a topic pattern was supplied, otherwise by the
         * comma-separated topic names; this task is registered as the rebalance listener.</li>
         * <li>Without an offset repository: honours the {@code seekTo} option
         * ("beginning" / "end") after a short poll; any other value leaves the position alone.</li>
         * <li>With an offset repository: polls once, then for every assigned partition either
         * resumes right after the stored offset, or - when nothing is stored - rewinds to the
         * first record returned by that poll so it is delivered again by the regular poll loop.</li>
         * </ul>
         *
         * <p>All polls use {@code poll(Duration)}; the {@code poll(long)} overload is deprecated.
         */
        protected void doReconnectRun() {
            if (this.topicPattern != null) {
                LOG.info("Subscribing {} to topic pattern {}", this.threadId, this.topicName);
                this.consumer.subscribe(this.topicPattern, this);
            } else {
                LOG.info("Subscribing {} to topic {}", this.threadId, this.topicName);
                this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), this);
            }

            StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
            if (offsetRepository == null) {
                String seekTo = KafkaConsumer.this.endpoint.getConfiguration().getSeekTo();
                if ("beginning".equals(seekTo)) {
                    LOG.debug("{} is seeking to the beginning on topic {}", this.threadId, this.topicName);
                    // Poll first so that assignment() below has a chance to be populated.
                    this.consumer.poll(Duration.ofMillis(100L));
                    this.consumer.seekToBeginning(this.consumer.assignment());
                } else if ("end".equals(seekTo)) {
                    LOG.debug("{} is seeking to the end on topic {}", this.threadId, this.topicName);
                    this.consumer.poll(Duration.ofMillis(100L));
                    this.consumer.seekToEnd(this.consumer.assignment());
                }
                return;
            }

            // The client field is a raw type, hence the (unchecked) typed locals.
            ConsumerRecords<Object, Object> firstPoll = this.consumer.poll(Duration.ofMillis(100L));
            Set<TopicPartition> assignedPartitions = this.consumer.assignment();
            for (TopicPartition partition : assignedPartitions) {
                String storedState = offsetRepository.getState(KafkaConsumer.this.serializeOffsetKey(partition));
                if (storedState != null && !storedState.isEmpty()) {
                    // The repository holds the last processed offset, so resume one past it.
                    long resumeOffset = KafkaConsumer.this.deserializeOffsetValue(storedState) + 1;
                    LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), resumeOffset);
                    this.consumer.seek(partition, resumeOffset);
                    continue;
                }

                List<ConsumerRecord<Object, Object>> polledRecords = firstPoll.records(partition);
                if (!polledRecords.isEmpty()) {
                    // Rewind to the first record consumed by the poll above.
                    long firstOffset = polledRecords.get(0).offset();
                    LOG.debug("Resuming partition {} from offset {}", partition.partition(), firstOffset);
                    this.consumer.seek(partition, firstOffset);
                }
            }
        }

        /**
         * Poll loop: fetches records, turns each into an exchange, processes it and commits offsets.
         *
         * <p>The loop runs while the consumer is allowed to run, is not stopping/suspending,
         * {@code retry} is true and {@code reconnect} is false. After the loop the offsets are
         * committed according to {@code autoCommitOnStop} (only when auto-commit is enabled and
         * no re-connect is pending) and the client is unsubscribed.
         *
         * <p>The whole sequence - including the commit-on-stop and the unsubscribe - is guarded
         * by one try/catch/finally so that:
         * <ul>
         * <li>a failure while unsubscribing is reported to the exception handler instead of
         * escaping from the task,</li>
         * <li>any other failure is dispatched through the configured poll exception strategy,</li>
         * <li>the client is closed in exactly one place, and only when neither {@code retry}
         * nor {@code reconnect} is set.</li>
         * </ul>
         *
         * @param retry     keeps the loop alive; cleared when the strategy answers STOP
         * @param reconnect set when the caller should re-subscribe before polling again
         */
        protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) {
            StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
            // Becomes true right before unsubscribe() so a failure there is not treated as a poll failure.
            boolean unsubscribing = false;
            // Offset of the last successfully processed record of the partition currently being handled.
            long partitionLastOffset = -1;

            try {
                while (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isStoppingOrStopped()
                        && !KafkaConsumer.this.isSuspendingOrSuspended() && retry.get() && !reconnect.get()) {
                    // Set on the first failed exchange when breakOnFirstError is enabled.
                    boolean breakOnErrorHit = false;
                    LOG.trace("Polling {} from topic: {} with timeout: {}", this.threadId, this.topicName, KafkaConsumer.this.pollTimeoutMs);
                    // The client field is a raw type, hence the (unchecked) typed local.
                    ConsumerRecords<Object, Object> allRecords = this.consumer.poll(Duration.ofMillis(KafkaConsumer.this.pollTimeoutMs));

                    Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
                    while (partitionIterator.hasNext()) {
                        TopicPartition partition = partitionIterator.next();
                        partitionLastOffset = -1;
                        List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
                        Iterator<ConsumerRecord<Object, Object>> recordIterator = partitionRecords.iterator();
                        LOG.debug("Records count {} received for partition {}", partitionRecords.size(), partition);

                        if (!breakOnErrorHit && recordIterator.hasNext()) {
                            while (!breakOnErrorHit && recordIterator.hasNext()) {
                                ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
                                }

                                Exchange exchange = KafkaConsumer.this.createKafkaExchange(consumerRecord);
                                KafkaConsumer.this.propagateHeaders(consumerRecord, exchange, KafkaConsumer.this.endpoint.getConfiguration());

                                if (!KafkaConsumer.this.isAutoCommitEnabled()) {
                                    exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext());
                                }
                                if (KafkaConsumer.this.endpoint.getConfiguration().isAllowManualCommit()) {
                                    exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, KafkaConsumer.this.endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, this.consumer, this.topicName, this.threadId, offsetRepository, partition, consumerRecord.offset()));
                                }
                                if (!KafkaConsumer.this.isAutoCommitEnabled() || KafkaConsumer.this.endpoint.getConfiguration().isAllowManualCommit()) {
                                    // True only for the very last record of the very last partition of this poll.
                                    exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, !recordIterator.hasNext() && !partitionIterator.hasNext());
                                }

                                try {
                                    KafkaConsumer.this.processor.process(exchange);
                                } catch (Exception e) {
                                    exchange.setException(e);
                                }

                                if (exchange.getException() == null) {
                                    partitionLastOffset = consumerRecord.offset();
                                    this.lastProcessedOffset.put(KafkaConsumer.this.serializeOffsetKey(partition), partitionLastOffset);
                                } else if (KafkaConsumer.this.endpoint.getConfiguration().isBreakOnFirstError()) {
                                    LOG.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", exchange, this.topicName, partitionLastOffset, exchange.getException());
                                    // Force-commit what was processed so far, then stop handing out records.
                                    commitOffset(offsetRepository, partition, partitionLastOffset, false, true);
                                    breakOnErrorHit = true;
                                } else {
                                    KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, exchange.getException());
                                }
                                KafkaConsumer.this.releaseExchange(exchange, false);
                            }

                            if (!breakOnErrorHit) {
                                // Every record of this partition was handed to the processor.
                                commitOffset(offsetRepository, partition, partitionLastOffset, false, false);
                            }
                        }
                    }

                    if (breakOnErrorHit) {
                        // Ends the loop and makes the caller re-subscribe.
                        reconnect.set(true);
                    }
                }

                if (!reconnect.get() && KafkaConsumer.this.isAutoCommitEnabled()) {
                    String autoCommitOnStop = KafkaConsumer.this.endpoint.getConfiguration().getAutoCommitOnStop();
                    if ("async".equals(autoCommitOnStop)) {
                        LOG.info("Auto commitAsync on stop {} from topic {}", this.threadId, this.topicName);
                        this.consumer.commitAsync();
                    } else if ("sync".equals(autoCommitOnStop)) {
                        LOG.info("Auto commitSync on stop {} from topic {}", this.threadId, this.topicName);
                        this.consumer.commitSync();
                    } else if ("none".equals(autoCommitOnStop)) {
                        LOG.info("Auto commit on stop {} from topic {} is disabled (none)", this.threadId, this.topicName);
                    }
                }

                LOG.info("Unsubscribing {} from topic {}", this.threadId, this.topicName);
                unsubscribing = true;
                this.consumer.unsubscribe();
            } catch (InterruptException e) {
                KafkaConsumer.this.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", e);
                LOG.info("Unsubscribing {} from topic {}", this.threadId, this.topicName);
                this.consumer.unsubscribe();
                // Preserve the interrupt status for whoever runs this task.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exception caught while polling " + this.threadId + " from kafka topic " + this.topicName + " at offset " + this.lastProcessedOffset + ". Deciding what to do.", e);
                }
                if (unsubscribing) {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error unsubscribing " + this.threadId + " from kafka topic " + this.topicName, e);
                } else {
                    handlePollException(e, partitionLastOffset, retry, reconnect);
                }
            } finally {
                // Keep the client open when the caller is going to retry or re-connect.
                if (!retry.get() && !reconnect.get()) {
                    LOG.debug("Closing consumer {}", this.threadId);
                    IOHelper.close(this.consumer);
                }
            }
        }

        /**
         * Applies the decision of the configured poll exception strategy to a poll failure:
         * RETRY keeps polling, RECONNECT requests a re-subscription, ERROR_HANDLER forwards the
         * exception to the bridge and skips ahead, DISCARD skips ahead, STOP clears both flags.
         */
        private void handlePollException(Exception e, long partitionLastOffset, AtomicBoolean retry, AtomicBoolean reconnect) {
            PollOnError onError = KafkaConsumer.this.pollExceptionStrategy.handleException(e);
            if (PollOnError.RETRY == onError) {
                LOG.warn("{} consuming {} from topic {} causedby {}. Will attempt again polling the same message (stacktrace in DEBUG logging level)", e.getClass().getName(), this.threadId, this.topicName, e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt again polling the same message", this.threadId, this.topicName, e.getMessage(), e);
                }
                retry.set(true);
            } else if (PollOnError.RECONNECT == onError) {
                LOG.warn("{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run (stacktrace in DEBUG logging level)", e.getClass().getName(), this.threadId, this.topicName, e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", e.getClass().getName(), this.threadId, this.topicName, e.getMessage(), e);
                }
                reconnect.set(true);
            } else if (PollOnError.ERROR_HANDLER == onError) {
                KafkaConsumer.this.bridge.handleException(e);
                seekToNextOffset(partitionLastOffset);
            } else if (PollOnError.DISCARD == onError) {
                LOG.warn("{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message (stracktrace in DEBUG logging level).", e.getClass().getName(), this.threadId, this.topicName, e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message.", e.getClass().getName(), this.threadId, this.topicName, e.getMessage(), e);
                }
                seekToNextOffset(partitionLastOffset);
            } else if (PollOnError.STOP == onError) {
                LOG.warn("{} consuming {} from topic {} causedby {}. Will stop consumer (stacktrace in DEBUG logging level).", e.getClass().getName(), this.threadId, this.topicName, e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} consuming {} from topic {} causedby {}. Will stop consumer.", e.getClass().getName(), this.threadId, this.topicName, e.getMessage(), e);
                }
                retry.set(false);
                reconnect.set(false);
            }
        }

        /**
         * Moves the client past the current record so polling continues with the next one.
         *
         * <p>When {@code lastOffset} is known (not -1) every assigned partition is positioned at
         * {@code lastOffset + 1}; otherwise each partition is positioned one past its own current
         * position. Nothing happens when the client reports no assignment ({@code null}).
         *
         * <p>NOTE(review): the caller tracks {@code lastOffset} for a single partition, yet it is
         * applied to all assigned partitions here - confirm this is intended.
         *
         * @param lastOffset last processed offset, or -1 when unknown
         */
        private void seekToNextOffset(long lastOffset) {
            Set<TopicPartition> assignedPartitions = this.consumer.assignment();
            if (assignedPartitions == null) {
                return;
            }

            if (lastOffset != -1) {
                long target = lastOffset + 1;
                LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", target, this.topicName);
                for (TopicPartition partition : assignedPartitions) {
                    this.consumer.seek(partition, target);
                }
                return;
            }

            // Only the first partition's target offset is logged.
            boolean alreadyLogged = false;
            for (TopicPartition partition : assignedPartitions) {
                long target = this.consumer.position(partition) + 1;
                if (!alreadyLogged) {
                    LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", target, this.topicName);
                    alreadyLogged = true;
                }
                this.consumer.seek(partition, target);
            }
        }

        /**
         * Records {@code partitionLastOffset} as processed for the given partition.
         *
         * <p>Does nothing when the offset is -1. Otherwise, in order of precedence:
         * <ol>
         * <li>manual commit disabled and an offset repository present: the offset is stored in
         * the repository (the raw offset, not offset + 1);</li>
         * <li>{@code stopping}: offset + 1 is committed to Kafka according to
         * {@code autoCommitOnStop} ("async", "sync", or "none" for no commit);</li>
         * <li>{@code forceCommit}: offset + 1 is committed synchronously.</li>
         * </ol>
         *
         * @param offsetRepository    repository to store the offset in, may be {@code null}
         * @param partition           partition the offset belongs to
         * @param partitionLastOffset last processed offset, or -1 for "nothing processed"
         * @param stopping            whether the commit-on-stop policy applies
         * @param forceCommit         whether to commit synchronously when not stopping
         */
        private void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit) {
            if (partitionLastOffset == -1) {
                return;
            }

            boolean manualCommitAllowed = KafkaConsumer.this.endpoint.getConfiguration().isAllowManualCommit();
            if (!manualCommitAllowed && offsetRepository != null) {
                LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", this.threadId, this.topicName, partition.partition(), partitionLastOffset);
                offsetRepository.setState(KafkaConsumer.this.serializeOffsetKey(partition), KafkaConsumer.this.serializeOffsetValue(partitionLastOffset));
            } else if (stopping) {
                String onStop = KafkaConsumer.this.endpoint.getConfiguration().getAutoCommitOnStop();
                if ("async".equals(onStop)) {
                    LOG.debug("Auto commitAsync on stop {} from topic {}", this.threadId, this.topicName);
                    this.consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null);
                } else if ("sync".equals(onStop)) {
                    LOG.debug("Auto commitSync on stop {} from topic {}", this.threadId, this.topicName);
                    this.consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
                } else if ("none".equals(onStop)) {
                    LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", this.threadId, this.topicName);
                }
            } else if (forceCommit) {
                LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", this.threadId, this.topicName, partition.partition(), partitionLastOffset);
                this.consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
            }
        }

        /** Calls {@code wakeup()} on the Kafka client; invoked by doStop() before the graceful executor shutdown. */
        void stop() {
            consumer.wakeup();
        }

        /** Calls {@code wakeup()} on the Kafka client; invoked by doStop() when the executor has not terminated. */
        void shutdown() {
            consumer.wakeup();
        }

        /**
         * Rebalance callback: persists the last processed offset of each revoked partition.
         *
         * <p>For every partition the remembered offset (or -1 when none was recorded) is passed
         * to {@link #commitOffset}, but only when auto-commit is enabled. The remembered offset
         * is always dropped afterwards, even when committing fails; a failure is logged together
         * with its cause and re-thrown.
         *
         * @param collection partitions being taken away from this consumer
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            LOG.debug("onPartitionsRevoked: {} from topic {}", this.threadId, this.topicName);
            // Commit-on-stop semantics apply only while the context is stopping and this consumer may no longer run.
            boolean stopping = KafkaConsumer.this.getEndpoint().getCamelContext().isStopping() && !KafkaConsumer.this.isRunAllowed();
            StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();

            for (TopicPartition partition : collection) {
                String offsetKey = KafkaConsumer.this.serializeOffsetKey(partition);
                long offset = this.lastProcessedOffset.getOrDefault(offsetKey, -1L);
                try {
                    // Null-safe check: an unset autoCommitEnable option is treated as disabled.
                    if (KafkaConsumer.this.isAutoCommitEnabled()) {
                        commitOffset(offsetRepository, partition, offset, stopping, false);
                    }
                } catch (Exception e) {
                    // The trailing throwable makes SLF4J log the stack trace as well.
                    LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", this.threadId, offsetKey, offset, e);
                    throw e;
                } finally {
                    this.lastProcessedOffset.remove(offsetKey);
                }
            }
        }

        /**
         * Rebalance callback: positions newly assigned partitions from the offset repository.
         *
         * <p>Without an offset repository this is a no-op. Otherwise each partition that has a
         * non-empty stored state is positioned one past the stored offset; partitions without
         * state are left untouched.
         *
         * @param collection partitions newly assigned to this consumer
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            LOG.debug("onPartitionsAssigned: {} from topic {}", this.threadId, this.topicName);
            StateRepository<String, String> repository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
            if (repository == null) {
                return;
            }

            for (TopicPartition partition : collection) {
                String stored = repository.getState(KafkaConsumer.this.serializeOffsetKey(partition));
                if (stored == null || stored.isEmpty()) {
                    continue;
                }
                long resumeOffset = KafkaConsumer.this.deserializeOffsetValue(stored) + 1;
                LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), resumeOffset);
                this.consumer.seek(partition, resumeOffset);
            }
        }
    }

    /**
     * Creates a consumer bound to the given endpoint.
     *
     * @param endpoint  the Kafka endpoint this consumer reads its configuration from
     * @param processor the processor every consumed record is handed to
     */
    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
        this.tasks = new ArrayList<>();
        this.bridge = new BridgeExceptionHandlerToErrorHandler(this);
    }

    /**
     * Selects the poll exception strategy: the one configured on the component when present,
     * otherwise a {@link DefaultPollExceptionStrategy} built from the endpoint's
     * {@code pollOnError} option.
     *
     * <p>Decompiler note: access modifier changed from {@code protected}.
     */
    @Override
    public void doBuild() throws Exception {
        super.doBuild();
        PollExceptionStrategy componentStrategy = this.endpoint.getComponent().getPollExceptionStrategy();
        this.pollExceptionStrategy = componentStrategy != null
                ? componentStrategy
                : new DefaultPollExceptionStrategy(this.endpoint.getConfiguration().getPollOnError());
    }

    /** Returns the endpoint held by the superclass, narrowed to {@link KafkaEndpoint}. */
    @Override
    public KafkaEndpoint getEndpoint() {
        final KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) super.getEndpoint();
        return kafkaEndpoint;
    }

    /**
     * Builds the properties for a new Kafka client.
     *
     * <p>Starts from the configuration's consumer properties, lets the endpoint update the
     * class properties, then sets {@code bootstrap.servers} (when the client factory reports
     * brokers), {@code group.id} (configured value, or a freshly generated random UUID) and
     * {@code group.instance.id} (only when configured).
     *
     * @return a new properties object on every call
     */
    Properties getProps() {
        Properties props = this.endpoint.getConfiguration().createConsumerProperties();
        this.endpoint.updateClassProperties(props);

        String brokers = this.endpoint.getKafkaClientFactory().getBrokers(this.endpoint.getConfiguration());
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }

        String configuredGroupId = this.endpoint.getConfiguration().getGroupId();
        if (configuredGroupId == null) {
            String generatedGroupId = UUID.randomUUID().toString();
            props.put("group.id", generatedGroupId);
            LOG.debug("Kafka consumer groupId is {} (generated)", generatedGroupId);
        } else {
            props.put("group.id", configuredGroupId);
            LOG.debug("Kafka consumer groupId is {}", configuredGroupId);
        }

        String groupInstanceId = this.endpoint.getConfiguration().getGroupInstanceId();
        if (groupInstanceId != null) {
            LOG.debug("Kafka consumer groupInstanceId is {}", groupInstanceId);
            props.put("group.instance.id", groupInstanceId);
        }
        return props;
    }

    /**
     * Starts the consumer.
     *
     * <p>After the superclass start, the offset repository is started when it is a
     * {@link ServiceSupport} that is not running yet (remembered so that doStop() stops it
     * again). Then the executor is created and one fetch task per configured consumer is
     * created, pre-initialised, submitted and recorded.
     *
     * <p>Decompiler note: access modifier changed from {@code protected}.
     */
    @Override
    public void doStart() throws Exception {
        KafkaConfiguration configuration = this.endpoint.getConfiguration();
        LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", configuration.getTopic(), configuration.isBreakOnFirstError());
        super.doStart();

        Service offsetRepository = configuration.getOffsetRepository();
        if (offsetRepository instanceof ServiceSupport) {
            boolean alreadyStarted = ((ServiceSupport) offsetRepository).isStarted();
            if (!alreadyStarted) {
                this.stopOffsetRepo = true;
                LOG.debug("Starting OffsetRepository: {}", offsetRepository);
                ServiceHelper.startService(offsetRepository);
            }
        }

        this.executor = this.endpoint.createExecutor();
        String topic = configuration.getTopic();
        Pattern topicPattern = null;
        if (configuration.isTopicIsPattern()) {
            topicPattern = Pattern.compile(topic);
        }

        int consumersCount = configuration.getConsumersCount();
        for (int index = 0; index < consumersCount; index++) {
            // Each task gets its own properties object.
            KafkaFetchRecords task = new KafkaFetchRecords(topic, topicPattern, Integer.toString(index), getProps());
            task.preInit();
            this.executor.submit(task);
            this.tasks.add(task);
        }
    }

    /**
     * Stops the consumer.
     *
     * <p>With a Camel context available every task is told to stop and the executor is shut
     * down gracefully using the configured shutdown timeout; without one the executor is shut
     * down immediately. If the executor has still not terminated, every task is told to shut
     * down and the executor is shut down immediately. Finally the task list and executor
     * reference are cleared, and the offset repository is stopped when doStart() started it.
     *
     * <p>Decompiler note: access modifier changed from {@code protected}.
     */
    @Override
    public void doStop() throws Exception {
        LOG.info("Stopping Kafka consumer on topic: {}", this.endpoint.getConfiguration().getTopic());

        if (this.executor != null) {
            boolean contextAvailable = getEndpoint() != null && getEndpoint().getCamelContext() != null;
            if (contextAvailable) {
                for (KafkaFetchRecords task : this.tasks) {
                    task.stop();
                }
                int timeoutMillis = getEndpoint().getConfiguration().getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", timeoutMillis);
                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor, timeoutMillis);
            } else {
                this.executor.shutdownNow();
            }

            if (!this.executor.isTerminated()) {
                this.tasks.forEach(KafkaFetchRecords::shutdown);
                this.executor.shutdownNow();
            }
        }

        this.tasks.clear();
        this.executor = null;

        if (this.stopOffsetRepo) {
            StateRepository<String, String> repository = this.endpoint.getConfiguration().getOffsetRepository();
            LOG.debug("Stopping OffsetRepository: {}", repository);
            ServiceHelper.stopAndShutdownService(repository);
        }

        super.doStop();
    }

    /**
     * Creates an exchange for a consumed record.
     *
     * <p>The record value becomes the message body. Partition, topic, offset, Kafka headers
     * and timestamp are set as message headers (the timestamp also under
     * {@link Exchange#MESSAGE_TIMESTAMP}); the key header is set only for a non-null key.
     *
     * <p>Decompiler note: access modifier changed from {@code private}.
     *
     * @param consumerRecord the record received from Kafka
     * @return the populated exchange
     */
    public Exchange createKafkaExchange(ConsumerRecord consumerRecord) {
        Exchange exchange = createExchange(false);
        Message message = exchange.getIn();

        message.setHeader(KafkaConstants.PARTITION, consumerRecord.partition());
        message.setHeader(KafkaConstants.TOPIC, consumerRecord.topic());
        message.setHeader(KafkaConstants.OFFSET, consumerRecord.offset());
        message.setHeader(KafkaConstants.HEADERS, consumerRecord.headers());

        long timestamp = consumerRecord.timestamp();
        message.setHeader(KafkaConstants.TIMESTAMP, timestamp);
        message.setHeader(Exchange.MESSAGE_TIMESTAMP, timestamp);

        Object key = consumerRecord.key();
        if (key != null) {
            message.setHeader(KafkaConstants.KEY, key);
        }

        message.setBody(consumerRecord.value());
        return exchange;
    }

    /**
     * Copies the record's Kafka headers onto the exchange's in-message.
     *
     * <p>Headers rejected by the configured header filter strategy are skipped; the values of
     * the remaining ones are converted with the configured header deserializer.
     *
     * <p>Decompiler note: access modifier changed from {@code private}.
     *
     * @param consumerRecord     record whose headers are propagated
     * @param exchange           exchange receiving the headers
     * @param kafkaConfiguration source of the filter strategy and the deserializer
     */
    public void propagateHeaders(ConsumerRecord<Object, Object> consumerRecord, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
        HeaderFilterStrategy filterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
        KafkaHeaderDeserializer deserializer = kafkaConfiguration.getHeaderDeserializer();

        for (Header header : consumerRecord.headers()) {
            // Despite its name, shouldBeFiltered(...) answers true for headers to keep.
            if (shouldBeFiltered(header, exchange, filterStrategy)) {
                Object value = deserializer.deserialize(header.key(), header.value());
                exchange.getIn().setHeader(header.key(), value);
            }
        }
    }

    /**
     * Tells whether a Kafka header passes the filter strategy.
     *
     * @return {@code true} when the strategy does <em>not</em> filter the header out,
     *         i.e. when the header should be propagated
     */
    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        boolean filteredOut = headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
        return !filteredOut;
    }

    /**
     * Null-safe view of the {@code autoCommitEnable} option.
     *
     * <p>Decompiler note: access modifier changed from {@code private}.
     *
     * @return {@code true} only when the option is set and true
     */
    public boolean isAutoCommitEnabled() {
        return Boolean.TRUE.equals(this.endpoint.getConfiguration().getAutoCommitEnable());
    }

    /**
     * Builds the key under which a partition's offset is tracked.
     *
     * @param topicPartition the partition to identify
     * @return {@code "<topic>/<partition>"}
     */
    protected String serializeOffsetKey(TopicPartition topicPartition) {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        return topic + "/" + partition;
    }

    /**
     * Converts an offset to the decimal string form stored in the offset repository.
     *
     * @param offset the offset to serialize
     * @return the decimal representation of {@code offset}
     */
    protected String serializeOffsetValue(long offset) {
        return Long.toString(offset);
    }

    /**
     * Parses an offset previously produced by {@link #serializeOffsetValue(long)}.
     *
     * @param offset decimal string form of the offset
     * @return the parsed offset
     * @throws NumberFormatException when {@code offset} is not a parsable decimal long
     */
    protected long deserializeOffsetValue(String offset) {
        return Long.parseLong(offset, 10);
    }
}
