/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.consumer.PopProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PopRequest;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;

public abstract class RebalanceImpl {
    /** Logger shared by this class and its subclasses. */
    protected static final Logger log = LoggerFactory.getLogger(RebalanceImpl.class);
    /** Pull-mode process queues currently tracked by this instance, keyed by message queue. */
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    /** Pop-mode process queues currently tracked by this instance, keyed by message queue. */
    protected final ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueTable = new ConcurrentHashMap<MessageQueue, PopProcessQueue>(64);
    /** Topic to set of message queues; read by rebalanceByTopic and never written in this class. */
    protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    /** Topic to subscription data; doRebalance walks its keys to decide which topics to rebalance. */
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;
    /** Number of queryAssignment attempts made by tryQueryAssignment before it gives up on the broker. */
    private static final int TIMEOUT_CHECK_TIMES = 3;
    /** Timeout handed to queryAssignment when fetching the assignment from the broker (presumably milliseconds - TODO confirm). */
    private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
    /** Topics for which tryQueryAssignment succeeded; key and value are both the topic name. */
    private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap<String, String>();
    /** Topics for which tryQueryAssignment failed, so rebalancing is computed locally; key and value are both the topic name. */
    private Map<String, String> topicClientRebalance = new ConcurrentHashMap<String, String>();

    /**
     * Stores the collaborators used by every rebalance operation. No validation is
     * performed; any argument may be null and can be replaced later through the setters.
     *
     * @param consumerGroup consumer group name sent in lock/unlock and assignment requests
     * @param messageModel selects the BROADCASTING or CLUSTERING branch of rebalanceByTopic
     * @param allocateMessageQueueStrategy strategy used to allocate queues in the CLUSTERING branch
     * @param mQClientFactory client instance used for broker lookups and remote calls
     */
    public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) {
        this.consumerGroup = consumerGroup;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientFactory;
    }

    /**
     * Sends an unlock request for a single queue to the broker resolved for it.
     * Does nothing when no broker address can be found. Any exception raised by the
     * remote call is logged and suppressed.
     *
     * @param mq queue to unlock
     * @param oneway forwarded unchanged to {@code unlockBatchMQ}
     */
    public void unlock(MessageQueue mq, boolean oneway) {
        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        FindBrokerResult broker = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true);
        if (broker == null) {
            return;
        }

        UnlockBatchRequestBody body = new UnlockBatchRequestBody();
        body.setConsumerGroup(this.consumerGroup);
        body.setClientId(this.mQClientFactory.getClientId());
        body.getMqSet().add(mq);

        try {
            this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(broker.getBrokerAddr(), body, 1000L, oneway);
            log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", this.consumerGroup, this.mQClientFactory.getClientId(), mq);
        } catch (Exception e) {
            log.error("unlockBatchMQ exception, " + mq, e);
        }
    }

    /**
     * Unlocks every non-dropped queue in {@code processQueueTable}, one batch request per broker.
     * Brokers whose address cannot be resolved are skipped. After a successful request each
     * still-tracked process queue in the batch is marked unlocked; a failed request is logged
     * and the loop moves on to the next broker.
     *
     * @param oneway forwarded unchanged to {@code unlockBatchMQ}
     */
    public void unlockAll(boolean oneway) {
        for (Map.Entry<String, Set<MessageQueue>> perBroker : this.buildProcessQueueTableByBrokerName().entrySet()) {
            Set<MessageQueue> queues = perBroker.getValue();
            if (queues.isEmpty()) {
                continue;
            }
            FindBrokerResult broker = this.mQClientFactory.findBrokerAddressInSubscribe(perBroker.getKey(), 0L, true);
            if (broker == null) {
                continue;
            }

            UnlockBatchRequestBody body = new UnlockBatchRequestBody();
            body.setConsumerGroup(this.consumerGroup);
            body.setClientId(this.mQClientFactory.getClientId());
            body.setMqSet(queues);

            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(broker.getBrokerAddr(), body, 1000L, oneway);
                for (MessageQueue queue : queues) {
                    ProcessQueue tracked = this.processQueueTable.get(queue);
                    if (tracked != null) {
                        tracked.setLocked(false);
                        log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, queue);
                    }
                }
            } catch (Exception e) {
                log.error("unlockBatchMQ exception, " + queues, e);
            }
        }
    }

    /**
     * Groups the non-dropped queues of {@code processQueueTable} by the broker name that
     * {@code getBrokerNameFromMessageQueue} resolves for each queue.
     *
     * <p>The resolved name is used both to look up and to insert the per-broker set. Inserting
     * under {@code mq.getBrokerName()} instead would make every lookup miss whenever the two
     * names differ, replacing the previous set each time and leaving only the last queue.
     *
     * @return a new map from resolved broker name to the queues held on that broker
     */
    private HashMap<String, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            if (entry.getValue().isDropped()) {
                continue;
            }
            MessageQueue mq = entry.getKey();
            String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
            Set<MessageQueue> mqs = result.get(destBrokerName);
            if (mqs == null) {
                mqs = new HashSet<MessageQueue>();
                // Key must match the lookup above, otherwise queues are lost.
                result.put(destBrokerName, mqs);
            }
            mqs.add(mq);
        }
        return result;
    }

    /**
     * Requests a broker-side lock for a single queue.
     * Every queue the broker reports as locked that is still tracked in
     * {@code processQueueTable} is marked locked and has its lock timestamp refreshed.
     *
     * @param mq queue to lock
     * @return {@code true} only if the broker's reply contains {@code mq}; {@code false} when no
     *         broker address is found or the remote call throws
     */
    public boolean lock(MessageQueue mq) {
        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        FindBrokerResult broker = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true);
        if (broker == null) {
            return false;
        }

        LockBatchRequestBody body = new LockBatchRequestBody();
        body.setConsumerGroup(this.consumerGroup);
        body.setClientId(this.mQClientFactory.getClientId());
        body.getMqSet().add(mq);

        try {
            Set<MessageQueue> granted = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(broker.getBrokerAddr(), body, 1000L);
            for (MessageQueue grantedQueue : granted) {
                ProcessQueue tracked = this.processQueueTable.get(grantedQueue);
                if (tracked != null) {
                    tracked.setLocked(true);
                    tracked.setLastLockTimestamp(System.currentTimeMillis());
                }
            }
            boolean lockOK = granted.contains(mq);
            log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
            return false;
        }
    }

    /**
     * Requests broker-side locks for every non-dropped queue in {@code processQueueTable},
     * one batch request per broker. Queues confirmed by the broker are marked locked with a
     * fresh timestamp; queues missing from the reply are marked unlocked. A failed request is
     * logged and the remaining brokers are still processed.
     */
    public void lockAll() {
        for (Map.Entry<String, Set<MessageQueue>> perBroker : this.buildProcessQueueTableByBrokerName().entrySet()) {
            Set<MessageQueue> queues = perBroker.getValue();
            if (queues.isEmpty()) {
                continue;
            }
            FindBrokerResult broker = this.mQClientFactory.findBrokerAddressInSubscribe(perBroker.getKey(), 0L, true);
            if (broker == null) {
                continue;
            }

            LockBatchRequestBody body = new LockBatchRequestBody();
            body.setConsumerGroup(this.consumerGroup);
            body.setClientId(this.mQClientFactory.getClientId());
            body.setMqSet(queues);

            try {
                Set<MessageQueue> granted = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(broker.getBrokerAddr(), body, 1000L);
                for (MessageQueue queue : queues) {
                    ProcessQueue tracked = this.processQueueTable.get(queue);
                    if (tracked == null) {
                        continue;
                    }
                    if (!granted.contains(queue)) {
                        tracked.setLocked(false);
                        log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, queue);
                        continue;
                    }
                    // Only announce the transition from unlocked to locked.
                    if (!tracked.isLocked()) {
                        log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, queue);
                    }
                    tracked.setLocked(true);
                    tracked.setLastLockTimestamp(System.currentTimeMillis());
                }
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + queues, e);
            }
        }
    }

    /**
     * Tells doRebalance whether the given topic must be rebalanced on the client side.
     * This base implementation always answers {@code true}; doRebalance only probes the
     * broker for an assignment when this method returns {@code false}.
     *
     * @param topic topic about to be rebalanced (unused here)
     * @return always {@code true} in this class
     */
    public boolean clientRebalance(String topic) {
        return true;
    }

    /**
     * Rebalances every subscribed topic and then discards queues of topics that are no
     * longer subscribed.
     *
     * <p>For each topic the broker-provided assignment is used only when
     * {@code clientRebalance(topic)} is false and {@code tryQueryAssignment(topic)} succeeds;
     * otherwise the assignment is computed locally. A failure on a topic starting with
     * {@code %RETRY%} is ignored silently; any other failure is logged and marks the round
     * as not balanced.
     *
     * @param isOrder forwarded to the per-topic rebalance; when true new queues must be locked first
     * @return {@code true} if every topic reported a balanced result
     */
    public boolean doRebalance(boolean isOrder) {
        boolean balanced = true;
        ConcurrentMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (String topic : subTable.keySet()) {
                try {
                    boolean topicBalanced;
                    if (this.clientRebalance(topic) || !this.tryQueryAssignment(topic)) {
                        topicBalanced = this.rebalanceByTopic(topic, isOrder);
                    } else {
                        topicBalanced = this.getRebalanceResultFromBroker(topic, isOrder);
                    }
                    if (!topicBalanced) {
                        balanced = false;
                    }
                } catch (Throwable e) {
                    if (!topic.startsWith("%RETRY%")) {
                        log.warn("rebalance Exception", e);
                        balanced = false;
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
        return balanced;
    }

    /**
     * Decides whether the broker can supply the queue assignment for {@code topic}.
     *
     * <p>The decision is cached: a topic already present in {@code topicClientRebalance}
     * answers {@code false}, one present in {@code topicBrokerRebalance} answers {@code true}.
     * Otherwise {@code queryAssignment} is attempted up to {@code TIMEOUT_CHECK_TIMES} times,
     * with the timeout growing linearly up to {@code QUERY_ASSIGNMENT_TIMEOUT}. Only a
     * {@code RemotingTimeoutException} triggers another attempt; any other failure, or
     * exhausting all attempts, records the topic for client-side rebalancing.
     *
     * @param topic topic to probe
     * @return {@code true} if the broker answered (now or previously), {@code false} otherwise
     */
    private boolean tryQueryAssignment(String topic) {
        if (this.topicClientRebalance.containsKey(topic)) {
            return false;
        }
        if (this.topicBrokerRebalance.containsKey(topic)) {
            return true;
        }

        String strategyName = this.allocateMessageQueueStrategy != null ? this.allocateMessageQueueStrategy.getName() : null;
        for (int attempt = 1; attempt <= TIMEOUT_CHECK_TIMES; attempt++) {
            try {
                // Only success or failure matters here; the returned assignment is discarded.
                this.mQClientFactory.queryAssignment(topic, this.consumerGroup, strategyName, this.messageModel,
                    QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * attempt);
                this.topicBrokerRebalance.put(topic, topic);
                return true;
            } catch (Throwable t) {
                if (t instanceof RemotingTimeoutException) {
                    // Timed out: retry with a longer timeout.
                    continue;
                }
                log.error("tryQueryAssignment error.", t);
                this.topicClientRebalance.put(topic, topic);
                return false;
            }
        }

        // Every attempt timed out: fall back to client-side rebalancing for this topic.
        this.topicClientRebalance.put(topic, topic);
        return false;
    }

    /**
     * Returns the live topic-to-subscription map (not a copy); changes made by the caller
     * are visible to subsequent rebalances.
     */
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.subscriptionInner;
    }

    /**
     * Computes this consumer's queues for {@code topic} locally and applies the result.
     *
     * <p>BROADCASTING: every queue known for the topic is taken. CLUSTERING: the sorted queue
     * list and the sorted consumer-id list are handed to the allocation strategy. In both modes,
     * when the topic has no queue information the subclass is notified with two empty sets
     * (for CLUSTERING only if the topic does not start with {@code %RETRY%}).
     *
     * @param topic topic to rebalance
     * @param isOrder forwarded to {@code updateProcessQueueTableInRebalance}
     * @return {@code false} if the strategy threw or the working queues differ from the target
     *         set; {@code true} otherwise, including when required inputs are missing
     */
    private boolean rebalanceByTopic(String topic, boolean isOrder) {
        switch (this.messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> topicQueues = this.topicSubscribeInfoTable.get(topic);
                if (topicQueues == null) {
                    this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
                    return true;
                }
                if (this.updateProcessQueueTableInRebalance(topic, topicQueues, isOrder)) {
                    this.messageQueueChanged(topic, topicQueues, topicQueues);
                    log.info("messageQueueChanged {} {} {} {}", this.consumerGroup, topic, topicQueues, topicQueues);
                }
                return topicQueues.equals(this.getWorkingMessageQueue(topic));
            }
            case CLUSTERING: {
                Set<MessageQueue> topicQueues = this.topicSubscribeInfoTable.get(topic);
                List<String> consumerIds = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
                if (topicQueues == null && !topic.startsWith("%RETRY%")) {
                    this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
                }
                if (consumerIds == null) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
                }
                if (topicQueues == null || consumerIds == null) {
                    return true;
                }

                // Both lists are sorted before being handed to the allocation strategy.
                List<MessageQueue> sortedQueues = new ArrayList<MessageQueue>(topicQueues);
                Collections.sort(sortedQueues);
                Collections.sort(consumerIds);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                List<MessageQueue> allocated;
                try {
                    allocated = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), sortedQueues, consumerIds);
                } catch (Throwable e) {
                    log.error("allocate message queue exception. strategy name: {}, ex: {}", (Object)strategy.getName(), (Object)e);
                    return false;
                }

                Set<MessageQueue> target = new HashSet<MessageQueue>();
                if (allocated != null) {
                    target.addAll(allocated);
                }
                if (this.updateProcessQueueTableInRebalance(topic, target, isOrder)) {
                    log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), topicQueues.size(), consumerIds.size(), target.size(), target);
                    this.messageQueueChanged(topic, topicQueues, target);
                }
                return target.equals(this.getWorkingMessageQueue(topic));
            }
            default:
                return true;
        }
    }

    /**
     * Fetches this consumer's assignment for {@code topic} from the broker and applies it.
     *
     * <p>The strategy name is resolved with the same null guard as {@code tryQueryAssignment},
     * so a consumer without an allocation strategy sends a null name instead of failing with a
     * NullPointerException on every rebalance.
     *
     * @param topic topic to rebalance
     * @param isOrder forwarded to {@code updateMessageQueueAssignment}
     * @return {@code false} if the query threw, returned null, or the working queues differ from
     *         the assigned set; {@code true} otherwise
     */
    private boolean getRebalanceResultFromBroker(String topic, boolean isOrder) {
        String strategyName = this.allocateMessageQueueStrategy != null ? this.allocateMessageQueueStrategy.getName() : null;
        Set<MessageQueueAssignment> messageQueueAssignments;
        try {
            messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, this.consumerGroup, strategyName, this.messageModel, QUERY_ASSIGNMENT_TIMEOUT);
        } catch (Exception e) {
            log.error("allocate message queue exception. strategy name: {}, ex: {}", (Object)strategyName, (Object)e);
            return false;
        }
        if (messageQueueAssignments == null) {
            return false;
        }

        Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
        for (MessageQueueAssignment assignment : messageQueueAssignments) {
            if (assignment.getMessageQueue() != null) {
                mqSet.add(assignment.getMessageQueue());
            }
        }

        // The full queue list of the topic is not available on this path, so null is passed.
        Set<MessageQueue> mqAll = null;
        boolean changed = this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder);
        if (changed) {
            log.info("broker rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, assignmentSet={}", strategyName, this.consumerGroup, topic, this.mQClientFactory.getClientId(), messageQueueAssignments);
            this.messageQueueChanged(topic, mqAll, mqSet);
        }
        return mqSet.equals(this.getWorkingMessageQueue(topic));
    }

    /**
     * Collects the queues of {@code topic} that are currently tracked and not dropped,
     * drawing from both the pull-mode and the pop-mode tables.
     *
     * @param topic topic whose queues are wanted
     * @return a new set; empty when nothing matches
     */
    private Set<MessageQueue> getWorkingMessageQueue(String topic) {
        Set<MessageQueue> working = new HashSet<MessageQueue>();
        for (Map.Entry<MessageQueue, ProcessQueue> pull : this.processQueueTable.entrySet()) {
            MessageQueue queue = pull.getKey();
            if (queue.getTopic().equals(topic) && !pull.getValue().isDropped()) {
                working.add(queue);
            }
        }
        for (Map.Entry<MessageQueue, PopProcessQueue> pop : this.popProcessQueueTable.entrySet()) {
            MessageQueue queue = pop.getKey();
            if (queue.getTopic().equals(topic) && !pop.getValue().isDropped()) {
                working.add(queue);
            }
        }
        return working;
    }

    /**
     * Purges state belonging to topics that are no longer in the subscription table:
     * their pull and pop process queues are removed and marked dropped, and their cached
     * broker/client rebalance decisions are forgotten.
     */
    private void truncateMessageQueueNotMyTopic() {
        ConcurrentMap<String, SubscriptionData> subTable = this.getSubscriptionInner();

        for (MessageQueue queue : this.processQueueTable.keySet()) {
            if (subTable.containsKey(queue.getTopic())) {
                continue;
            }
            ProcessQueue removed = this.processQueueTable.remove(queue);
            if (removed != null) {
                removed.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", this.consumerGroup, queue);
            }
        }

        for (MessageQueue queue : this.popProcessQueueTable.keySet()) {
            if (subTable.containsKey(queue.getTopic())) {
                continue;
            }
            PopProcessQueue removed = this.popProcessQueueTable.remove(queue);
            if (removed != null) {
                removed.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary pop mq, {}", this.consumerGroup, queue);
            }
        }

        for (Iterator<String> topics = this.topicClientRebalance.keySet().iterator(); topics.hasNext(); ) {
            if (!subTable.containsKey(topics.next())) {
                topics.remove();
            }
        }
        for (Iterator<String> topics = this.topicBrokerRebalance.keySet().iterator(); topics.hasNext(); ) {
            if (!subTable.containsKey(topics.next())) {
                topics.remove();
            }
        }
    }

    /**
     * Reconciles {@code processQueueTable} with the target queue set for {@code topic}.
     *
     * <p>Phase 1 marks as dropped every tracked queue of the topic that is either absent from
     * {@code mqSet} or whose pull has expired while consuming passively, and removes those the
     * subclass agrees to release. Phase 2 adds every target queue not yet tracked (locking it
     * first when {@code isOrder} is set) and dispatches a pull request for each one added.
     * If any lock attempt failed, another rebalance is scheduled.
     *
     * <p>A single {@code allMQLocked} flag, initialised to {@code true}, tracks lock failures.
     * Splitting it into one uninitialised variable that is read and one initialised variable
     * that is never read violates definite assignment and loses the initial value.
     *
     * @param topic topic being rebalanced
     * @param mqSet queues this consumer should own for the topic
     * @param isOrder when true a queue is only added after its broker lock is obtained
     * @return {@code true} if at least one queue was removed or added
     */
    private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet, boolean isOrder) {
        boolean changed = false;

        // Phase 1a: mark queues that must go.
        Map<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            ProcessQueue pq = entry.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
            } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
                log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, mq);
            }
        }

        // Phase 1b: remove only what the subclass agrees to release.
        for (Map.Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (this.removeUnnecessaryMessageQueue(mq, entry.getValue())) {
                this.processQueueTable.remove(mq);
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, mq);
            }
        }

        // Phase 2: add newly assigned queues.
        boolean allMQLocked = true;
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (this.processQueueTable.containsKey(mq)) {
                continue;
            }
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, mq);
                allMQLocked = false;
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = this.createProcessQueue();
            pq.setLocked(true);
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset < 0L) {
                log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, mq);
                continue;
            }

            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, mq);
                continue;
            }
            log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, mq);
            PullRequest pullRequest = new PullRequest();
            pullRequest.setConsumerGroup(this.consumerGroup);
            pullRequest.setNextOffset(nextOffset);
            pullRequest.setMessageQueue(mq);
            pullRequest.setProcessQueue(pq);
            pullRequestList.add(pullRequest);
            changed = true;
        }

        // Some ordered queue could not be locked: try again shortly.
        if (!allMQLocked) {
            this.mQClientFactory.rebalanceLater(500L);
        }
        this.dispatchPullRequest(pullRequestList, 500L);
        return changed;
    }

    /**
     * Applies a broker-provided assignment for {@code topic} to the pull and pop queue tables.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Split the assignments into POP-mode and all other (pull) queues; assignments
     *       without a queue are ignored.</li>
     *   <li>For non-retry topics: if only pull queues are assigned, subscribe to the pop retry
     *       topic; if only pop queues are assigned, unsubscribe from it. Failures here are
     *       logged and do not abort the rebalance.</li>
     *   <li>Drop and remove tracked pull queues, then tracked pop queues, that are no longer
     *       assigned or whose pull has expired while consuming passively.</li>
     *   <li>Add newly assigned pull queues (locking first when {@code isOrder}) and dispatch
     *       their pull requests; schedule another rebalance if a lock attempt failed.</li>
     *   <li>Add newly assigned pop queues and dispatch their pop requests.</li>
     * </ol>
     *
     * @param topic topic being rebalanced
     * @param assignments assignment returned by the broker; must not be null
     * @param isOrder when true a pull queue is only added after its broker lock is obtained
     * @return {@code true} if at least one queue was removed or added
     */
    private boolean updateMessageQueueAssignment(String topic, Set<MessageQueueAssignment> assignments, boolean isOrder) {
        boolean changed = false;

        Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
        Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
        for (MessageQueueAssignment assignment : assignments) {
            MessageQueue mq = assignment.getMessageQueue();
            if (mq == null) {
                continue;
            }
            if (MessageRequestMode.POP == assignment.getMode()) {
                mq2PopAssignment.put(mq, assignment);
            } else {
                mq2PushAssignment.put(mq, assignment);
            }
        }

        if (!topic.startsWith("%RETRY%")) {
            if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
                // Only pull queues assigned: make sure the pop retry topic is subscribed.
                try {
                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, this.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, "*");
                    this.getSubscriptionInner().put(retryTopic, subscriptionData);
                } catch (Exception e) {
                    // Best effort: keep rebalancing, but leave a trace instead of swallowing.
                    log.warn("doRebalance, {}, subscribe pop retry topic failed, topic: {}", this.consumerGroup, topic, e);
                }
            } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
                // Only pop queues assigned: the pop retry topic subscription is no longer needed.
                try {
                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, this.getConsumerGroup());
                    this.getSubscriptionInner().remove(retryTopic);
                } catch (Exception e) {
                    // Best effort: keep rebalancing, but leave a trace instead of swallowing.
                    log.warn("doRebalance, {}, unsubscribe pop retry topic failed, topic: {}", this.consumerGroup, topic, e);
                }
            }
        }

        // Drop pull queues that are no longer assigned or are stuck.
        Map<MessageQueue, ProcessQueue> removePullQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            ProcessQueue pq = entry.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }
            if (!mq2PushAssignment.containsKey(mq)) {
                pq.setDropped(true);
                removePullQueueMap.put(mq, pq);
            } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                pq.setDropped(true);
                removePullQueueMap.put(mq, pq);
                log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, mq);
            }
        }
        for (Map.Entry<MessageQueue, ProcessQueue> entry : removePullQueueMap.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (this.removeUnnecessaryMessageQueue(mq, entry.getValue())) {
                this.processQueueTable.remove(mq);
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, mq);
            }
        }

        // Drop pop queues that are no longer assigned or are stuck.
        Map<MessageQueue, PopProcessQueue> removePopQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size());
        for (Map.Entry<MessageQueue, PopProcessQueue> entry : this.popProcessQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            PopProcessQueue pq = entry.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }
            if (!mq2PopAssignment.containsKey(mq)) {
                pq.setDropped(true);
                removePopQueueMap.put(mq, pq);
            } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                pq.setDropped(true);
                removePopQueueMap.put(mq, pq);
                log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it", this.consumerGroup, mq);
            }
        }
        for (Map.Entry<MessageQueue, PopProcessQueue> entry : removePopQueueMap.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (this.removeUnnecessaryPopMessageQueue(mq, entry.getValue())) {
                this.popProcessQueueTable.remove(mq);
                changed = true;
                log.info("doRebalance, {}, remove unnecessary pop mq, {}", this.consumerGroup, mq);
            }
        }

        // Add newly assigned pull queues.
        boolean allMQLocked = true;
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mq2PushAssignment.keySet()) {
            if (this.processQueueTable.containsKey(mq)) {
                continue;
            }
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, mq);
                allMQLocked = false;
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = this.createProcessQueue();
            pq.setLocked(true);
            long nextOffset;
            try {
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", this.consumerGroup, mq);
                continue;
            }
            if (nextOffset < 0L) {
                log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, mq);
                continue;
            }

            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, mq);
                continue;
            }
            log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, mq);
            PullRequest pullRequest = new PullRequest();
            pullRequest.setConsumerGroup(this.consumerGroup);
            pullRequest.setNextOffset(nextOffset);
            pullRequest.setMessageQueue(mq);
            pullRequest.setProcessQueue(pq);
            pullRequestList.add(pullRequest);
            changed = true;
        }
        // Some ordered queue could not be locked: try again shortly.
        if (!allMQLocked) {
            this.mQClientFactory.rebalanceLater(500L);
        }
        this.dispatchPullRequest(pullRequestList, 500L);

        // Add newly assigned pop queues.
        List<PopRequest> popRequestList = new ArrayList<PopRequest>();
        for (MessageQueue mq : mq2PopAssignment.keySet()) {
            if (this.popProcessQueueTable.containsKey(mq)) {
                continue;
            }
            PopProcessQueue popProcessQueue = this.createPopProcessQueue();
            PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, popProcessQueue);
            if (pre != null) {
                log.info("doRebalance, {}, mq pop already exists, {}", this.consumerGroup, mq);
                continue;
            }
            log.info("doRebalance, {}, add a new pop mq, {}", this.consumerGroup, mq);
            PopRequest popRequest = new PopRequest();
            popRequest.setTopic(topic);
            popRequest.setConsumerGroup(this.consumerGroup);
            popRequest.setMessageQueue(mq);
            popRequest.setPopProcessQueue(popProcessQueue);
            popRequest.setInitMode(this.getConsumeInitMode());
            popRequestList.add(popRequest);
            changed = true;
        }
        this.dispatchPopPullRequest(popRequestList, 500L);
        return changed;
    }

    /**
     * Hook invoked when a topic's queue set changed, and also (with two empty sets) when no
     * queue information exists for the topic.
     *
     * @param var1 topic
     * @param var2 all queues of the topic as passed by the caller; the broker-assignment path passes null
     * @param var3 queues assigned to this consumer
     */
    public abstract void messageQueueChanged(String var1, Set<MessageQueue> var2, Set<MessageQueue> var3);

    /**
     * Hook invoked for a pull queue that has already been marked dropped. During rebalance the
     * queue is removed from {@code processQueueTable} only when this returns {@code true};
     * {@code removeProcessQueue} ignores the result.
     *
     * @param var1 queue being released
     * @param var2 its process queue
     * @return whether the queue may be removed
     */
    public abstract boolean removeUnnecessaryMessageQueue(MessageQueue var1, ProcessQueue var2);

    /**
     * Hook invoked for a pop queue that has been marked dropped; the queue is removed from
     * {@code popProcessQueueTable} only when this returns {@code true}.
     * This base implementation always allows the removal.
     *
     * @param mq queue being released (unused here)
     * @param pq its pop process queue (unused here)
     * @return always {@code true} in this class
     */
    public boolean removeUnnecessaryPopMessageQueue(MessageQueue mq, PopProcessQueue pq) {
        return true;
    }

    /**
     * Consume type of the owning consumer. Rebalance compares it with
     * {@code ConsumeType.CONSUME_PASSIVELY} to decide whether pull-expired queues are evicted.
     */
    public abstract ConsumeType consumeType();

    /**
     * Hook invoked for a newly assigned pull queue, before its process queue is created and
     * its start offset is computed.
     *
     * @param var1 the newly assigned queue
     */
    public abstract void removeDirtyOffset(MessageQueue var1);

    /**
     * Computes the offset from which a newly assigned queue should be pulled. A negative
     * result makes the client-side rebalance skip the queue.
     *
     * @param var1 the newly assigned queue
     * @return next offset to pull, or a negative value on failure
     * @deprecated a throwing variant, {@code computePullFromWhereWithException}, also exists
     */
    @Deprecated
    public abstract long computePullFromWhere(MessageQueue var1);

    /**
     * Computes the offset from which a newly assigned queue should be pulled. Used on the
     * broker-assignment path, where an exception or a negative result makes rebalance skip
     * the queue.
     *
     * @param var1 the newly assigned queue
     * @return next offset to pull, or a negative value on failure
     * @throws MQClientException if the offset cannot be determined
     */
    public abstract long computePullFromWhereWithException(MessageQueue var1) throws MQClientException;

    /** Value copied into every new {@code PopRequest} via {@code setInitMode}. */
    public abstract int getConsumeInitMode();

    /**
     * Receives the pull requests created for newly added queues; called once per rebalance
     * pass, possibly with an empty list.
     *
     * @param var1 pull requests to start
     * @param var2 rebalance passes 500 here (presumably a delay in milliseconds - TODO confirm)
     */
    public abstract void dispatchPullRequest(List<PullRequest> var1, long var2);

    /**
     * Receives the pop requests created for newly added pop queues; called once per
     * broker-assignment pass, possibly with an empty list.
     *
     * @param var1 pop requests to start
     * @param var2 rebalance passes 500 here (presumably a delay in milliseconds - TODO confirm)
     */
    public abstract void dispatchPopPullRequest(List<PopRequest> var1, long var2);

    /** Factory for the process queue attached to each newly assigned pull queue. */
    public abstract ProcessQueue createProcessQueue();

    /** Factory for the process queue attached to each newly assigned pop queue. */
    public abstract PopProcessQueue createPopProcessQueue();

    /**
     * Stops tracking {@code mq}: removes it from {@code processQueueTable}, marks its process
     * queue dropped and notifies the subclass. Does nothing when the queue is not tracked.
     *
     * @param mq queue to stop tracking
     */
    public void removeProcessQueue(MessageQueue mq) {
        ProcessQueue removed = this.processQueueTable.remove(mq);
        if (removed == null) {
            return;
        }
        // Capture the flag before overwriting it so the log shows the previous state.
        boolean wasDropped = removed.isDropped();
        removed.setDropped(true);
        this.removeUnnecessaryMessageQueue(mq, removed);
        log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", this.consumerGroup, mq, wasDropped);
    }

    /** Returns the live pull-mode process queue table (not a copy). */
    public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    /** Returns the live pop-mode process queue table (not a copy). */
    public ConcurrentMap<MessageQueue, PopProcessQueue> getPopProcessQueueTable() {
        return this.popProcessQueueTable;
    }

    /** Returns the live topic-to-queues table (not a copy) that rebalanceByTopic reads from. */
    public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    /** Returns the consumer group name used in lock, unlock and assignment requests. */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /** Replaces the consumer group name; no validation is performed. */
    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    /** Returns the message model that selects the branch taken by rebalanceByTopic. */
    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    /** Replaces the message model; no validation is performed. */
    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    /** Returns the allocation strategy used for client-side CLUSTERING rebalance; may be null. */
    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    /** Replaces the allocation strategy; no validation is performed. */
    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    /** Returns the client instance used for broker lookups and remote calls. */
    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    /** Replaces the client instance; no validation is performed. */
    public void setmQClientFactory(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    /**
     * Marks every tracked pull and pop process queue as dropped and empties both tables.
     * No unlock request is sent to any broker from here.
     */
    public void destroy() {
        for (ProcessQueue pullQueue : this.processQueueTable.values()) {
            pullQueue.setDropped(true);
        }
        this.processQueueTable.clear();

        for (PopProcessQueue popQueue : this.popProcessQueueTable.values()) {
            popQueue.setDropped(true);
        }
        this.popProcessQueueTable.clear();
    }
}

