/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl.consumer;

import io.netty.util.internal.ConcurrentSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MessageQueueLock;
import org.apache.rocketmq.client.impl.consumer.PopProcessQueue;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

public class ConsumeMessagePopOrderlyService
implements ConsumeMessageService {
    private static final Logger log = LoggerFactory.getLogger(ConsumeMessagePopOrderlyService.class);
    /** Owning push-consumer implementation; source of the message model, rebalance impl and stats manager. */
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    /** User-facing consumer configuration obtained from {@link #defaultMQPushConsumerImpl}. */
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    /** Listener invoked by {@link #consumeMessageDirectly(MessageExt, String)}. */
    private final MessageListenerOrderly messageListener;
    /** Work queue backing {@link #consumeExecutor}. */
    private final BlockingQueue<Runnable> consumeRequestQueue;
    /** Requests that have been submitted and not yet removed; used to detect duplicate submissions. */
    private final ConcurrentSet<ConsumeRequest> consumeRequestSet = new ConcurrentSet<ConsumeRequest>();
    /** Thread pool that runs {@link ConsumeRequest} instances. */
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    /** Supplies the lock objects fetched by {@link ConsumeRequest#run()}. */
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    /** Supplies the lock objects that serialize request submission. */
    private final MessageQueueLock consumeRequestLock = new MessageQueueLock();
    /** Runs the periodic lock task and delayed re-submissions. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Set by {@link #shutdown(long)}; checked by {@link #lockMQPeriodically()}. */
    private volatile boolean stopped = false;

    public ConsumeMessagePopOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }

    /**
     * Starts the periodic queue-lock task.
     * <p>
     * When the consumer's message model is {@code MessageModel.CLUSTERING},
     * {@link #lockMQPeriodically()} is scheduled at a fixed rate: first run after 1000 ms, then every
     * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} ms. For any other message model nothing is scheduled.
     */
    @Override
    public void start() {
        if (MessageModel.CLUSTERING.equals((Object)this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    // scheduleAtFixedRate suppresses every subsequent execution once one run
                    // throws, so a single failed lock attempt must not escape this task.
                    try {
                        ConsumeMessagePopOrderlyService.this.lockMQPeriodically();
                    }
                    catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000L, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the service.
     * <p>
     * Marks the service as stopped, shuts down the scheduler, shuts down the consume pool through
     * {@code ThreadUtils.shutdownGracefully} with the given timeout and, in clustering mode only,
     * unlocks all message queues.
     *
     * @param awaitTerminateMillis timeout in milliseconds passed to the graceful pool shutdown
     */
    @Override
    public void shutdown(long awaitTerminateMillis) {
        this.stopped = true;
        this.scheduledExecutorService.shutdown();
        ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);

        boolean clustering = MessageModel.CLUSTERING == this.defaultMQPushConsumerImpl.messageModel();
        if (!clustering) {
            return;
        }
        this.unlockAllMessageQueues();
    }

    /**
     * Asks the consumer's rebalance implementation to unlock all queues by calling
     * {@code unlockAll(false)}. Synchronized on this service instance.
     */
    public synchronized void unlockAllMessageQueues() {
        this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
    }

    /**
     * Changes the core size of the consume thread pool.
     * <p>
     * The request is silently ignored unless {@code corePoolSize} is positive, no greater than
     * {@link Short#MAX_VALUE}, and strictly less than the consumer's configured maximum thread count.
     *
     * @param corePoolSize requested core pool size
     */
    @Override
    public void updateCorePoolSize(int corePoolSize) {
        if (corePoolSize <= 0 || corePoolSize > Short.MAX_VALUE) {
            return;
        }
        if (corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
            return;
        }
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }

    /**
     * No-op in this implementation: the body is empty, so the pool size is left unchanged.
     */
    @Override
    public void incCorePoolSize() {
    }

    /**
     * No-op in this implementation: the body is empty, so the pool size is left unchanged.
     */
    @Override
    public void decCorePoolSize() {
    }

    /**
     * Reports the current core size of the consume thread pool.
     *
     * @return the core pool size of {@code consumeExecutor}
     */
    @Override
    public int getCorePoolSize() {
        int currentCoreSize = this.consumeExecutor.getCorePoolSize();
        return currentCoreSize;
    }

    /**
     * Feeds a single message straight to the orderly listener on the calling thread and reports
     * the outcome.
     * <p>
     * The message is wrapped in a one-element list and passed through
     * {@code resetRetryAndNamespace} before the listener is invoked. The listener's status is mapped
     * onto a {@link CMResult}; a {@code null} status becomes {@code CR_RETURN_NULL} and any
     * {@link Throwable} from the listener becomes {@code CR_THROW_EXCEPTION} with a short description
     * stored as the remark. The exception is logged, not rethrown.
     *
     * @param msg message to consume
     * @param brokerName broker name recorded on the {@link MessageQueue} given to the listener context
     * @return result carrying the mapped status, the context's auto-commit flag and the elapsed time
     */
    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(true);

        List<MessageExt> batch = new ArrayList<MessageExt>();
        batch.add(msg);

        // The queue is described from the message as received, before the topic is rewritten below.
        MessageQueue queue = new MessageQueue();
        queue.setBrokerName(brokerName);
        queue.setTopic(msg.getTopic());
        queue.setQueueId(msg.getQueueId());
        ConsumeOrderlyContext context = new ConsumeOrderlyContext(queue);

        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(batch, this.consumerGroup);

        long startedAt = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", msg);

        try {
            ConsumeOrderlyStatus outcome = this.messageListener.consumeMessage(batch, context);
            if (outcome == null) {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            } else if (outcome == ConsumeOrderlyStatus.COMMIT) {
                result.setConsumeResult(CMResult.CR_COMMIT);
            } else if (outcome == ConsumeOrderlyStatus.ROLLBACK) {
                result.setConsumeResult(CMResult.CR_ROLLBACK);
            } else if (outcome == ConsumeOrderlyStatus.SUCCESS) {
                result.setConsumeResult(CMResult.CR_SUCCESS);
            } else if (outcome == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
                result.setConsumeResult(CMResult.CR_LATER);
            }
        }
        catch (Throwable failure) {
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(UtilAll.exceptionSimpleDesc(failure));
            log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(failure), this.consumerGroup, batch, queue, failure);
        }

        result.setAutoCommit(context.isAutoCommit());
        result.setSpentTimeMills(System.currentTimeMillis() - startedAt);
        log.info("consumeMessageDirectly Result: {}", result);
        return result;
    }

    /**
     * Not supported by this service; use
     * {@link #submitPopConsumeRequest(List, PopProcessQueue, MessageQueue)} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume) {
        throw new UnsupportedOperationException();
    }

    /**
     * Submits a non-forced consume request for the given queue.
     * <p>
     * The request is created with the default sharding key index; {@code msgs} is not used by
     * this method.
     *
     * @param msgs messages associated with the request (unused here)
     * @param processQueue pop process queue the request is bound to
     * @param messageQueue message queue the request is bound to
     */
    @Override
    public void submitPopConsumeRequest(List<MessageExt> msgs, PopProcessQueue processQueue, MessageQueue messageQueue) {
        this.submitConsumeRequest(new ConsumeRequest(processQueue, messageQueue), false);
    }

    /**
     * Asks the rebalance implementation to lock all queues, unless the service has already been
     * stopped. Synchronized on this service instance.
     */
    public synchronized void lockMQPeriodically() {
        if (this.stopped) {
            return;
        }
        this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    }

    /**
     * Removes the request from {@code consumeRequestSet}, so that an equal request is treated as
     * new the next time it is submitted.
     *
     * @param consumeRequest request to forget
     */
    private void removeConsumeRequest(ConsumeRequest consumeRequest) {
        this.consumeRequestSet.remove(consumeRequest);
    }

    /**
     * Hands a consume request to the consume thread pool.
     * <p>
     * The request is first recorded in {@code consumeRequestSet}. It is submitted to the pool only
     * if it was not already present in the set, or if {@code force} is {@code true}. The whole
     * check-and-submit sequence runs while holding the lock object that {@code consumeRequestLock}
     * returns for the request's message queue and sharding key index.
     * <p>
     * A failure to submit is logged, including the stack trace, and is not rethrown.
     * NOTE(review): after a failed submission the request stays in {@code consumeRequestSet};
     * confirm whether it should be removed so a later non-forced submit is not treated as a duplicate.
     *
     * @param consumeRequest request to run
     * @param force submit even if an equal request is already tracked
     */
    private void submitConsumeRequest(ConsumeRequest consumeRequest, boolean force) {
        Object lock = this.consumeRequestLock.fetchLockObject(consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex());
        synchronized (lock) {
            boolean isNewReq = this.consumeRequestSet.add(consumeRequest);
            if (!force && !isNewReq) {
                return;
            }
            try {
                this.consumeExecutor.submit(consumeRequest);
            }
            catch (Exception e) {
                // The trailing throwable has no placeholder, so the logger records its stack trace.
                log.error("error submit consume request: {}, mq: {}, shardingKeyIndex: {}", e.toString(), consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex(), e);
            }
        }
    }

    /**
     * Schedules a forced re-submission of the request after a delay.
     * <p>
     * A {@code suspendTimeMillis} of {@code -1} selects the consumer's configured
     * suspend-current-queue time. The resulting delay is clamped to the range 10 ms to 30000 ms.
     *
     * @param consumeRequest request to submit again
     * @param suspendTimeMillis requested delay in milliseconds, or {@code -1} for the configured default
     */
    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest, long suspendTimeMillis) {
        long requestedDelay = suspendTimeMillis == -1L
            ? this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()
            : suspendTimeMillis;
        long delayMillis = Math.min(Math.max(requestedDelay, 10L), 30000L);

        Runnable resubmit = new Runnable(){

            @Override
            public void run() {
                ConsumeMessagePopOrderlyService.this.submitConsumeRequest(consumeRequest, true);
            }
        };
        this.scheduledExecutorService.schedule(resubmit, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Currently a stub: none of the arguments are inspected and the result is always {@code true}.
     *
     * @param msgs consumed messages (unused)
     * @param status listener status (unused)
     * @param context consume context (unused)
     * @param consumeRequest originating request (unused)
     * @return always {@code true}
     */
    public boolean processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status, ConsumeOrderlyContext context, ConsumeRequest consumeRequest) {
        return true;
    }

    /**
     * Returns the stats manager exposed by the owning push-consumer implementation.
     *
     * @return the consumer stats manager
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        ConsumerStatsManager statsManager = this.defaultMQPushConsumerImpl.getConsumerStatsManager();
        return statsManager;
    }

    /**
     * Resolves the effective reconsume limit.
     *
     * @return {@link Integer#MAX_VALUE} when the configured value is {@code -1}; otherwise the
     *         configured value
     */
    private int getMaxReconsumeTimes() {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1
            ? Integer.MAX_VALUE
            : this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }

    /**
     * Decides whether consumption should be suspended for the given messages and updates their
     * reconsume counters.
     * <p>
     * A message that has reached the reconsume limit is sent back via
     * {@link #sendMessageBack(MessageExt)}; if that succeeds the message is skipped. Every other
     * message, including one whose send-back failed, has its reconsume counter incremented and
     * causes the method to return {@code true}.
     *
     * @param msgs messages to check; may be {@code null} or empty
     * @return {@code true} if at least one message still has to be retried locally
     */
    private boolean checkReconsumeTimes(List<MessageExt> msgs) {
        if (msgs == null || msgs.isEmpty()) {
            return false;
        }
        boolean needSuspend = false;
        for (MessageExt message : msgs) {
            boolean limitReached = message.getReconsumeTimes() >= this.getMaxReconsumeTimes();
            if (limitReached) {
                MessageAccessor.setReconsumeTime(message, String.valueOf(message.getReconsumeTimes()));
                if (this.sendMessageBack(message)) {
                    // Handed off successfully; nothing more to do for this message.
                    continue;
                }
            }
            needSuspend = true;
            message.setReconsumeTimes(message.getReconsumeTimes() + 1);
        }
        return needSuspend;
    }

    /**
     * Re-publishes a message to the topic returned by {@code MixAll.getRetryTopic} for this
     * consumer group.
     * <p>
     * The new message copies the body, properties and flag of {@code msg}, keeps the existing
     * origin message id (falling back to {@code msg}'s own id when none is set), records the
     * original topic under the {@code RETRY_TOPIC} property, carries the current and maximum
     * reconsume counts, and uses delay level {@code 3 + reconsumeTimes}. It is sent through the
     * client factory's default producer.
     * <p>
     * Any exception is logged and reported as {@code false}. If the failure was an
     * {@link InterruptedException}, the thread's interrupt status is restored first so callers can
     * still observe the interruption.
     *
     * @param msg message to send back
     * @return {@code true} if the send completed without throwing, {@code false} otherwise
     */
    public boolean sendMessageBack(MessageExt msg) {
        try {
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
            this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg);
            return true;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the interruption clears nothing by itself, but swallowing it would hide
                // it from the caller; re-assert the flag.
                Thread.currentThread().interrupt();
            }
            // String concatenation (not msg.toString()) so a null msg cannot throw from inside the handler.
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);
            return false;
        }
    }

    /**
     * Rewrites each message's topic through {@code NamespaceUtil.withoutNamespace} using the
     * consumer's namespace. Messages are left untouched when the consumer has no (non-empty)
     * namespace.
     *
     * @param msgs messages whose topics are rewritten in place
     */
    public void resetNamespace(List<MessageExt> msgs) {
        for (MessageExt message : msgs) {
            if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
                String strippedTopic = NamespaceUtil.withoutNamespace(message.getTopic(), this.defaultMQPushConsumer.getNamespace());
                message.setTopic(strippedTopic);
            }
        }
    }

    /**
     * Unit of work submitted to the consume thread pool, identified by its process queue, message
     * queue and sharding key index.
     * <p>
     * Instances are stored in a hash-based set by the enclosing service, so every field that takes
     * part in {@link #equals(Object)} and {@link #hashCode()} is immutable.
     */
    class ConsumeRequest
    implements Runnable {
        private final PopProcessQueue processQueue;
        private final MessageQueue messageQueue;
        private final int shardingKeyIndex;

        /**
         * Creates a request with sharding key index {@code 0}.
         *
         * @param processQueue pop process queue this request belongs to
         * @param messageQueue message queue this request belongs to
         */
        public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue) {
            this(processQueue, messageQueue, 0);
        }

        /**
         * Creates a request for an explicit sharding key index.
         *
         * @param processQueue pop process queue this request belongs to
         * @param messageQueue message queue this request belongs to
         * @param shardingKeyIndex sharding key index of this request
         */
        public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue, int shardingKeyIndex) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
            this.shardingKeyIndex = shardingKeyIndex;
        }

        public PopProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }

        public int getShardingKeyIndex() {
            return this.shardingKeyIndex;
        }

        /**
         * Runs the request. If the process queue has been dropped, a warning is logged and the
         * request is removed from the enclosing service's tracking set.
         */
        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, message queue not be able to consume, because it's dropped. {}", (Object)this.messageQueue);
                ConsumeMessagePopOrderlyService.this.removeConsumeRequest(this);
                return;
            }
            // NOTE(review): the per-queue/sharding-key lock object is fetched but nothing is done
            // with it afterwards; the consume logic looks unfinished. The call is kept as-is.
            Object objLock = ConsumeMessagePopOrderlyService.this.messageQueueLock.fetchLockObject(this.messageQueue, this.shardingKeyIndex);
        }

        /**
         * Combines the sharding key index with the hash codes of the non-null process queue and
         * message queue.
         */
        @Override
        public int hashCode() {
            int hash = this.shardingKeyIndex;
            if (this.processQueue != null) {
                hash += this.processQueue.hashCode() * 31;
            }
            if (this.messageQueue != null) {
                hash += this.messageQueue.hashCode() * 31;
            }
            return hash;
        }

        /**
         * Two requests are equal when they have the same class and sharding key index, refer to
         * the very same process queue instance, and have equal (or both {@code null}) message queues.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            ConsumeRequest other = (ConsumeRequest)obj;
            if (this.shardingKeyIndex != other.shardingKeyIndex) {
                return false;
            }
            // The process queue is compared by identity, not by equals().
            if (this.processQueue != other.processQueue) {
                return false;
            }
            if (this.messageQueue == other.messageQueue) {
                return true;
            }
            return this.messageQueue != null && this.messageQueue.equals(other.messageQueue);
        }
    }
}

