package org.apache.camel.component.splunk;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.component.splunk.event.SplunkEvent;
import org.apache.camel.component.splunk.support.SplunkDataReader;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/splunk/SplunkConsumer.class */
public class SplunkConsumer extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(SplunkConsumer.class);
    private SplunkDataReader dataReader;
    private SplunkEndpoint endpoint;

    public SplunkConsumer(SplunkEndpoint splunkEndpoint, Processor processor, ConsumerType consumerType) {
        super(splunkEndpoint, processor);
        this.endpoint = splunkEndpoint;
        if ((consumerType.equals(ConsumerType.NORMAL) || consumerType.equals(ConsumerType.REALTIME)) && ObjectHelper.isEmpty(splunkEndpoint.getConfiguration().getSearch())) {
            throw new RuntimeException("Missing option 'search' with normal or realtime search");
        }
        if (consumerType.equals(ConsumerType.SAVEDSEARCH) && ObjectHelper.isEmpty(splunkEndpoint.getConfiguration().getSavedSearch())) {
            throw new RuntimeException("Missing option 'savedSearch' with saved search");
        }
        this.dataReader = new SplunkDataReader(splunkEndpoint, consumerType);
    }

    @Override // org.apache.camel.support.ScheduledPollConsumer
    protected int poll() throws Exception {
        try {
            if (this.endpoint.getConfiguration().isStreaming()) {
                this.dataReader.read(splunkEvent -> {
                    Exchange createExchange = createExchange(true);
                    createExchange.getIn().setBody(splunkEvent);
                    getAsyncProcessor().process(createExchange, defaultConsumerCallback(createExchange, true));
                });
                return 0;
            }
            List<SplunkEvent> read = this.dataReader.read();
            forceConsumerAsReady();
            return processBatch(CastUtils.cast((Queue<?>) createExchanges(read)));
        } catch (Exception e) {
            this.endpoint.reset(e);
            getExceptionHandler().handleException(e);
            return 0;
        }
    }

    protected Queue<Exchange> createExchanges(List<SplunkEvent> list) {
        LOG.trace("Received {} messages in this poll", Integer.valueOf(list.size()));
        LinkedList linkedList = new LinkedList();
        for (SplunkEvent splunkEvent : list) {
            Exchange createExchange = createExchange(true);
            createExchange.getIn().setBody(splunkEvent);
            linkedList.add(createExchange);
        }
        return linkedList;
    }

    @Override // org.apache.camel.BatchConsumer
    public int processBatch(Queue<Object> queue) throws Exception {
        int size = queue.size();
        int i = 0;
        while (i < size && isBatchAllowed()) {
            Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, Integer.valueOf(i));
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, Integer.valueOf(size));
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, Boolean.valueOf(i == size - 1));
            try {
                LOG.trace("Processing exchange [{}]...", exchange);
                getProcessor().process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
            if (exchange.getException() != null) {
                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
            }
            i++;
        }
        return size;
    }
}
