/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.Channel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * A {@link DirectMessageListenerContainer} fixed to the {@code amq.rabbitmq.reply-to}
 * queue with {@link AcknowledgeMode#NONE}.
 *
 * <p>The container starts with zero consumers. {@link #getChannelHolder()} reserves an
 * existing consumer's channel or asks the superclass for one more consumer, and
 * {@link #processMonitorTask()} lowers the consumer count again for consumers that have
 * not been reserved for longer than the idle event interval. Queue names, the consumer
 * count and the monitor interval cannot be changed through the public setters.
 */
public class DirectReplyToMessageListenerContainer extends DirectMessageListenerContainer {
    /** Idle event interval, in milliseconds, applied by the constructor. */
    private static final int DEFAULT_IDLE = 60000;

    /** Channels currently reserved through {@link #getChannelHolder()}, mapped to their consumer. */
    private final ConcurrentMap<Channel, DirectMessageListenerContainer.SimpleConsumer> inUseConsumerChannels =
            new ConcurrentHashMap<>();

    /** Wall-clock time (ms) at which each consumer was last reserved. */
    private final ConcurrentMap<DirectMessageListenerContainer.SimpleConsumer, Long> whenUsed =
            new ConcurrentHashMap<>();

    /** The consumers-per-queue value this container has most recently requested from the superclass. */
    private final AtomicInteger consumerCount = new AtomicInteger();

    /**
     * Creates the container and applies its fixed configuration: the reply-to queue name,
     * no acknowledgements, zero initial consumers and the default idle event interval.
     *
     * @param connectionFactory the connection factory passed to the superclass
     */
    public DirectReplyToMessageListenerContainer(ConnectionFactory connectionFactory) {
        super(connectionFactory);
        super.setQueueNames("amq.rabbitmq.reply-to");
        this.setAcknowledgeMode(AcknowledgeMode.NONE);
        super.setConsumersPerQueue(0);
        super.setIdleEventInterval(DEFAULT_IDLE);
    }

    /**
     * Not supported; the consumer count is adjusted internally by this container.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final void setConsumersPerQueue(int consumersPerQueue) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this container.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final void setMonitorInterval(long monitorInterval) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported; the queue name is fixed by the constructor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final void setQueueNames(String... queueName) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported; the queue name is fixed by the constructor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final void addQueueNames(String... queueNames) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported; the queue name is fixed by the constructor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final boolean removeQueueNames(String... queueNames) {
        throw new UnsupportedOperationException();
    }

    /**
     * Registers the listener wrapped in a channel-aware adapter that removes the delivering
     * channel from the in-use map once the listener returns or throws.
     *
     * @param messageListener the listener to delegate to; if it is a
     *     {@link ChannelAwareMessageListener} it also receives the channel
     */
    @Override
    public void setMessageListener(MessageListener messageListener) {
        // The explicit ChannelAwareMessageListener cast gives the two-argument lambdas a
        // target type; MessageListener's own onMessage takes a single argument.
        if (messageListener instanceof ChannelAwareMessageListener) {
            super.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    ((ChannelAwareMessageListener) messageListener).onMessage(message, channel);
                }
                finally {
                    this.inUseConsumerChannels.remove(channel);
                }
            });
        } else {
            super.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    messageListener.onMessage(message);
                }
                finally {
                    this.inUseConsumerChannels.remove(channel);
                }
            });
        }
    }

    /**
     * Resets the requested consumer count to zero and delegates to the superclass, but only
     * when the container is not already running.
     */
    @Override
    protected void doStart() {
        if (!this.isRunning()) {
            this.consumerCount.set(0);
            super.setConsumersPerQueue(0);
            super.doStart();
        }
    }

    /**
     * Counts the consumers that were reserved at some point, are not reserved now, and were
     * last reserved before {@code now - getIdleEventInterval()}, then lowers the requested
     * consumer count by that number (never below zero).
     */
    @Override
    protected void processMonitorTask() {
        // Compute the cut-off once so every consumer is judged against the same instant.
        final long idleBefore = System.currentTimeMillis() - this.getIdleEventInterval();
        final long idleCount;
        this.consumersLock.lock();
        try {
            idleCount = this.consumers.stream()
                    .filter(consumer -> this.isIdleSince(consumer, idleBefore))
                    .count();
        }
        finally {
            this.consumersLock.unlock();
        }
        if (idleCount > 0L) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Reducing idle consumes by " + idleCount);
            }
            super.setConsumersPerQueue(
                    this.consumerCount.updateAndGet(current -> (int) Math.max(0L, (long) current - idleCount)));
        }
    }

    /**
     * Tells whether the consumer has a recorded last-use time older than the threshold and is
     * not currently reserved.
     *
     * <p>The timestamp is read with a single {@code get}: {@link #consumerRemoved} deletes
     * entries without holding {@code consumersLock}, so a separate {@code containsKey} check
     * followed by {@code get} could observe {@code null} and fail while unboxing.
     */
    private boolean isIdleSince(DirectMessageListenerContainer.SimpleConsumer consumer, long idleBefore) {
        Long lastUsed = this.whenUsed.get(consumer);
        return lastUsed != null
                && !this.inUseConsumerChannels.containsValue(consumer)
                && lastUsed < idleBefore;
    }

    /**
     * Finds a consumer that is not currently reserved.
     *
     * @return the index in {@code consumers} of the first consumer absent from the in-use
     *     map, or {@code -1} when every consumer is reserved
     */
    @Override
    protected int findIdleConsumer() {
        for (int index = 0; index < this.consumers.size(); ++index) {
            if (!this.inUseConsumerChannels.containsValue(this.consumers.get(index))) {
                return index;
            }
        }
        return -1;
    }

    /**
     * Drops the bookkeeping held for a removed consumer: its channel reservation and its
     * last-use timestamp.
     *
     * @param consumer the consumer that was removed
     */
    @Override
    protected void consumerRemoved(DirectMessageListenerContainer.SimpleConsumer consumer) {
        this.inUseConsumerChannels.remove(consumer.getChannel());
        this.whenUsed.remove(consumer);
    }

    /**
     * Reserves a consumer channel for the caller, looping until one is obtained.
     *
     * <p>Each pass scans the current consumers under {@code consumersLock} for an open channel
     * that is not yet reserved. When none is found, the requested consumer count is raised by
     * one and the scan is repeated.
     *
     * @return a holder with the reserved channel and the consumer's newly incremented epoch
     * @throws IllegalStateException if the container is not running when a pass begins
     */
    public ChannelHolder getChannelHolder() {
        ChannelHolder channelHolder = null;
        while (channelHolder == null) {
            if (!this.isRunning()) {
                throw new IllegalStateException("Direct reply-to container is not running");
            }
            this.consumersLock.lock();
            try {
                for (DirectMessageListenerContainer.SimpleConsumer consumer : this.consumers) {
                    Channel candidate = consumer.getChannel();
                    // putIfAbsent both tests and claims the reservation in one step.
                    if (candidate.isOpen() && this.inUseConsumerChannels.putIfAbsent(candidate, consumer) == null) {
                        channelHolder = new ChannelHolder(candidate, consumer.incrementAndGetEpoch());
                        this.whenUsed.put(consumer, System.currentTimeMillis());
                        break;
                    }
                }
            }
            finally {
                this.consumersLock.unlock();
            }
            if (channelHolder == null) {
                try {
                    super.setConsumersPerQueue(this.consumerCount.incrementAndGet());
                }
                catch (AmqpTimeoutException timeoutException) {
                    // Deliberately not rethrown: undo the increment and rescan the existing
                    // consumers. NOTE(review): presumably the timeout means no channel could
                    // be obtained for a new consumer - confirm against the superclass.
                    this.consumerCount.decrementAndGet();
                }
            }
        }
        return channelHolder;
    }

    /**
     * Releases the reservation represented by the holder, optionally cancelling the consumer.
     *
     * <p>Nothing happens unless the holder's channel is still reserved and the reserving
     * consumer's epoch equals the holder's epoch, so a stale holder cannot release a newer
     * reservation of the same channel.
     *
     * @param channelHolder the holder previously returned by {@link #getChannelHolder()}
     * @param cancelConsumer whether to cancel the consumer after releasing it
     * @param message the reason appended to the cancellation text; must not be {@code null}
     *     when {@code cancelConsumer} is {@code true}
     * @throws IllegalArgumentException if a matching reservation is found,
     *     {@code cancelConsumer} is {@code true} and {@code message} is {@code null}
     */
    public void releaseConsumerFor(ChannelHolder channelHolder, boolean cancelConsumer, @Nullable String message) {
        this.consumersLock.lock();
        try {
            DirectMessageListenerContainer.SimpleConsumer consumer =
                    this.inUseConsumerChannels.get(channelHolder.getChannel());
            if (consumer != null && consumer.getEpoch() == channelHolder.getConsumerEpoch()) {
                this.inUseConsumerChannels.remove(channelHolder.getChannel());
                if (cancelConsumer) {
                    Assert.isTrue(message != null, "A 'message' is required when 'cancelConsumer' is 'true'");
                    consumer.cancelConsumer("Consumer " + this + " canceled due to " + message);
                }
            }
        }
        finally {
            this.consumersLock.unlock();
        }
    }

    /**
     * Immutable pairing of a reserved channel with the consumer epoch at reservation time.
     */
    public static class ChannelHolder {
        private final Channel channel;
        private final int consumerEpoch;

        ChannelHolder(Channel channel, int consumerEpoch) {
            this.channel = channel;
            this.consumerEpoch = consumerEpoch;
        }

        /**
         * @return the reserved channel
         */
        public Channel getChannel() {
            return this.channel;
        }

        /**
         * @return the consumer epoch captured when the channel was reserved
         */
        public int getConsumerEpoch() {
            return this.consumerEpoch;
        }

        @Override
        public String toString() {
            return "ChannelHolder [channel=" + this.channel + ", consumerEpoch=" + this.consumerEpoch + "]";
        }
    }
}

