/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.springframework.amqp.AmqpApplicationContextClosedException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.AsyncConsumerStartedEvent;
import org.springframework.amqp.rabbit.listener.ConsumeOkEvent;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOffExecution;

public class DirectMessageListenerContainer
extends AbstractMessageListenerContainer {
    private static final int START_WAIT_TIME = 60;
    private static final int DEFAULT_MONITOR_INTERVAL = 10000;
    private static final int DEFAULT_ACK_TIMEOUT = 20000;
    protected final List<SimpleConsumer> consumers = new LinkedList<SimpleConsumer>();
    private final Set<SimpleConsumer> consumersToRestart = new LinkedHashSet<SimpleConsumer>();
    private final Set<String> removedQueues = ConcurrentHashMap.newKeySet();
    private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap<String, SimpleConsumer>();
    private final ActiveObjectCounter<SimpleConsumer> cancellationLock = new ActiveObjectCounter();
    private TaskScheduler taskScheduler;
    private boolean taskSchedulerSet;
    private long monitorInterval = 10000L;
    private int messagesPerAck;
    private long ackTimeout = 20000L;
    private volatile boolean started;
    private volatile boolean aborted;
    private volatile boolean hasStopped;
    private volatile CountDownLatch startedLatch = new CountDownLatch(1);
    private volatile int consumersPerQueue = 1;
    private volatile ScheduledFuture<?> consumerMonitorTask;
    private volatile long lastAlertAt;
    private volatile long lastRestartAttempt;

    public DirectMessageListenerContainer() {
        this.setMissingQueuesFatal(false);
        this.doSetPossibleAuthenticationFailureFatal(false);
    }

    public DirectMessageListenerContainer(ConnectionFactory connectionFactory) {
        this.setConnectionFactory(connectionFactory);
        this.setMissingQueuesFatal(false);
        this.doSetPossibleAuthenticationFailureFatal(false);
    }

    public void setConsumersPerQueue(int consumersPerQueue) {
        if (this.isRunning()) {
            this.adjustConsumers(consumersPerQueue);
        }
        this.consumersPerQueue = consumersPerQueue;
    }

    @Override
    public final void setExclusive(boolean exclusive) {
        Assert.isTrue(!exclusive || this.consumersPerQueue == 1, "When the consumer is exclusive, the consumers per queue must be 1");
        super.setExclusive(exclusive);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.taskSchedulerSet = true;
    }

    public void setMonitorInterval(long monitorInterval) {
        this.monitorInterval = monitorInterval;
    }

    @Override
    public void setQueueNames(String ... queueName) {
        Assert.state(!this.isRunning(), "Cannot set queue names while running, use add/remove");
        super.setQueueNames(queueName);
    }

    @Override
    public final void setMissingQueuesFatal(boolean missingQueuesFatal) {
        super.setMissingQueuesFatal(missingQueuesFatal);
    }

    public void setMessagesPerAck(int messagesPerAck) {
        this.messagesPerAck = messagesPerAck;
    }

    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    @Override
    public void addQueueNames(String ... queueNames) {
        Assert.notNull((Object)queueNames, "'queueNames' cannot be null");
        Assert.noNullElements((Object[])queueNames, "'queueNames' cannot contain null elements");
        try {
            Arrays.stream(queueNames).forEach(this.removedQueues::remove);
            this.addQueues(Arrays.stream(queueNames));
        }
        catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(queueNames), e);
        }
        super.addQueueNames(queueNames);
    }

    @Override
    public void addQueues(Queue ... queues) {
        Assert.notNull((Object)queues, "'queues' cannot be null");
        Assert.noNullElements((Object[])queues, "'queues' cannot contain null elements");
        try {
            Arrays.stream(queues).map(Queue::getActualName).forEach(this.removedQueues::remove);
            this.addQueues(Arrays.stream(queues).map(Queue::getName));
        }
        catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(queues), e);
        }
        super.addQueues(queues);
    }

    private void addQueues(Stream<String> queueNameStream) {
        if (this.isRunning()) {
            this.consumersLock.lock();
            try {
                this.checkStartState();
                Set<String> current = this.getQueueNamesAsSet();
                queueNameStream.forEach(queue -> {
                    if (current.contains(queue)) {
                        this.logger.warn("Queue " + queue + " is already configured for this container: " + this + ", ignoring add");
                    } else {
                        this.consumeFromQueue((String)queue);
                    }
                });
            }
            finally {
                this.consumersLock.unlock();
            }
        }
    }

    @Override
    public boolean removeQueueNames(String ... queueNames) {
        this.removeQueues(Arrays.stream(queueNames));
        return super.removeQueueNames(queueNames);
    }

    @Override
    public boolean removeQueues(Queue ... queues) {
        this.removeQueues(Arrays.stream(queues).map(Queue::getActualName));
        return super.removeQueues(queues);
    }

    private void removeQueues(Stream<String> queueNames) {
        if (this.isRunning()) {
            this.consumersLock.lock();
            try {
                this.checkStartState();
                queueNames.map(queue -> {
                    this.removedQueues.add((String)queue);
                    return (List)this.consumersByQueue.remove(queue);
                }).filter(Objects::nonNull).flatMap(Collection::stream).forEach(this::cancelConsumer);
            }
            finally {
                this.consumersLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void adjustConsumers(int newCount) {
        this.consumersLock.lock();
        try {
            this.checkStartState();
            this.consumersToRestart.clear();
            for (String queue : this.getQueueNames()) {
                while (this.isActive() && (this.consumersByQueue.get(queue) == null || ((List)this.consumersByQueue.get(queue)).size() < newCount)) {
                    List cBQ = (List)this.consumersByQueue.get(queue);
                    int index = 0;
                    if (cBQ != null) {
                        List<Integer> indices = cBQ.stream().map(SimpleConsumer::getIndex).sorted().toList();
                        for (index = 0; index < indices.size() && index >= indices.get(index); ++index) {
                        }
                    }
                    this.doConsumeFromQueue(queue, index);
                }
                this.reduceConsumersIfIdle(newCount, queue);
            }
        }
        finally {
            this.consumersLock.unlock();
        }
    }

    private void reduceConsumersIfIdle(int newCount, String queue) {
        List consumerList = (List)this.consumersByQueue.get(queue);
        if (consumerList != null && consumerList.size() > newCount) {
            int delta = consumerList.size() - newCount;
            for (int i2 = 0; i2 < delta; ++i2) {
                SimpleConsumer consumer;
                int index = this.findIdleConsumer();
                if (index < 0 || (consumer = (SimpleConsumer)consumerList.remove(index)) == null) continue;
                this.cancelConsumer(consumer);
            }
        }
    }

    protected int findIdleConsumer() {
        return 0;
    }

    private void checkStartState() {
        if (!this.isRunning()) {
            try {
                Assert.state(this.startedLatch.await(60L, TimeUnit.SECONDS), "Container is not started - cannot adjust queues");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AmqpException("Interrupted waiting for start", e);
            }
        }
    }

    @Override
    protected void doInitialize() {
        if (this.taskScheduler == null) {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setThreadNamePrefix(this.getListenerId() + "-consumerMonitor-");
            threadPoolTaskScheduler.afterPropertiesSet();
            this.taskScheduler = threadPoolTaskScheduler;
        }
        if (this.messagesPerAck > 0) {
            Assert.state(!this.isChannelTransacted(), "'messagesPerAck' is not allowed with transactions");
        }
    }

    @Override
    protected void doStart() {
        if (!this.started) {
            this.actualStart();
        }
    }

    @Override
    protected void doStop() {
        super.doStop();
        if (!this.taskSchedulerSet && this.taskScheduler != null) {
            ((ThreadPoolTaskScheduler)this.taskScheduler).shutdown();
            this.taskScheduler = null;
        }
    }

    protected void actualStart() {
        this.aborted = false;
        this.hasStopped = false;
        if (this.getPrefetchCount() < this.messagesPerAck) {
            this.setPrefetchCount(this.messagesPerAck);
        }
        super.doStart();
        String[] queueNames = this.getQueueNames();
        this.checkMissingQueues(queueNames);
        this.checkConnect();
        long idleEventInterval = this.getIdleEventInterval();
        if (this.taskScheduler == null) {
            this.afterPropertiesSet();
        }
        if (idleEventInterval > 0L && this.monitorInterval > idleEventInterval) {
            this.monitorInterval = idleEventInterval / 2L;
        }
        if (this.getFailedDeclarationRetryInterval() < this.monitorInterval) {
            this.monitorInterval = this.getFailedDeclarationRetryInterval();
        }
        Map<String, Queue> namesToQueues = this.getQueueNamesToQueues();
        this.lastRestartAttempt = System.currentTimeMillis();
        this.startMonitor(idleEventInterval, namesToQueues);
        if (queueNames.length > 0) {
            this.doRedeclareElementsIfNecessary();
            this.getTaskExecutor().execute(() -> this.startConsumers(queueNames));
        } else {
            this.started = true;
            this.startedLatch.countDown();
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Container initialized for queues: " + Arrays.asList(queueNames));
        }
    }

    protected void checkConnect() {
        if (this.isPossibleAuthenticationFailureFatal()) {
            try (Connection connection = null;){
                connection = this.getConnectionFactory().createConnection();
            }
        }
    }

    private void startMonitor(long idleEventInterval, Map<String, Queue> namesToQueues) {
        this.consumerMonitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            this.checkIdle(idleEventInterval, now);
            this.checkConsumers(now);
            if (this.lastRestartAttempt + this.getFailedDeclarationRetryInterval() < now) {
                this.consumersLock.lock();
                try {
                    if (this.started) {
                        ArrayList<SimpleConsumer> restartableConsumers = new ArrayList<SimpleConsumer>(this.consumersToRestart);
                        this.consumersToRestart.clear();
                        if (!restartableConsumers.isEmpty()) {
                            this.doRedeclareElementsIfNecessary();
                        }
                        Iterator iterator = restartableConsumers.iterator();
                        while (iterator.hasNext()) {
                            SimpleConsumer consumer = (SimpleConsumer)iterator.next();
                            iterator.remove();
                            if (this.removedQueues.contains(consumer.getQueue())) {
                                if (!this.logger.isDebugEnabled()) continue;
                                this.logger.debug("Skipping restart of consumer, queue removed " + consumer);
                                continue;
                            }
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Attempting to restart consumer " + consumer);
                            }
                            if (this.restartConsumer(namesToQueues, restartableConsumers, consumer)) continue;
                            break;
                        }
                        this.lastRestartAttempt = now;
                    }
                }
                finally {
                    this.consumersLock.unlock();
                }
            }
            this.processMonitorTask();
        }, Duration.ofMillis(this.monitorInterval));
    }

    private void checkIdle(long idleEventInterval, long now) {
        if (idleEventInterval > 0L && now - this.getLastReceive() > idleEventInterval && now - this.lastAlertAt > idleEventInterval) {
            this.publishIdleContainerEvent(now - this.getLastReceive());
            this.lastAlertAt = now;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkConsumers(long now) {
        List<SimpleConsumer> consumersToCancel;
        this.consumersLock.lock();
        try {
            consumersToCancel = this.consumers.stream().filter(consumer -> {
                boolean open;
                boolean bl = open = consumer.getChannel().isOpen() && !consumer.isAckFailed() && !consumer.targetChanged();
                if (open && this.messagesPerAck > 1) {
                    try {
                        consumer.ackIfNecessary(now);
                    }
                    catch (Exception e) {
                        this.logger.error("Exception while sending delayed ack", e);
                    }
                }
                return !open;
            }).collect(Collectors.toList());
        }
        finally {
            this.consumersLock.unlock();
        }
        consumersToCancel.forEach(consumer -> {
            block2: {
                try {
                    RabbitUtils.closeMessageConsumer(consumer.getChannel(), Collections.singletonList(consumer.getConsumerTag()), this.isChannelTransacted());
                }
                catch (Exception e) {
                    if (!this.logger.isDebugEnabled()) break block2;
                    this.logger.debug("Error closing consumer " + consumer, e);
                }
            }
            this.logger.error("Consumer canceled - channel closed " + consumer);
            consumer.cancelConsumer("Consumer " + consumer + " channel closed");
        });
    }

    private boolean restartConsumer(Map<String, Queue> namesToQueues, List<SimpleConsumer> restartableConsumers, SimpleConsumer consumerArg) {
        String actualName;
        SimpleConsumer consumer = consumerArg;
        Queue queue = namesToQueues.get(consumer.getQueue());
        if (queue != null && !StringUtils.hasText(queue.getName()) && StringUtils.hasText(actualName = queue.getActualName())) {
            namesToQueues.remove(consumer.getQueue());
            namesToQueues.put(actualName, queue);
            consumer = new SimpleConsumer(null, null, actualName, consumer.getIndex());
        }
        try {
            this.doConsumeFromQueue(consumer.getQueue(), consumer.getIndex());
            return true;
        }
        catch (AmqpConnectException | AmqpIOException e) {
            this.logger.error("Cannot connect to server", e);
            if (e.getCause() instanceof AmqpApplicationContextClosedException) {
                this.logger.error("Application context is closed, terminating");
                this.taskScheduler.schedule(this::stop, Instant.now());
            }
            this.consumersToRestart.addAll(restartableConsumers);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("After restart exception, consumers to restart now: " + this.consumersToRestart);
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startConsumers(String[] queueNames) {
        block12: {
            this.consumersLock.lock();
            try {
                if (this.hasStopped) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Consumer start aborted - container stopping");
                    }
                    break block12;
                }
                BackOffExecution backOffExecution = this.getRecoveryBackOff().start();
                while (!this.started && this.isRunning()) {
                    this.cancellationLock.reset();
                    try {
                        for (String queue : queueNames) {
                            this.consumeFromQueue(queue);
                        }
                    }
                    catch (AmqpConnectException | AmqpIOException e) {
                        long nextBackOff = backOffExecution.nextBackOff();
                        if (nextBackOff < 0L || e.getCause() instanceof AmqpApplicationContextClosedException) {
                            this.aborted = true;
                            this.shutdown();
                            this.logger.error("Failed to start container - fatal error or backOffs exhausted", e);
                            this.taskScheduler.schedule(this::stop, Instant.now());
                            break;
                        }
                        this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
                        this.doShutdown();
                        try {
                            Thread.sleep(nextBackOff);
                        }
                        catch (InterruptedException e1) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }
                    this.started = true;
                    this.startedLatch.countDown();
                }
            }
            finally {
                this.consumersLock.unlock();
            }
        }
    }

    protected void doRedeclareElementsIfNecessary() {
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        try {
            this.redeclareElementsIfNecessary();
        }
        catch (Exception e) {
            this.logger.error("Failed to redeclare elements", e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(this.getRoutingConnectionFactory());
            }
        }
    }

    protected void processMonitorTask() {
    }

    private void checkMissingQueues(String[] queueNames) {
        if (this.isMissingQueuesFatal()) {
            AmqpAdmin checkAdmin = this.getAmqpAdmin();
            if (checkAdmin == null) {
                try {
                    Class<?> clazz = ClassUtils.forName("org.springframework.amqp.rabbit.core.RabbitAdmin", ClassUtils.getDefaultClassLoader());
                    Constructor<?> ctor = clazz.getConstructor(ConnectionFactory.class);
                    checkAdmin = (AmqpAdmin)ctor.newInstance(this.getConnectionFactory());
                    this.setAmqpAdmin(checkAdmin);
                }
                catch (Exception e) {
                    this.logger.error("Failed to create a RabbitAdmin", e);
                }
            }
            if (checkAdmin != null) {
                for (String queue : queueNames) {
                    Properties queueProperties = checkAdmin.getQueueProperties(queue);
                    if (queueProperties != null || !this.isMissingQueuesFatal()) continue;
                    throw new IllegalStateException("At least one of the configured queues is missing");
                }
            }
        }
    }

    private void consumeFromQueue(String queue) {
        List list = (List)this.consumersByQueue.get(queue);
        if (CollectionUtils.isEmpty(list)) {
            for (int i2 = 0; i2 < this.consumersPerQueue; ++i2) {
                this.doConsumeFromQueue(queue, i2);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doConsumeFromQueue(String queue, int index) {
        if (!this.isActive()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Consume from queue " + queue + " ignore, container stopping");
            }
            return;
        }
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        Connection connection = null;
        try {
            connection = this.getConnectionFactory().createConnection();
        }
        catch (Exception e) {
            this.publishConsumerFailedEvent(e.getMessage(), false, e);
            this.addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
            throw e instanceof AmqpConnectException ? (AmqpConnectException)e : new AmqpConnectException(e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(this.getRoutingConnectionFactory());
            }
        }
        SimpleConsumer consumer = this.consume(queue, index, connection);
        this.consumersLock.lock();
        try {
            if (consumer != null) {
                ApplicationEventPublisher applicationEventPublisher;
                this.cancellationLock.add(consumer);
                this.consumers.add(consumer);
                this.consumersByQueue.add(queue, consumer);
                if (this.logger.isInfoEnabled()) {
                    this.logger.info(consumer + " started");
                }
                if ((applicationEventPublisher = this.getApplicationEventPublisher()) != null) {
                    applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent((Object)this, consumer));
                }
            }
        }
        finally {
            this.consumersLock.unlock();
        }
    }

    @Nullable
    private SimpleConsumer consume(String queue, int index, Connection connection) {
        Channel channel = null;
        SimpleConsumer consumer = null;
        try {
            if (this.getConsumeDelay() > 0L) {
                try {
                    Thread.sleep(this.getConsumeDelay());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            channel = connection.createChannel(this.isChannelTransacted());
            channel.basicQos(this.getPrefetchCount(), this.isGlobalQos());
            consumer = new SimpleConsumer(connection, channel, queue, index);
            channel.queueDeclarePassive(queue);
            consumer.consumerTag = channel.basicConsume(queue, this.getAcknowledgeMode().isAutoAck(), this.getConsumerTagStrategy() != null ? this.getConsumerTagStrategy().createConsumerTag(queue) : "", this.isNoLocal(), this.isExclusive(), this.getConsumerArguments(), consumer);
        }
        catch (AmqpApplicationContextClosedException e) {
            throw new AmqpConnectException(e);
        }
        catch (AmqpTimeoutException timeoutException) {
            throw timeoutException;
        }
        catch (Exception e) {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
            consumer = this.handleConsumeException(queue, index, consumer, e);
        }
        return consumer;
    }

    @Nullable
    private SimpleConsumer handleConsumeException(String queue, int index, @Nullable SimpleConsumer consumerArg, Exception ex) {
        SimpleConsumer consumer = consumerArg;
        if (RabbitUtils.exclusiveAccesssRefused(ex)) {
            this.getExclusiveConsumerExceptionLogger().log(this.logger, "Exclusive consumer failure", ex.getCause());
            this.publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, ex);
        } else if (ex.getCause() instanceof ShutdownSignalException && RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException)ex.getCause())) {
            this.publishMissingQueueEvent(queue);
            this.logger.error("Queue not present, scheduling consumer " + (consumer == null ? "for queue " + queue : consumer) + " for restart", ex);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn("basicConsume failed, scheduling consumer " + (consumer == null ? "for queue " + queue : consumer) + " for restart", ex);
        }
        if (consumer == null) {
            this.addConsumerToRestart(new SimpleConsumer(null, null, queue, index));
        } else {
            this.addConsumerToRestart(consumer);
            consumer = null;
        }
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
        LinkedList<SimpleConsumer> canceledConsumers = null;
        boolean waitForConsumers = false;
        this.consumersLock.lock();
        try {
            if (this.started || this.aborted) {
                canceledConsumers = new LinkedList<SimpleConsumer>(this.consumers);
                this.actualShutDown(canceledConsumers);
                waitForConsumers = true;
            }
        }
        finally {
            this.consumersLock.unlock();
        }
        if (waitForConsumers) {
            LinkedList<SimpleConsumer> consumersToWait = canceledConsumers;
            Runnable awaitShutdown = () -> {
                try {
                    if (this.cancellationLock.await(this.getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
                        this.logger.info("Successfully waited for consumers to finish.");
                    } else {
                        this.logger.info("Consumers not finished.");
                        if (this.isForceCloseChannel() || this.stopNow.get()) {
                            consumersToWait.forEach(consumer -> {
                                String eventMessage = "Closing channel for unresponsive consumer: " + consumer;
                                if (this.logger.isWarnEnabled()) {
                                    this.logger.warn(eventMessage);
                                }
                                consumer.cancelConsumer(eventMessage);
                            });
                        }
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
                }
                finally {
                    this.startedLatch = new CountDownLatch(1);
                    this.started = false;
                    this.aborted = false;
                    this.hasStopped = true;
                }
                this.stopNow.set(false);
                this.runCallbackIfNotNull(callback);
            };
            if (callback == null) {
                awaitShutdown.run();
            } else {
                this.getTaskExecutor().execute(awaitShutdown);
            }
        }
    }

    private void runCallbackIfNotNull(@Nullable Runnable callback) {
        if (callback != null) {
            callback.run();
        }
    }

    private void actualShutDown(List<SimpleConsumer> consumers) {
        Assert.state(this.getTaskExecutor() != null, "Cannot shut down if not initialized");
        this.logger.debug("Shutting down");
        if (this.isForceStop()) {
            this.stopNow.set(true);
        } else {
            consumers.forEach(this::cancelConsumer);
        }
        this.consumers.clear();
        this.consumersByQueue.clear();
        this.logger.debug("All consumers canceled");
        if (this.consumerMonitorTask != null) {
            this.consumerMonitorTask.cancel(true);
            this.consumerMonitorTask = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelConsumer(SimpleConsumer consumer) {
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Canceling " + consumer);
            }
            consumer.lock.lock();
            try {
                consumer.setCanceled(true);
                if (this.messagesPerAck > 1) {
                    try {
                        consumer.ackIfNecessary(0L);
                    }
                    catch (Exception e) {
                        this.logger.error("Exception while sending delayed ack", e);
                    }
                }
            }
            finally {
                consumer.lock.unlock();
            }
            RabbitUtils.cancel(consumer.getChannel(), consumer.getConsumerTag());
        }
        finally {
            this.consumers.remove(consumer);
            this.consumerRemoved(consumer);
        }
    }

    private void addConsumerToRestart(SimpleConsumer consumer) {
        this.consumersToRestart.add(consumer);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
        }
    }

    protected void consumerRemoved(SimpleConsumer consumer) {
    }

    protected final class SimpleConsumer
    extends DefaultConsumer {
        private final Log logger;
        private final Connection connection;
        private final String queue;
        private final int index;
        private final boolean ackRequired;
        private final ConnectionFactory connectionFactory;
        private final PlatformTransactionManager transactionManager;
        private final TransactionAttribute transactionAttribute;
        private final boolean isRabbitTxManager;
        private final int messagesPerAck;
        private final long ackTimeout;
        private final Channel targetChannel;
        private final Lock lock;
        private int pendingAcks;
        private long lastAck;
        private long latestDeferredDeliveryTag;
        private volatile String consumerTag;
        private volatile int epoch;
        private volatile TransactionTemplate transactionTemplate;
        private volatile boolean canceled;
        private volatile boolean ackFailed;

        SimpleConsumer(@Nullable Connection connection, Channel channel, String queue, int index) {
            super(channel);
            this.logger = DirectMessageListenerContainer.this.logger;
            this.connectionFactory = DirectMessageListenerContainer.this.getConnectionFactory();
            this.transactionManager = DirectMessageListenerContainer.this.getTransactionManager();
            this.transactionAttribute = DirectMessageListenerContainer.this.getTransactionAttribute();
            this.isRabbitTxManager = this.transactionManager instanceof RabbitTransactionManager;
            this.messagesPerAck = DirectMessageListenerContainer.this.messagesPerAck;
            this.ackTimeout = DirectMessageListenerContainer.this.ackTimeout;
            this.lock = new ReentrantLock();
            this.lastAck = System.currentTimeMillis();
            this.connection = connection;
            this.queue = queue;
            this.index = index;
            boolean bl = this.ackRequired = !DirectMessageListenerContainer.this.getAcknowledgeMode().isAutoAck() && !DirectMessageListenerContainer.this.getAcknowledgeMode().isManual();
            if (channel instanceof ChannelProxy) {
                ChannelProxy proxy = (ChannelProxy)channel;
                this.targetChannel = proxy.getTargetChannel();
            } else {
                this.targetChannel = null;
            }
        }

        String getQueue() {
            return this.queue;
        }

        int getIndex() {
            return this.index;
        }

        @Override
        public String getConsumerTag() {
            return this.consumerTag;
        }

        int getEpoch() {
            return this.epoch;
        }

        void setCanceled(boolean canceled) {
            this.canceled = canceled;
        }

        boolean isAckFailed() {
            return this.ackFailed;
        }

        boolean targetChanged() {
            return this.targetChannel != null && !((ChannelProxy)this.getChannel()).getTargetChannel().equals(this.targetChannel);
        }

        int incrementAndGetEpoch() {
            return ++this.epoch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            if (!this.getChannel().isOpen()) {
                this.logger.debug("Discarding prefetch, channel closed");
                return;
            }
            messageProperties = DirectMessageListenerContainer.this.getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
            messageProperties.setConsumerTag(consumerTag);
            messageProperties.setConsumerQueue(this.queue);
            message = new Message(body, messageProperties);
            deliveryTag = envelope.getDeliveryTag();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(this + " received " + message);
            }
            DirectMessageListenerContainer.this.updateLastReceive();
            data = message;
            debatched = DirectMessageListenerContainer.this.debatch(message);
            if (debatched != null) {
                data = debatched;
            }
            if (this.transactionManager != null) {
                try {
                    this.executeListenerInTransaction(data, deliveryTag);
                }
                catch (AbstractMessageListenerContainer.WrappedTransactionException ex) {
                    var13_13 = ex.getCause();
                    if (!(var13_13 instanceof Error)) ** GOTO lbl36
                    error = (Error)var13_13;
                    throw error;
                }
                catch (Exception var11_11) {
                }
                finally {
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                }
            } else {
                try {
                    this.callExecuteListener(data, deliveryTag);
                }
                catch (Exception var11_12) {
                    // empty catch block
                }
            }
lbl36:
            // 5 sources

            if (DirectMessageListenerContainer.this.stopNow.get()) {
                this.closeChannel();
            }
        }

        private void executeListenerInTransaction(Object data, long deliveryTag) {
            if (this.isRabbitTxManager) {
                ConsumerChannelRegistry.registerConsumerChannel(this.getChannel(), this.connectionFactory);
            }
            if (this.transactionTemplate == null) {
                this.transactionTemplate = new TransactionTemplate(this.transactionManager, this.transactionAttribute);
            }
            this.transactionTemplate.execute(s -> {
                RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(this.getChannel(), false), this.connectionFactory, true);
                if (resourceHolder != null) {
                    resourceHolder.addDeliveryTag(this.getChannel(), deliveryTag);
                }
                try {
                    this.callExecuteListener(data, deliveryTag);
                }
                catch (RuntimeException e1) {
                    DirectMessageListenerContainer.this.prepareHolderForRollback(resourceHolder, e1);
                    throw e1;
                }
                catch (Throwable e2) {
                    throw new AbstractMessageListenerContainer.WrappedTransactionException(e2);
                }
                return null;
            });
        }

        private void callExecuteListener(Object data, long deliveryTag) {
            boolean channelLocallyTransacted = DirectMessageListenerContainer.this.isChannelLocallyTransacted();
            try {
                DirectMessageListenerContainer.this.executeListener(this.getChannel(), data);
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery '" + e.getMessage() + "': " + deliveryTag);
                }
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (Exception e) {
                if (DirectMessageListenerContainer.this.causeChainHasImmediateAcknowledgeAmqpException(e)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("User requested ack for failed delivery: " + deliveryTag);
                    }
                    this.handleAck(deliveryTag, channelLocallyTransacted);
                } else {
                    this.logger.error("Failed to invoke listener", e);
                    if (this.transactionManager != null) {
                        if (this.transactionAttribute.rollbackOn(e)) {
                            RabbitResourceHolder resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.getResource(DirectMessageListenerContainer.this.getConnectionFactory());
                            if (resourceHolder == null) {
                                this.rollback(deliveryTag, e);
                            }
                            throw e;
                        }
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("No rollback for " + e);
                        }
                    } else {
                        this.rollback(deliveryTag, e);
                    }
                }
            }
            catch (Error e) {
                this.logger.error("Failed to invoke listener", e);
                DirectMessageListenerContainer.this.getJavaLangErrorHandler().handle(e);
                throw e;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleAck(long deliveryTag, boolean channelLocallyTransacted) {
            boolean isLocallyTransacted = channelLocallyTransacted || DirectMessageListenerContainer.this.isChannelTransacted() && TransactionSynchronizationManager.getResource(this.connectionFactory) == null;
            try {
                if (this.ackRequired) {
                    if (this.messagesPerAck > 1) {
                        this.lock.lock();
                        try {
                            this.latestDeferredDeliveryTag = deliveryTag;
                            ++this.pendingAcks;
                            this.ackIfNecessary(this.lastAck);
                        }
                        finally {
                            this.lock.unlock();
                        }
                    } else if (!DirectMessageListenerContainer.this.isChannelTransacted() || isLocallyTransacted) {
                        this.sendAckWithNotify(deliveryTag, false);
                    }
                }
                if (isLocallyTransacted) {
                    RabbitUtils.commitIfNecessary(this.getChannel());
                }
            }
            catch (Exception e) {
                this.ackFailed = true;
                this.logger.error("Error acking", e);
            }
        }

        void ackIfNecessary(long now) throws Exception {
            if (this.pendingAcks >= this.messagesPerAck || this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled)) {
                this.sendAck(now);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void rollback(long deliveryTag, Exception e) {
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.rollbackIfNecessary(this.getChannel());
            }
            if (this.ackRequired || ContainerUtils.isRejectManual(e)) {
                try {
                    if (this.messagesPerAck > 1) {
                        this.lock.lock();
                        try {
                            if (this.pendingAcks > 0) {
                                this.sendAck(System.currentTimeMillis());
                            }
                        }
                        finally {
                            this.lock.unlock();
                        }
                    }
                    this.getChannel().basicNack(deliveryTag, !DirectMessageListenerContainer.this.isAsyncReplies(), ContainerUtils.shouldRequeue(DirectMessageListenerContainer.this.isDefaultRequeueRejected(), e, this.logger));
                }
                catch (Exception e1) {
                    this.logger.error("Failed to nack message", e1);
                }
            }
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.commitIfNecessary(this.getChannel());
            }
        }

        void sendAck(long now) throws Exception {
            this.sendAckWithNotify(this.latestDeferredDeliveryTag, true);
            this.lastAck = now;
            this.pendingAcks = 0;
        }

        private void sendAckWithNotify(long deliveryTag, boolean multiple) throws Exception {
            try {
                this.getChannel().basicAck(deliveryTag, multiple);
                this.notifyMessageAckListener(true, deliveryTag, null);
            }
            catch (Exception e) {
                this.notifyMessageAckListener(false, deliveryTag, e);
                throw e;
            }
        }

        private void notifyMessageAckListener(boolean success, long deliveryTag, @Nullable Throwable cause) {
            try {
                DirectMessageListenerContainer.this.getMessageAckListener().onComplete(success, deliveryTag, cause);
            }
            catch (Exception e) {
                this.logger.error("An exception occurred on MessageAckListener.", e);
            }
        }

        @Override
        public void handleConsumeOk(String consumerTag) {
            super.handleConsumeOk(consumerTag);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("New " + this + " consumeOk");
            }
            if (DirectMessageListenerContainer.this.getApplicationEventPublisher() != null) {
                DirectMessageListenerContainer.this.getApplicationEventPublisher().publishEvent(new ConsumeOkEvent(this, this.getQueue(), consumerTag));
            }
        }

        @Override
        public void handleCancelOk(String consumerTag) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("CancelOk " + this);
            }
            this.finalizeConsumer();
        }

        @Override
        public void handleCancel(String consumerTag) {
            this.logger.error("Consumer canceled - queue deleted? " + this);
            this.cancelConsumer("Consumer " + this + " canceled");
        }

        void cancelConsumer(String eventMessage) {
            DirectMessageListenerContainer.this.publishConsumerFailedEvent(eventMessage, true, null);
            DirectMessageListenerContainer.this.consumersLock.lock();
            try {
                List list = (List)DirectMessageListenerContainer.this.consumersByQueue.get(this.queue);
                if (list != null) {
                    list.remove(this);
                }
                DirectMessageListenerContainer.this.consumers.remove(this);
                DirectMessageListenerContainer.this.addConsumerToRestart(this);
            }
            finally {
                DirectMessageListenerContainer.this.consumersLock.unlock();
            }
            this.finalizeConsumer();
        }

        private void finalizeConsumer() {
            this.closeChannel();
            DirectMessageListenerContainer.this.consumerRemoved(this);
        }

        private void closeChannel() {
            RabbitUtils.setPhysicalCloseRequired(this.getChannel(), true);
            RabbitUtils.closeChannel(this.getChannel());
            RabbitUtils.closeConnection(this.connection);
            DirectMessageListenerContainer.this.cancellationLock.release(this);
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + this.getEnclosingInstance().hashCode();
            result = 31 * result + this.index;
            result = 31 * result + (this.queue == null ? 0 : this.queue.hashCode());
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            SimpleConsumer other = (SimpleConsumer)obj;
            if (!this.getEnclosingInstance().equals(other.getEnclosingInstance())) {
                return false;
            }
            if (this.index != other.index) {
                return false;
            }
            if (this.queue == null) {
                return other.queue == null;
            }
            return this.queue.equals(other.queue);
        }

        private DirectMessageListenerContainer getEnclosingInstance() {
            return DirectMessageListenerContainer.this;
        }

        public String toString() {
            return "SimpleConsumer [queue=" + this.queue + ", index=" + this.index + ", consumerTag=" + this.consumerTag + " identity=" + ObjectUtils.getIdentityHexString(this) + "]";
        }
    }
}

