package org.apache.camel.support;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.PooledExchange;
import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.impl.debugger.BacklogTracer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/support/EventDrivenPollingConsumer.class */
public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
    private final BlockingQueue<Exchange> queue;
    private ExceptionHandler interruptedExceptionHandler;
    private Consumer consumer;
    private boolean blockWhenFull;
    private long blockTimeout;
    private final int queueCapacity;
    private boolean copy;

    public EventDrivenPollingConsumer(Endpoint endpoint) {
        this(endpoint, BacklogTracer.MAX_BACKLOG_SIZE);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, int i) {
        super(endpoint);
        this.blockWhenFull = true;
        this.queueCapacity = i;
        if (i <= 0) {
            this.queue = new LinkedBlockingQueue();
        } else {
            this.queue = new ArrayBlockingQueue(i);
        }
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), (Class<?>) EventDrivenPollingConsumer.class);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> blockingQueue) {
        super(endpoint);
        this.blockWhenFull = true;
        this.queue = blockingQueue;
        this.queueCapacity = blockingQueue.remainingCapacity();
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), (Class<?>) EventDrivenPollingConsumer.class);
    }

    @Override // org.apache.camel.support.PollingConsumerSupport, org.apache.camel.Consumer
    public Processor getProcessor() {
        return this;
    }

    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setBlockWhenFull(boolean z) {
        this.blockWhenFull = z;
    }

    public long getBlockTimeout() {
        return this.blockTimeout;
    }

    public void setBlockTimeout(long j) {
        this.blockTimeout = j;
    }

    public boolean isCopy() {
        return this.copy;
    }

    public void setCopy(boolean z) {
        this.copy = z;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    public Exchange receiveNoWait() {
        return receive(0L);
    }

    public Exchange receive() {
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(String.valueOf(this) + " is not started, but in state: " + getStatus().name());
        }
        while (isRunAllowed()) {
            this.lock.lock();
            try {
                beforePoll(0L);
                Exchange take = this.queue.take();
                this.lock.unlock();
                return take;
            } catch (InterruptedException e) {
                try {
                    try {
                        handleInterruptedException(e);
                        afterPoll();
                        this.lock.unlock();
                    } finally {
                        afterPoll();
                    }
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            }
        }
        LOG.trace("Consumer is not running, so returning null");
        return null;
    }

    public Exchange receive(long j) {
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(String.valueOf(this) + " is not started, but in state: " + getStatus().name());
        }
        this.lock.lock();
        try {
            try {
                try {
                    Exchange poll = this.queue.poll(beforePoll(j), TimeUnit.MILLISECONDS);
                    afterPoll();
                    this.lock.unlock();
                    return poll;
                } catch (InterruptedException e) {
                    handleInterruptedException(e);
                    afterPoll();
                    this.lock.unlock();
                    return null;
                }
            } catch (Throwable th) {
                afterPoll();
                throw th;
            }
        } catch (Throwable th2) {
            this.lock.unlock();
            throw th2;
        }
    }

    public void process(Exchange exchange) throws Exception {
        boolean z = exchange instanceof PooledExchange;
        if (isCopy() || z) {
            exchange = prepareCopy(exchange, true);
        }
        if (!isBlockWhenFull()) {
            this.queue.add(exchange);
            return;
        }
        try {
            if (getBlockTimeout() <= 0) {
                this.queue.put(exchange);
            } else if (!this.queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS)) {
                throw new ExchangeTimedOutException(exchange, getBlockTimeout());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.debug("Put interrupted, are we stopping? {}", Boolean.valueOf(isStopping() || isStopped()));
        }
    }

    protected Exchange prepareCopy(Exchange exchange, boolean z) {
        Exchange createCorrelatedCopy = ExchangeHelper.createCorrelatedCopy(exchange, z, true);
        createCorrelatedCopy.getExchangeExtension().setUnitOfWork(PluginHelper.getUnitOfWorkFactory(getEndpoint().getCamelContext()).createUnitOfWork(createCorrelatedCopy));
        return createCorrelatedCopy;
    }

    public ExceptionHandler getInterruptedExceptionHandler() {
        return this.interruptedExceptionHandler;
    }

    public void setInterruptedExceptionHandler(ExceptionHandler exceptionHandler) {
        this.interruptedExceptionHandler = exceptionHandler;
    }

    public Consumer getDelegateConsumer() {
        return this.consumer;
    }

    protected void handleInterruptedException(InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
        getInterruptedExceptionHandler().handleException(interruptedException);
    }

    protected long beforePoll(long j) {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            try {
                j = ((PollingConsumerPollingStrategy) consumer).beforePoll(j);
            } catch (Exception e) {
                LOG.debug("Error occurred before polling {}. This exception will be ignored.", this.consumer, e);
            }
        }
        return j;
    }

    protected void afterPoll() {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            try {
                ((PollingConsumerPollingStrategy) consumer).afterPoll();
            } catch (Exception e) {
                LOG.debug("Error occurred after polling {}. This exception will be ignored.", this.consumer, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Consumer getConsumer() {
        return this.consumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Consumer createConsumer() throws Exception {
        return getEndpoint().createConsumer(this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doBuild() throws Exception {
        super.doBuild();
        this.consumer = createConsumer();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doInit() throws Exception {
        super.doInit();
        ServiceHelper.initService(this.consumer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            ((PollingConsumerPollingStrategy) consumer).onInit();
        } else {
            ServiceHelper.startService((Service) this.consumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doStop() throws Exception {
        ServiceHelper.stopService((Service) this.consumer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.service.BaseService
    public void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(this.consumer);
        this.queue.clear();
    }

    @Override // org.apache.camel.IsSingleton
    public boolean isSingleton() {
        return true;
    }
}
