/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.springrabbit;

import com.rabbitmq.client.Channel;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.springrabbit.MessagePropertiesConverter;
import org.apache.camel.component.springrabbit.SpringRabbitMQConsumer;
import org.apache.camel.component.springrabbit.SpringRabbitMQEndpoint;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * Spring AMQP {@link ChannelAwareMessageListener} that bridges RabbitMQ deliveries into Camel.
 *
 * <p>For every received message the listener builds a Camel {@link Exchange}, hands it to the
 * configured processor (synchronously, or through the asynchronous API when {@link #isAsync()} is
 * enabled and the endpoint is not forced synchronous) and, if the message carries a reply-to
 * address and replies are not disabled, publishes the resulting message to that address.
 */
public class EndpointMessageListener implements ChannelAwareMessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class);

    private final SpringRabbitMQConsumer consumer;
    private final SpringRabbitMQEndpoint endpoint;
    private final AsyncProcessor processor;
    // Captured at construction time; the message handling below reads the converters from the
    // endpoint on each call rather than from these two fields.
    private final MessagePropertiesConverter messagePropertiesConverter;
    private final MessageConverter messageConverter;
    // Guarded by this listener's monitor (see getTemplate/setTemplate).
    private RabbitTemplate template;
    private boolean disableReplyTo;
    private boolean async;

    /**
     * Creates a listener bound to the given consumer and endpoint.
     *
     * @param consumer  consumer used to create and release exchanges
     * @param endpoint  endpoint supplying converters, the reply template and configuration
     * @param processor processor that each exchange is routed to; converted to an
     *                  {@link AsyncProcessor} via {@link AsyncProcessorConverterHelper}
     */
    public EndpointMessageListener(SpringRabbitMQConsumer consumer, SpringRabbitMQEndpoint endpoint, Processor processor) {
        this.consumer = consumer;
        this.endpoint = endpoint;
        this.processor = AsyncProcessorConverterHelper.convert(processor);
        this.messagePropertiesConverter = endpoint.getMessagePropertiesConverter();
        this.messageConverter = endpoint.getMessageConverter();
    }

    /**
     * @return whether exchanges may be processed through the asynchronous processor API
     */
    public boolean isAsync() {
        return this.async;
    }

    /**
     * @param async {@code true} to allow asynchronous processing (still overridden when the
     *              endpoint reports itself as synchronous)
     */
    public void setAsync(boolean async) {
        this.async = async;
    }

    /**
     * @return whether sending replies is disabled even if the message has a reply-to address
     */
    public boolean isDisableReplyTo() {
        return this.disableReplyTo;
    }

    /**
     * @param disableReplyTo {@code true} to never send a reply
     */
    public void setDisableReplyTo(boolean disableReplyTo) {
        this.disableReplyTo = disableReplyTo;
    }

    /**
     * Returns the template used to send replies, creating it from the endpoint on first use.
     *
     * @return the reply template
     */
    public synchronized RabbitTemplate getTemplate() {
        if (this.template == null) {
            this.template = this.endpoint.createInOnlyTemplate();
        }
        return this.template;
    }

    /**
     * Sets the template used to send replies.
     *
     * <p>Synchronized on the same monitor as {@link #getTemplate()} so that a template assigned
     * here is reliably seen by the lazy-initialisation check there.
     *
     * @param template the reply template, or {@code null} to have one created on next use
     */
    public synchronized void setTemplate(RabbitTemplate template) {
        this.template = template;
    }

    /**
     * Handles one RabbitMQ delivery.
     *
     * @param message the received Spring AMQP message
     * @param channel the channel the message was received on
     * @throws Exception a {@link RuntimeCamelException} when processing failed while running
     *                   synchronously, so that the caller sees the failure
     */
    @Override
    public void onMessage(org.springframework.amqp.core.Message message, Channel channel) throws Exception {
        LOG.trace("onMessage START");
        LOG.debug("{} consumer received RabbitMQ message: {}", this.endpoint, message);

        RuntimeCamelException rce;
        try {
            MessageProperties properties = message.getMessageProperties();
            Address replyDestination = properties != null ? properties.getReplyToAddress() : null;
            boolean sendReply = !this.isDisableReplyTo() && replyDestination != null;
            Exchange exchange = this.createExchange(message, channel, replyDestination);

            LOG.trace("onMessage.process START");
            AsyncCallback callback
                    = new EndpointMessageListenerAsyncCallback(message, exchange, this.endpoint, sendReply, replyDestination);

            if (this.endpoint.isSynchronous() || !this.isAsync()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchange {} synchronously", exchange.getExchangeId());
                }
                try {
                    this.processor.process(exchange);
                } catch (Exception e) {
                    exchange.setException(e);
                } finally {
                    // Always run the completion logic (reply / failure translation) for sync processing.
                    callback.done(true);
                }
            } else {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchange {} asynchronously", exchange.getExchangeId());
                }
                if (!this.processor.process(exchange, callback)) {
                    // Processing continues asynchronously; the callback finishes up and releases the exchange.
                    return;
                }
            }

            // Completed on this thread: pick up any failure recorded on the exchange, then release it.
            rce = exchange.getException(RuntimeCamelException.class);
            this.consumer.releaseExchange(exchange, false);
        } catch (Exception e) {
            rce = RuntimeCamelException.wrapRuntimeCamelException(e);
        }

        if (rce != null) {
            LOG.trace("onMessage END throwing exception: {}", rce.getMessage());
            throw rce;
        }
        LOG.trace("onMessage END");
    }

    /**
     * Builds the Camel exchange for a received message: stores the channel as an exchange property,
     * converts the body and headers using the endpoint's converters, and switches the pattern to
     * {@link ExchangePattern#InOut} when a reply is expected.
     *
     * <p>If populating the exchange fails, the exchange is released back to the consumer before the
     * exception is rethrown, so it is not left unreleased.
     *
     * @param message          the received Spring AMQP message
     * @param channel          the channel the message was received on
     * @param replyDestination the reply-to destination, or {@code null} if there is none
     * @return the populated exchange
     */
    protected Exchange createExchange(org.springframework.amqp.core.Message message, Channel channel, Object replyDestination) {
        Exchange exchange = this.consumer.createExchange(false);
        try {
            exchange.setProperty("CamelSpringRabbitmqChannel", channel);

            Object body = this.endpoint.getMessageConverter().fromMessage(message);
            exchange.getMessage().setBody(body);

            Map<String, Object> headers
                    = this.endpoint.getMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), exchange);
            if (!headers.isEmpty()) {
                exchange.getMessage().setHeaders(headers);
            }

            boolean replyExpected = replyDestination != null && !this.disableReplyTo;
            if (replyExpected && !exchange.getPattern().isOutCapable()) {
                exchange.setPattern(ExchangePattern.InOut);
            }
            return exchange;
        } catch (RuntimeException e) {
            // Nobody else holds a reference to the exchange yet, so release it here.
            this.consumer.releaseExchange(exchange, false);
            throw e;
        }
    }

    /**
     * Completion callback for a single exchange: translates a failed or rollback-only exchange into
     * a {@link RuntimeCamelException}, otherwise sends the reply when one is requested, and
     * releases the exchange when completion happened asynchronously.
     */
    private final class EndpointMessageListenerAsyncCallback implements AsyncCallback {
        private final org.springframework.amqp.core.Message message;
        private final Exchange exchange;
        private final SpringRabbitMQEndpoint endpoint;
        private final boolean sendReply;
        private final Address replyDestination;

        private EndpointMessageListenerAsyncCallback(org.springframework.amqp.core.Message message, Exchange exchange, SpringRabbitMQEndpoint endpoint, boolean sendReply, Address replyDestination) {
            this.message = message;
            this.exchange = exchange;
            this.endpoint = endpoint;
            this.sendReply = sendReply;
            this.replyDestination = replyDestination;
        }

        /**
         * @param doneSync {@code true} when invoked on the thread that called {@code onMessage};
         *                 in that case a failure is stored on the exchange for {@code onMessage}
         *                 to rethrow, and {@code onMessage} releases the exchange
         */
        @Override
        public void done(boolean doneSync) {
            LOG.trace("onMessage.process END");

            Message reply = null;
            RuntimeCamelException rce = null;
            if (this.exchange.isFailed() || this.exchange.isRollbackOnly()) {
                if (this.exchange.isRollbackOnly()) {
                    rce = RuntimeCamelException.wrapRuntimeCamelException(new RollbackExchangeException(this.exchange));
                } else if (this.exchange.getException() != null) {
                    rce = RuntimeCamelException.wrapRuntimeCamelException(this.exchange.getException());
                }
            } else if (this.sendReply && this.exchange.getPattern().isOutCapable()) {
                reply = this.exchange.getMessage();
            }

            // reply is only assigned on the success path with sendReply enabled.
            if (reply != null) {
                LOG.trace("onMessage.sendReply START");
                try {
                    this.sendReply(this.replyDestination, this.message, this.exchange, reply);
                } catch (Exception e) {
                    rce = new RuntimeCamelException(e);
                }
                LOG.trace("onMessage.sendReply END");
            }

            if (rce != null) {
                if (doneSync) {
                    this.exchange.setException(rce);
                } else if (this.endpoint.getExceptionHandler() != null) {
                    this.endpoint.getExceptionHandler().handleException(rce);
                } else {
                    // No handler to delegate to and no caller to rethrow to: at least make the failure visible.
                    LOG.warn("Asynchronous processing of exchange {} failed and the endpoint has no exception handler",
                            this.exchange.getExchangeId(), rce);
                }
            }

            if (!doneSync) {
                EndpointMessageListener.this.consumer.releaseExchange(this.exchange, false);
            }
        }

        /**
         * Publishes the reply to the reply-to address of the incoming message. A body that already
         * is a Spring AMQP message is sent as is; any other body is converted with the endpoint's
         * converters and given the correlation id of the incoming message.
         */
        private void sendReply(Address replyDestination, org.springframework.amqp.core.Message message, Exchange exchange, Message out) {
            if (replyDestination == null) {
                LOG.debug("Cannot send reply message as there is no reply-to for: {}", out);
                return;
            }

            String cid = message.getMessageProperties().getCorrelationId();
            Object body = out.getBody();
            org.springframework.amqp.core.Message msg;
            if (body instanceof org.springframework.amqp.core.Message) {
                msg = (org.springframework.amqp.core.Message) body;
            } else {
                MessageProperties mp = this.endpoint.getMessagePropertiesConverter().toMessageProperties(exchange);
                mp.setCorrelationId(cid);
                msg = this.endpoint.getMessageConverter().toMessage(body, mp);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} sending reply message [correlationId:{}]: {}", this.endpoint, cid, msg);
            }
            EndpointMessageListener.this.getTemplate().send(replyDestination.getExchangeName(), replyDestination.getRoutingKey(), msg);
        }
    }
}

