/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.disruptor;

import java.util.HashSet;
import java.util.Set;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
import org.apache.camel.component.disruptor.AbstractLifecycleAwareExchangeEventHandler;
import org.apache.camel.component.disruptor.DisruptorEndpoint;
import org.apache.camel.component.disruptor.ExchangeEvent;
import org.apache.camel.component.disruptor.LifecycleAwareExchangeEventHandler;
import org.apache.camel.component.disruptor.SynchronizedExchange;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.service.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorConsumer
extends ServiceSupport
implements Consumer,
Suspendable,
ShutdownAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorConsumer.class);
    private static final AsyncCallback NOOP_ASYNC_CALLBACK = new AsyncCallback(){

        @Override
        public void done(boolean doneSync) {
        }
    };
    private final DisruptorEndpoint endpoint;
    private final AsyncProcessor processor;
    private ExceptionHandler exceptionHandler;

    public DisruptorConsumer(DisruptorEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = AsyncProcessorConverterHelper.convert(processor);
    }

    @Override
    public AsyncProcessor getProcessor() {
        return this.processor;
    }

    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(this.endpoint.getCamelContext(), this.getClass());
        }
        return this.exceptionHandler;
    }

    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    @Override
    public DisruptorEndpoint getEndpoint() {
        return this.endpoint;
    }

    @Override
    protected void doStart() throws Exception {
        this.getEndpoint().onStarted(this);
    }

    @Override
    protected void doStop() throws Exception {
        this.getEndpoint().onStopped(this);
    }

    @Override
    protected void doSuspend() throws Exception {
        this.getEndpoint().onStopped(this);
    }

    @Override
    protected void doResume() throws Exception {
        this.getEndpoint().onStarted(this);
    }

    Set<LifecycleAwareExchangeEventHandler> createEventHandlers(int concurrentConsumers) {
        HashSet<LifecycleAwareExchangeEventHandler> eventHandlers = new HashSet<LifecycleAwareExchangeEventHandler>();
        for (int i = 0; i < concurrentConsumers; ++i) {
            eventHandlers.add(new ConsumerEventHandler(i, concurrentConsumers));
        }
        return eventHandlers;
    }

    @Override
    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        return true;
    }

    @Override
    public void prepareShutdown(boolean suspendOnly, boolean forced) {
    }

    @Override
    public int getPendingExchangesSize() {
        return this.getEndpoint().getDisruptor().getPendingExchangeCount();
    }

    public String toString() {
        return "DisruptorConsumer[" + String.valueOf(this.endpoint) + "]";
    }

    private Exchange prepareExchange(Exchange exchange) {
        Exchange newExchange = ExchangeHelper.copyExchangeWithProperties(exchange, this.endpoint.getCamelContext());
        newExchange.getExchangeExtension().setFromEndpoint(this.endpoint);
        return newExchange;
    }

    private void process(SynchronizedExchange synchronizedExchange) {
        try {
            boolean ignore;
            Exchange exchange = synchronizedExchange.getExchange();
            boolean bl = ignore = exchange.hasProperties() && exchange.getProperties().containsKey("disruptor.ignoreExchange");
            if (ignore) {
                LOGGER.trace("Ignoring exchange {}", (Object)exchange);
                return;
            }
            Exchange result = this.prepareExchange(exchange);
            result.getExchangeExtension().addOnCompletion(DisruptorConsumer.newSynchronization(synchronizedExchange, result));
            this.processor.process(result, NOOP_ASYNC_CALLBACK);
        }
        catch (Exception e) {
            this.handleException(synchronizedExchange, e);
        }
    }

    private static Synchronization newSynchronization(final SynchronizedExchange synchronizedExchange, final Exchange result) {
        return new Synchronization(){

            @Override
            public void onComplete(Exchange exchange) {
                synchronizedExchange.consumed(result);
            }

            @Override
            public void onFailure(Exchange exchange) {
                synchronizedExchange.consumed(result);
            }
        };
    }

    private void handleException(SynchronizedExchange synchronizedExchange, Exception e) {
        Exchange exchange = synchronizedExchange.getExchange();
        if (exchange != null) {
            this.getExceptionHandler().handleException("Error processing exchange", exchange, e);
        } else {
            this.getExceptionHandler().handleException(e);
        }
    }

    @Override
    public Exchange createExchange(boolean autoRelease) {
        return null;
    }

    @Override
    public void releaseExchange(Exchange exchange, boolean autoRelease) {
    }

    private class ConsumerEventHandler
    extends AbstractLifecycleAwareExchangeEventHandler {
        private final int ordinal;
        private final int concurrentConsumers;

        ConsumerEventHandler(int ordinal, int concurrentConsumers) {
            this.ordinal = ordinal;
            this.concurrentConsumers = concurrentConsumers;
        }

        @Override
        public void onEvent(ExchangeEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (sequence % (long)this.concurrentConsumers == (long)this.ordinal) {
                DisruptorConsumer.this.process(event.getSynchronizedExchange());
            }
        }
    }
}

