/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support.streaming;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes Kafka records one at a time: each {@link ConsumerRecord} is copied into a fresh
 * {@link Exchange}, decorated with commit-related headers, passed to the configured
 * {@link Processor}, and the outcome is reported as a {@link ProcessingResult}.
 */
final class KafkaRecordStreamingProcessor extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessor.class);

    /** Header flagging that no further record follows in the current partition batch. */
    private static final String LAST_RECORD_BEFORE_COMMIT_HEADER = "kafka.LAST_RECORD_BEFORE_COMMIT";
    /** Header flagging that neither the record iterator nor the partition iterator has more elements. */
    private static final String LAST_POLL_RECORD_HEADER = "kafka.LAST_POLL_RECORD";
    /** Header carrying the {@link KafkaManualCommit} handle when manual commit is allowed. */
    private static final String MANUAL_COMMIT_HEADER = "CamelKafkaManualCommit";

    private final boolean autoCommitEnabled;
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;

    /**
     * @param configuration endpoint configuration; its auto-commit flag is read once here
     * @param processor     the processor every exchange is handed to
     * @param commitManager used to create manual-commit handles, record offsets and commit on break
     */
    public KafkaRecordStreamingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
        this.autoCommitEnabled = configuration.isAutoCommitEnable();
        this.configuration = configuration;
        this.processor = processor;
        this.commitManager = commitManager;
    }

    /**
     * Builds an exchange for {@code consumerRecord}, runs the processor on it and records the offset
     * unless processing asked to break on error.
     *
     * @param camelKafkaConsumer consumer used to create and release the exchange and to obtain the exception handler
     * @param topicPartition     partition the record belongs to
     * @param partitionHasNext   whether more partitions follow in the current poll
     * @param recordHasNext      whether more records follow in the current partition
     * @param consumerRecord     the record to process
     * @return the processing outcome, carrying the record's topic, partition and offset
     */
    public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> consumerRecord) {
        Exchange exchange = camelKafkaConsumer.createExchange(false);
        try {
            Message message = exchange.getMessage();
            this.setupExchangeMessage(message, consumerRecord);
            this.propagateHeaders(this.configuration, consumerRecord, exchange);

            boolean lastPollRecord = !recordHasNext && !partitionHasNext;
            if (!this.autoCommitEnabled) {
                message.setHeader(LAST_RECORD_BEFORE_COMMIT_HEADER, !recordHasNext);
                message.setHeader(LAST_POLL_RECORD_HEADER, lastPollRecord);
            }
            if (this.configuration.isAllowManualCommit()) {
                KafkaManualCommit manual = this.commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
                message.setHeader(MANUAL_COMMIT_HEADER, manual);
                message.setHeader(LAST_POLL_RECORD_HEADER, lastPollRecord);
            }

            try {
                this.processor.process(exchange);
            } catch (Exception e) {
                // Keep the failure on the exchange so it is handled uniformly below.
                exchange.setException(e);
            }

            ProcessingResult result;
            if (exchange.getException() != null) {
                LOG.debug("An exception was thrown for consumerRecord at partition {} and offset {}", consumerRecord.partition(), consumerRecord.offset());
                ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
                boolean breakOnErrorExit = this.processException(exchange, topicPartition, consumerRecord, exceptionHandler);
                result = new ProcessingResult(breakOnErrorExit, true, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            } else {
                // No exception on the exchange: neither a break nor a failure.
                result = new ProcessingResult(false, false, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            }

            if (!result.isBreakOnErrorHit()) {
                this.commitManager.recordOffset(topicPartition, consumerRecord.offset());
            }
            return result;
        } finally {
            // Always hand the exchange back, even if error handling or offset tracking throws.
            camelKafkaConsumer.releaseExchange(exchange, false);
        }
    }

    /**
     * Reacts to a failed exchange according to the break-on-first-error setting.
     *
     * @return {@code true} when break-on-first-error is enabled (after committing the partition),
     *         {@code false} when the failure was delegated to {@code exceptionHandler}
     */
    private boolean processException(Exchange exchange, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
        if (!this.configuration.isBreakOnFirstError()) {
            exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
            return false;
        }

        if (LOG.isWarnEnabled()) {
            Exception exc = exchange.getException();
            LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage());
            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", consumerRecord.offset(), consumerRecord.partition());
        }
        this.commitManager.commit(topicPartition);
        return true;
    }
}

