/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.support;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.PooledExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.PollingConsumerSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventDrivenPollingConsumer
extends PollingConsumerSupport
implements Processor,
IsSingleton {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
    private final BlockingQueue<Exchange> queue;
    private ExceptionHandler interruptedExceptionHandler;
    private Consumer consumer;
    private boolean blockWhenFull = true;
    private long blockTimeout;
    private final int queueCapacity;
    private boolean copy;

    public EventDrivenPollingConsumer(Endpoint endpoint) {
        this(endpoint, 1000);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
        super(endpoint);
        this.queueCapacity = queueSize;
        this.queue = queueSize <= 0 ? new LinkedBlockingQueue<Exchange>() : new ArrayBlockingQueue<Exchange>(queueSize);
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
        super(endpoint);
        this.queue = queue;
        this.queueCapacity = queue.remainingCapacity();
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    @Override
    public Processor getProcessor() {
        return this;
    }

    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    public long getBlockTimeout() {
        return this.blockTimeout;
    }

    public void setBlockTimeout(long blockTimeout) {
        this.blockTimeout = blockTimeout;
    }

    public boolean isCopy() {
        return this.copy;
    }

    public void setCopy(boolean copy) {
        this.copy = copy;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    @Override
    public Exchange receiveNoWait() {
        return this.receive(0L);
    }

    /*
     * Loose catch block
     */
    @Override
    public Exchange receive() {
        if (!this.isRunAllowed() || !this.isStarted()) {
            throw new RejectedExecutionException(String.valueOf(this) + " is not started, but in state: " + this.getStatus().name());
        }
        while (this.isRunAllowed()) {
            this.lock.lock();
            try {
                try {
                    this.beforePoll(0L);
                    Exchange exchange = this.queue.take();
                    return exchange;
                }
                catch (InterruptedException e) {
                    this.handleInterruptedException(e);
                    continue;
                }
                finally {
                    this.afterPoll();
                    continue;
                }
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
        }
        LOG.trace("Consumer is not running, so returning null");
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Exchange receive(long timeout) {
        if (!this.isRunAllowed() || !this.isStarted()) {
            throw new RejectedExecutionException(String.valueOf(this) + " is not started, but in state: " + this.getStatus().name());
        }
        this.lock.lock();
        try {
            try {
                timeout = this.beforePoll(timeout);
                Exchange exchange = this.queue.poll(timeout, TimeUnit.MILLISECONDS);
                this.afterPoll();
                return exchange;
            }
            catch (InterruptedException e) {
                try {
                    this.handleInterruptedException(e);
                    Exchange exchange = null;
                    this.afterPoll();
                    this.lock.unlock();
                    return exchange;
                }
                catch (Throwable throwable) {
                    this.afterPoll();
                    throw throwable;
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        boolean pooled = exchange instanceof PooledExchange;
        if (this.isCopy() || pooled) {
            exchange = this.prepareCopy(exchange, true);
        }
        if (this.isBlockWhenFull()) {
            try {
                if (this.getBlockTimeout() <= 0L) {
                    this.queue.put(exchange);
                } else {
                    boolean added = this.queue.offer(exchange, this.getBlockTimeout(), TimeUnit.MILLISECONDS);
                    if (!added) {
                        throw new ExchangeTimedOutException(exchange, this.getBlockTimeout());
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.debug("Put interrupted, are we stopping? {}", (Object)(this.isStopping() || this.isStopped() ? 1 : 0));
            }
        } else {
            this.queue.add(exchange);
        }
    }

    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true);
        UnitOfWork uow = PluginHelper.getUnitOfWorkFactory(this.getEndpoint().getCamelContext()).createUnitOfWork(copy);
        copy.getExchangeExtension().setUnitOfWork(uow);
        return copy;
    }

    public ExceptionHandler getInterruptedExceptionHandler() {
        return this.interruptedExceptionHandler;
    }

    public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) {
        this.interruptedExceptionHandler = interruptedExceptionHandler;
    }

    public Consumer getDelegateConsumer() {
        return this.consumer;
    }

    protected void handleInterruptedException(InterruptedException e) {
        Thread.currentThread().interrupt();
        this.getInterruptedExceptionHandler().handleException(e);
    }

    protected long beforePoll(long timeout) {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy)((Object)consumer);
            try {
                timeout = strategy.beforePoll(timeout);
            }
            catch (Exception e) {
                LOG.debug("Error occurred before polling {}. This exception will be ignored.", (Object)this.consumer, (Object)e);
            }
        }
        return timeout;
    }

    protected void afterPoll() {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy)((Object)consumer);
            try {
                strategy.afterPoll();
            }
            catch (Exception e) {
                LOG.debug("Error occurred after polling {}. This exception will be ignored.", (Object)this.consumer, (Object)e);
            }
        }
    }

    protected Consumer getConsumer() {
        return this.consumer;
    }

    protected Consumer createConsumer() throws Exception {
        return this.getEndpoint().createConsumer(this);
    }

    @Override
    protected void doBuild() throws Exception {
        super.doBuild();
        this.consumer = this.createConsumer();
    }

    @Override
    protected void doInit() throws Exception {
        super.doInit();
        ServiceHelper.initService((Object)this.consumer);
    }

    @Override
    protected void doStart() throws Exception {
        Consumer consumer = this.consumer;
        if (consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy)((Object)consumer);
            strategy.onInit();
        } else {
            ServiceHelper.startService(this.consumer);
        }
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(this.consumer);
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(this.consumer);
        this.queue.clear();
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}

