/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.support;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.PooledExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.PollingConsumerSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A polling consumer that is fed by an event driven {@link Consumer}.
 *
 * <p>The instance registers itself as the {@link Processor} of a consumer created from the
 * endpoint (see {@link #createConsumer()}). Every exchange handed to {@link #process(Exchange)}
 * is buffered in a {@link BlockingQueue} until a caller retrieves it through one of the
 * {@code receive} methods.
 */
public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);

    /** Buffer between the delegate consumer (producer side) and the {@code receive} callers. */
    private final BlockingQueue<Exchange> queue;
    /** Handler invoked when a {@code receive} call is interrupted while waiting on the queue. */
    private ExceptionHandler interruptedExceptionHandler;
    /** Delegate consumer; assigned in {@link #doBuild()}. */
    private Consumer consumer;
    /** Whether {@link #process(Exchange)} waits for free space when the queue is full. */
    private boolean blockWhenFull = true;
    /** Maximum wait in milliseconds when blocking on a full queue; {@code <= 0} waits without limit. */
    private long blockTimeout;
    /** Capacity value captured at construction time; see {@link #getQueueCapacity()}. */
    private final int queueCapacity;
    /** Whether incoming exchanges are copied before being queued. */
    private boolean copy;

    /**
     * Creates a consumer with a queue size of 1000.
     *
     * @param endpoint the endpoint to consume from
     */
    public EventDrivenPollingConsumer(Endpoint endpoint) {
        this(endpoint, 1000);
    }

    /**
     * Creates a consumer with its own internal queue.
     *
     * @param endpoint  the endpoint to consume from
     * @param queueSize capacity of the internal {@link ArrayBlockingQueue}; a value {@code <= 0}
     *                  selects a {@link LinkedBlockingQueue} created without a capacity argument
     */
    public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
        super(endpoint);
        this.queueCapacity = queueSize;
        this.queue = queueSize <= 0 ? new LinkedBlockingQueue<Exchange>() : new ArrayBlockingQueue<Exchange>(queueSize);
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    /**
     * Creates a consumer that buffers exchanges in the supplied queue.
     *
     * @param endpoint the endpoint to consume from
     * @param queue    the queue to use; its {@link BlockingQueue#remainingCapacity()} at this
     *                 moment is recorded as the queue capacity
     */
    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
        super(endpoint);
        this.queue = queue;
        this.queueCapacity = queue.remainingCapacity();
        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    /**
     * Returns this instance, which acts as the processor that receives the exchanges.
     */
    @Override
    public Processor getProcessor() {
        return this;
    }

    /**
     * Returns whether {@link #process(Exchange)} blocks when the queue is full (default {@code true}).
     */
    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    /**
     * Sets whether {@link #process(Exchange)} blocks when the queue is full. When {@code false}
     * the exchange is added with {@link BlockingQueue#add(Object)}, which throws instead of waiting.
     */
    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Returns the maximum time in milliseconds to wait for queue space when blocking is enabled.
     */
    public long getBlockTimeout() {
        return this.blockTimeout;
    }

    /**
     * Sets the maximum time in milliseconds to wait for queue space when blocking is enabled.
     * A value {@code <= 0} makes {@link #process(Exchange)} wait without a time limit.
     */
    public void setBlockTimeout(long blockTimeout) {
        this.blockTimeout = blockTimeout;
    }

    /**
     * Returns whether every incoming exchange is copied before it is queued.
     */
    public boolean isCopy() {
        return this.copy;
    }

    /**
     * Sets whether every incoming exchange is copied before it is queued.
     */
    public void setCopy(boolean copy) {
        this.copy = copy;
    }

    /**
     * Returns the capacity recorded at construction time: the requested queue size, or the
     * remaining capacity of a caller-supplied queue at the time it was passed in.
     */
    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    /**
     * Returns the number of exchanges currently held in the queue.
     */
    public int getQueueSize() {
        return this.queue.size();
    }

    /**
     * Polls for an exchange with a timeout of zero; equivalent to {@code receive(0)}.
     *
     * @return the next exchange, or {@code null} if none is available
     */
    @Override
    public Exchange receiveNoWait() {
        return this.receive(0L);
    }

    /**
     * Waits until an exchange is available and returns it.
     *
     * <p>If the wait is interrupted the interruption is passed to
     * {@link #handleInterruptedException(InterruptedException)} and the wait is retried for as
     * long as the consumer is allowed to run.
     *
     * @return the next exchange, or {@code null} once the consumer is no longer allowed to run
     * @throws RejectedExecutionException if the consumer is not started
     */
    @Override
    public Exchange receive() {
        if (!this.isRunAllowed() || !this.isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + this.getStatus().name());
        }
        while (this.isRunAllowed()) {
            // beforePoll, the queue read and afterPoll run as one unit under the instance lock
            synchronized (this) {
                try {
                    this.beforePoll(0L);
                    return this.queue.take();
                } catch (InterruptedException e) {
                    // fall through to the next loop iteration after the handler has run
                    this.handleInterruptedException(e);
                } finally {
                    // always pair afterPoll with beforePoll, whatever the outcome
                    this.afterPoll();
                }
            }
        }
        LOG.trace("Consumer is not running, so returning null");
        return null;
    }

    /**
     * Waits up to the given time for an exchange to become available.
     *
     * @param timeout maximum time to wait in milliseconds; the value may be replaced by the one
     *                returned from {@link #beforePoll(long)}
     * @return the next exchange, or {@code null} if none arrived in time or the wait was interrupted
     * @throws RejectedExecutionException if the consumer is not started
     */
    @Override
    public Exchange receive(long timeout) {
        if (!this.isRunAllowed() || !this.isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + this.getStatus().name());
        }
        // beforePoll, the queue read and afterPoll run as one unit under the instance lock
        synchronized (this) {
            try {
                timeout = this.beforePoll(timeout);
                return this.queue.poll(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                this.handleInterruptedException(e);
                return null;
            } finally {
                // always pair afterPoll with beforePoll, whatever the outcome
                this.afterPoll();
            }
        }
    }

    /**
     * Queues the given exchange so that a later {@code receive} call can return it.
     *
     * <p>The exchange is replaced by a copy (see {@link #prepareCopy(Exchange, boolean)}) when
     * copying is enabled or when it is a {@link PooledExchange}. With blocking enabled the call
     * waits for free queue space, without limit when the block timeout is {@code <= 0}. With
     * blocking disabled the exchange is added via {@link BlockingQueue#add(Object)}.
     *
     * <p>If the thread is interrupted while waiting, the interruption is only logged at debug
     * level and the exchange is not queued.
     *
     * @param exchange the exchange to queue
     * @throws ExchangeTimedOutException if the exchange could not be queued within the block timeout
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        boolean pooled = exchange instanceof PooledExchange;
        if (this.isCopy() || pooled) {
            exchange = this.prepareCopy(exchange, true);
        }
        if (!this.isBlockWhenFull()) {
            this.queue.add(exchange);
            return;
        }
        try {
            long waitMillis = this.getBlockTimeout();
            if (waitMillis <= 0L) {
                this.queue.put(exchange);
            } else if (!this.queue.offer(exchange, waitMillis, TimeUnit.MILLISECONDS)) {
                throw new ExchangeTimedOutException(exchange, waitMillis);
            }
        } catch (InterruptedException e) {
            // deliberately best-effort: log the state as a boolean and drop the exchange
            LOG.debug("Put interrupted, are we stopping? {}", this.isStopping() || this.isStopped());
        }
    }

    /**
     * Creates a correlated copy of the exchange and attaches a newly created unit of work to it.
     *
     * @param exchange the exchange to copy
     * @param handover forwarded to {@link ExchangeHelper#createCorrelatedCopy}
     * @return the copy, carrying a unit of work obtained from the context's unit of work factory
     */
    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true);
        UnitOfWork uow = this.getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
        copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
        return copy;
    }

    /**
     * Returns the handler used when a {@code receive} call is interrupted.
     */
    public ExceptionHandler getInterruptedExceptionHandler() {
        return this.interruptedExceptionHandler;
    }

    /**
     * Sets the handler used when a {@code receive} call is interrupted.
     */
    public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) {
        this.interruptedExceptionHandler = interruptedExceptionHandler;
    }

    /**
     * Returns the delegate consumer that feeds this polling consumer.
     */
    public Consumer getDelegateConsumer() {
        return this.consumer;
    }

    /**
     * Passes the interruption to the configured interrupted-exception handler.
     */
    protected void handleInterruptedException(InterruptedException e) {
        this.getInterruptedExceptionHandler().handleException(e);
    }

    /**
     * Notifies the delegate consumer, if it is a {@link PollingConsumerPollingStrategy}, that a
     * poll is about to happen. Exceptions thrown by the strategy are logged at debug level and
     * otherwise ignored.
     *
     * @param timeout the timeout requested by the caller, in milliseconds
     * @return the timeout returned by the strategy, or the given timeout if there is no strategy
     *         or the strategy failed
     */
    protected long beforePoll(long timeout) {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) this.consumer;
            try {
                timeout = strategy.beforePoll(timeout);
            } catch (Exception e) {
                LOG.debug("Error occurred before polling {}. This exception will be ignored.", this.consumer, e);
            }
        }
        return timeout;
    }

    /**
     * Notifies the delegate consumer, if it is a {@link PollingConsumerPollingStrategy}, that a
     * poll has finished. Exceptions thrown by the strategy are logged at debug level and
     * otherwise ignored.
     */
    protected void afterPoll() {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) this.consumer;
            try {
                strategy.afterPoll();
            } catch (Exception e) {
                LOG.debug("Error occurred after polling {}. This exception will be ignored.", this.consumer, e);
            }
        }
    }

    /**
     * Returns the delegate consumer.
     */
    protected Consumer getConsumer() {
        return this.consumer;
    }

    /**
     * Creates the delegate consumer from the endpoint, with this instance as its processor.
     *
     * @throws Exception if the endpoint fails to create the consumer
     */
    protected Consumer createConsumer() throws Exception {
        return this.getEndpoint().createConsumer(this);
    }

    /**
     * Builds the parent and then creates the delegate consumer.
     */
    @Override
    protected void doBuild() throws Exception {
        super.doBuild();
        this.consumer = this.createConsumer();
    }

    /**
     * Initializes the parent and then the delegate consumer.
     */
    @Override
    protected void doInit() throws Exception {
        super.doInit();
        // NOTE(review): explicit Object cast kept so overload selection on ServiceHelper is unchanged
        ServiceHelper.initService((Object) this.consumer);
    }

    /**
     * Starts the delegate consumer. A consumer that is a {@link PollingConsumerPollingStrategy}
     * is not started here; only its {@code onInit} callback is invoked.
     */
    @Override
    protected void doStart() throws Exception {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) this.consumer;
            strategy.onInit();
        } else {
            ServiceHelper.startService((Object) this.consumer);
        }
    }

    /**
     * Stops the delegate consumer.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService((Object) this.consumer);
    }

    /**
     * Stops and shuts down the delegate consumer, then discards any exchanges still queued.
     */
    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(this.consumer);
        this.queue.clear();
    }

    /**
     * Always returns {@code true}.
     */
    @Override
    public boolean isSingleton() {
        return true;
    }
}

