package org.apache.camel.component.rabbitmq.reply;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.class */
public class TemporaryQueueReplyManager extends ReplyManagerSupport {
    private static final Logger LOG = LoggerFactory.getLogger(TemporaryQueueReplyManager.class);
    private RabbitConsumer consumer;

    /* loaded from: input_file:org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager$RabbitConsumer.class */
    class RabbitConsumer extends DefaultConsumer {
        private final TemporaryQueueReplyManager consumer;
        private final Channel channel;
        private String tag;

        RabbitConsumer(TemporaryQueueReplyManager temporaryQueueReplyManager, Channel channel) {
            super(channel);
            this.consumer = temporaryQueueReplyManager;
            this.channel = channel;
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            this.consumer.onMessage(basicProperties, bArr);
        }

        private void start() throws IOException {
            this.tag = this.channel.basicConsume(TemporaryQueueReplyManager.this.getReplyTo(), true, (Consumer) this);
        }

        private void stop() throws IOException, TimeoutException {
            if (this.channel.isOpen()) {
                if (this.tag != null) {
                    this.channel.basicCancel(this.tag);
                }
                this.channel.close();
            }
        }
    }

    public TemporaryQueueReplyManager(CamelContext camelContext) {
        super(camelContext);
    }

    @Override // org.apache.camel.component.rabbitmq.reply.ReplyManagerSupport
    protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, String str2, long j) {
        return new TemporaryQueueReplyHandler(this, exchange, asyncCallback, str, str2, j);
    }

    @Override // org.apache.camel.component.rabbitmq.reply.ReplyManager
    public void updateCorrelationId(String str, String str2, long j) {
        LOG.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", str, str2);
        ReplyHandler remove = this.correlation.remove(str);
        if (remove != null) {
            this.correlation.put(str2, remove, j);
        }
    }

    @Override // org.apache.camel.component.rabbitmq.reply.ReplyManagerSupport
    protected void handleReplyMessage(String str, AMQP.BasicProperties basicProperties, byte[] bArr) {
        ReplyHandler replyHandler = this.correlation.get(str);
        if (replyHandler == null && this.endpoint.isUseMessageIDAsCorrelationID()) {
            replyHandler = waitForProvisionCorrelationToBeUpdated(str, bArr);
        }
        if (replyHandler == null) {
            LOG.warn("Reply received for unknown correlationID [{}]. The message will be ignored: {}", str, bArr);
        } else {
            this.correlation.remove(str);
            replyHandler.onReply(str, basicProperties, bArr);
        }
    }

    @Override // org.apache.camel.component.rabbitmq.reply.ReplyManagerSupport
    protected Connection createListenerContainer() throws Exception {
        LOG.trace("Creating connection");
        Connection connect = this.endpoint.connect(this.executorService);
        LOG.trace("Creating channel");
        Channel createChannel = connect.createChannel();
        if (this.endpoint.isPrefetchEnabled()) {
            createChannel.basicQos(this.endpoint.getPrefetchSize(), this.endpoint.getPrefetchCount(), this.endpoint.isPrefetchGlobal());
        }
        AMQP.Queue.DeclareOk queueDeclare = createChannel.queueDeclare();
        LOG.debug("Using temporary queue name: {}", queueDeclare.getQueue());
        setReplyTo(queueDeclare.getQueue());
        createChannel.queueBind(getReplyTo(), this.endpoint.getExchangeName(), getReplyTo());
        if (connect instanceof AutorecoveringConnection) {
            ((AutorecoveringConnection) connect).addQueueRecoveryListener((str, str2) -> {
                LOG.debug("Temporary queue name {} was changed to {}. Updating replyTo.", str, str2);
                setReplyTo(str2);
                LOG.debug("Trying to rebind the new temporary queue to update routingKey");
                try {
                    createChannel.queueBind(str2, this.endpoint.getExchangeName(), str2);
                    createChannel.queueUnbind(str2, this.endpoint.getExchangeName(), str);
                } catch (IOException e) {
                    LOG.warn("Failed to bind or unbind a queue. This exception is ignored.", e);
                }
            });
        }
        this.consumer = new RabbitConsumer(this, createChannel);
        this.consumer.start();
        return connect;
    }

    @Override // org.apache.camel.component.rabbitmq.reply.ReplyManagerSupport, org.apache.camel.support.service.BaseService
    protected void doStop() throws Exception {
        super.doStop();
        this.consumer.stop();
    }
}
