/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl;

import com.alibaba.fastjson.JSON;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.BatchAck;
import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetProducerConnectionListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.RemoveBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ResetMasterFlushOffsetHeader;
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;

public class MQClientAPIImpl
implements NameServerUpdateCallback {
    private static final Logger log = LoggerFactory.getLogger(MQClientAPIImpl.class);
    private static boolean sendSmartMsg = Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
    private final RemotingClient remotingClient;
    private final TopAddressing topAddressing;
    private final ClientRemotingProcessor clientRemotingProcessor;
    private String nameSrvAddr = null;
    private ClientConfig clientConfig;

    public MQClientAPIImpl(NettyClientConfig nettyClientConfig, ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, ClientConfig clientConfig) {
        this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null);
    }

    public MQClientAPIImpl(NettyClientConfig nettyClientConfig, ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, ClientConfig clientConfig, ChannelEventListener channelEventListener) {
        this.clientConfig = clientConfig;
        this.topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
        this.topAddressing.registerChangeCallBack(this);
        this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener);
        this.clientRemotingProcessor = clientRemotingProcessor;
        this.remotingClient.registerRPCHook(new NamespaceRpcHook(clientConfig));
        if (clientConfig.isEnableStreamRequestType()) {
            this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
        }
        this.remotingClient.registerRPCHook(rpcHook);
        this.remotingClient.registerRPCHook(new DynamicalExtFieldRPCHook());
        this.remotingClient.registerProcessor(39, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(40, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(220, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(221, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(307, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(309, this.clientRemotingProcessor, null);
        this.remotingClient.registerProcessor(326, this.clientRemotingProcessor, null);
    }

    public List<String> getNameServerAddressList() {
        return this.remotingClient.getNameServerAddressList();
    }

    public RemotingClient getRemotingClient() {
        return this.remotingClient;
    }

    public String fetchNameServerAddr() {
        try {
            String addrs = this.topAddressing.fetchNSAddr();
            if (!UtilAll.isBlank(addrs) && !addrs.equals(this.nameSrvAddr)) {
                log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                this.updateNameServerAddressList(addrs);
                this.nameSrvAddr = addrs;
                return this.nameSrvAddr;
            }
        }
        catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return this.nameSrvAddr;
    }

    @Override
    public String onNameServerAddressChange(String namesrvAddress) {
        if (namesrvAddress != null && !namesrvAddress.equals(this.nameSrvAddr)) {
            log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + namesrvAddress);
            this.updateNameServerAddressList(namesrvAddress);
            this.nameSrvAddr = namesrvAddress;
            return this.nameSrvAddr;
        }
        return this.nameSrvAddr;
    }

    public void updateNameServerAddressList(String addrs) {
        String[] addrArray = addrs.split(";");
        List<String> list = Arrays.asList(addrArray);
        this.remotingClient.updateNameServerAddressList(list);
    }

    public void start() {
        this.remotingClient.start();
    }

    public void shutdown() {
        this.remotingClient.shutdown();
    }

    public Set<MessageQueueAssignment> queryAssignment(String addr, String topic, String consumerGroup, String clientId, String strategyName, MessageModel messageModel, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
        requestBody.setTopic(topic);
        requestBody.setConsumerGroup(consumerGroup);
        requestBody.setClientId(clientId);
        requestBody.setMessageModel(messageModel);
        requestBody.setStrategyName(strategyName);
        RemotingCommand request = RemotingCommand.createRequestCommand(400, null);
        request.setBody(requestBody.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class);
                return queryAssignmentResponseBody.getMessageQueueAssignments();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void createSubscriptionGroup(String addr, SubscriptionGroupConfig config, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        RemotingCommand request = RemotingCommand.createRequestCommand(200, null);
        byte[] body = RemotingSerializable.encode(config);
        request.setBody(body);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void createTopic(String addr, String defaultTopic, TopicConfig topicConfig, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        Validators.checkTopicConfig(topicConfig);
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());
        requestHeader.setAttributes(AttributeParser.parseToString(topicConfig.getAttributes()));
        RemotingCommand request = RemotingCommand.createRequestCommand(17, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void createTopicList(String address, List<TopicConfig> topicConfigList, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException {
        CreateTopicListRequestHeader requestHeader = new CreateTopicListRequestHeader();
        CreateTopicListRequestBody requestBody = new CreateTopicListRequestBody(topicConfigList);
        RemotingCommand request = RemotingCommand.createRequestCommand(18, requestHeader);
        request.setBody(requestBody.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), address), request, timeoutMillis);
        assert (response != null);
        if (response.getCode() == 0) {
            return;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void createPlainAccessConfig(String addr, PlainAccessConfig plainAccessConfig, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        CreateAccessConfigRequestHeader requestHeader = new CreateAccessConfigRequestHeader();
        requestHeader.setAccessKey(plainAccessConfig.getAccessKey());
        requestHeader.setSecretKey(plainAccessConfig.getSecretKey());
        requestHeader.setAdmin(plainAccessConfig.isAdmin());
        requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
        requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
        requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
        requestHeader.setTopicPerms(UtilAll.join(plainAccessConfig.getTopicPerms(), ","));
        requestHeader.setGroupPerms(UtilAll.join(plainAccessConfig.getGroupPerms(), ","));
        RemotingCommand request = RemotingCommand.createRequestCommand(50, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void deleteAccessConfig(String addr, String accessKey, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        DeleteAccessConfigRequestHeader requestHeader = new DeleteAccessConfigRequestHeader();
        requestHeader.setAccessKey(accessKey);
        RemotingCommand request = RemotingCommand.createRequestCommand(51, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void updateGlobalWhiteAddrsConfig(String addr, String globalWhiteAddrs, String aclFileFullPath, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
        requestHeader.setGlobalWhiteAddrs(globalWhiteAddrs);
        requestHeader.setAclFileFullPath(aclFileFullPath);
        RemotingCommand request = RemotingCommand.createRequestCommand(53, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public ClusterAclVersionInfo getBrokerClusterAclInfo(String addr, long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(52, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                GetBrokerAclConfigResponseHeader responseHeader = response.decodeCommandCustomHeader(GetBrokerAclConfigResponseHeader.class);
                ClusterAclVersionInfo clusterAclVersionInfo = new ClusterAclVersionInfo();
                clusterAclVersionInfo.setClusterName(responseHeader.getClusterName());
                clusterAclVersionInfo.setBrokerName(responseHeader.getBrokerName());
                clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr());
                clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class));
                HashMap dataVersionMap = JSON.parseObject(responseHeader.getAllAclFileVersion(), HashMap.class);
                HashMap<String, DataVersion> allAclConfigDataVersion = new HashMap<String, DataVersion>(dataVersionMap.size(), 1.0f);
                for (Map.Entry entry : dataVersionMap.entrySet()) {
                    allAclConfigDataVersion.put((String)entry.getKey(), DataVersion.fromJson(JSON.toJSONString(entry.getValue()), DataVersion.class));
                }
                clusterAclVersionInfo.setAllAclConfigDataVersion(allAclConfigDataVersion);
                return clusterAclVersionInfo;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
        return this.sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
    }

    public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int retryTimesWhenSendFailed, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
        SendMessageRequestHeaderV2 requestHeaderV2;
        boolean isReply;
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty("MSG_TYPE");
        boolean bl = isReply = msgType != null && msgType.equals("reply");
        if (isReply) {
            if (sendSmartMsg) {
                requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(325, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(324, requestHeader);
            }
        } else if (sendSmartMsg || msg instanceof MessageBatch) {
            requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 320 : 310, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(10, requestHeader);
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY: {
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            }
            case ASYNC: {
                AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
                return null;
            }
            case SYNC: {
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            }
        }
        assert (false);
        return null;
    }

    private SendResult sendMessageSync(String addr, String brokerName, Message msg, long timeoutMillis, RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        return this.processSendResponse(brokerName, msg, response, addr);
    }

    void execRpcHooksAfterRequest(ResponseFuture responseFuture) {
        if (this.remotingClient instanceof NettyRemotingClient) {
            NettyRemotingClient remotingClient = (NettyRemotingClient)this.remotingClient;
            RemotingCommand response = responseFuture.getResponseCommand();
            remotingClient.doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(responseFuture.getChannel()), responseFuture.getRequestCommand(), response);
        }
    }

    private void sendMessageAsync(final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final AtomicInteger times, final SendMessageContext context, final DefaultMQProducerImpl producer) {
        final long beginStartTime = System.currentTimeMillis();
        try {
            this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback(){

                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                }

                @Override
                public void operationSucceed(RemotingCommand response) {
                    long cost = System.currentTimeMillis() - beginStartTime;
                    if (null == sendCallback) {
                        try {
                            SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                            if (context != null && sendResult != null) {
                                context.setSendResult(sendResult);
                                context.getProducer().executeSendMessageHookAfter(context);
                            }
                        }
                        catch (Throwable sendResult) {
                            // empty catch block
                        }
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true);
                        return;
                    }
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                        assert (sendResult != null);
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                        try {
                            sendCallback.onSuccess(sendResult);
                        }
                        catch (Throwable throwable) {
                            // empty catch block
                        }
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true);
                    }
                    catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true);
                        MQClientAPIImpl.this.onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                }

                @Override
                public void operationFail(Throwable throwable) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true);
                    long cost = System.currentTimeMillis() - beginStartTime;
                    if (throwable instanceof RemotingSendRequestException) {
                        MQClientException ex = new MQClientException("send request failed", throwable);
                        MQClientAPIImpl.this.onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (throwable instanceof RemotingTimeoutException) {
                        MQClientException ex = new MQClientException("wait response timeout, cost=" + cost, throwable);
                        MQClientAPIImpl.this.onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknown reason", throwable);
                        boolean needRetry = !(throwable instanceof RemotingTooMuchRequestException);
                        MQClientAPIImpl.this.onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, needRetry, producer);
                    }
                }
            });
        }
        catch (Exception ex) {
            long cost = System.currentTimeMillis() - beginStartTime;
            producer.updateFaultItem(brokerName, cost, true, false);
            this.onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
        }
    }

    private void onExceptionImpl(String brokerName, Message msg, long timeoutMillis, RemotingCommand request, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int timesTotal, AtomicInteger curTimes, Exception e, SendMessageContext context, boolean needRetry, DefaultMQProducerImpl producer) {
        int tmp = curTimes.incrementAndGet();
        if (needRetry && tmp <= timesTotal) {
            String retryBrokerName = brokerName;
            if (topicPublishInfo != null) {
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false);
                retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
            }
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.warn("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, retryBrokerName, e);
            request.setOpaque(RemotingCommand.createNewRequestId());
            this.sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer);
        } else {
            if (context != null) {
                context.setException(e);
                context.getProducer().executeSendMessageHookAfter(context);
            }
            try {
                sendCallback.onException(e);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    protected SendResult processSendResponse(String brokerName, Message msg, RemotingCommand response, String addr) throws MQBrokerException, RemotingCommandException {
        SendStatus sendStatus;
        switch (response.getCode()) {
            case 10: {
                sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
                break;
            }
            case 12: {
                sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
                break;
            }
            case 11: {
                sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
                break;
            }
            case 0: {
                sendStatus = SendStatus.SEND_OK;
                break;
            }
            default: {
                throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
            }
        }
        SendMessageResponseHeader responseHeader = response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
        String topic = msg.getTopic();
        if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
            topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
        }
        MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
        String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
        if (msg instanceof MessageBatch && responseHeader.getBatchUniqId() == null) {
            StringBuilder sb = new StringBuilder();
            for (Message message : (MessageBatch)msg) {
                sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
            }
            uniqMsgId = sb.toString();
        }
        SendResult sendResult = new SendResult(sendStatus, uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
        sendResult.setTransactionId(responseHeader.getTransactionId());
        String regionId = response.getExtFields().get("MSG_REGION");
        if (regionId == null || regionId.isEmpty()) {
            regionId = "DefaultRegion";
        }
        sendResult.setRegionId(regionId);
        String traceOn = response.getExtFields().get("TRACE_ON");
        sendResult.setTraceOn(!Boolean.FALSE.toString().equals(traceOn));
        return sendResult;
    }

    public PullResult pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = PullSysFlag.hasLitePullFlag(requestHeader.getSysFlag()) ? RemotingCommand.createRequestCommand(361, requestHeader) : RemotingCommand.createRequestCommand(11, requestHeader);
        switch (communicationMode) {
            case ONEWAY: {
                assert (false);
                return null;
            }
            case ASYNC: {
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            }
            case SYNC: {
                return this.pullMessageSync(addr, request, timeoutMillis);
            }
        }
        assert (false);
        return null;
    }

    public void popMessageAsync(final String brokerName, String addr, final PopMessageRequestHeader requestHeader, long timeoutMillis, final PopCallback popCallback) throws RemotingException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(200050, requestHeader);
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback(){

            @Override
            public void operationComplete(ResponseFuture responseFuture) {
            }

            @Override
            public void operationSucceed(RemotingCommand response) {
                try {
                    PopResult popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
                    popCallback.onSuccess(popResult);
                }
                catch (Exception e) {
                    popCallback.onException(e);
                }
            }

            @Override
            public void operationFail(Throwable throwable) {
                popCallback.onException(throwable);
            }
        });
    }

    public void ackMessageAsync(String addr, long timeOut, AckCallback ackCallback, AckMessageRequestHeader requestHeader) throws RemotingException, MQBrokerException, InterruptedException {
        this.ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
    }

    public void batchAckMessageAsync(String addr, long timeOut, AckCallback ackCallback, String topic, String consumerGroup, List<String> extraInfoList) throws RemotingException, MQBrokerException, InterruptedException {
        String brokerName = null;
        HashMap<String, BatchAck> batchAckMap = new HashMap<String, BatchAck>();
        for (String extraInfo : extraInfoList) {
            String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
            if (brokerName == null) {
                brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
            }
            String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + ExtraInfoUtil.getQueueId(extraInfoData) + "@" + ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + ExtraInfoUtil.getPopTime(extraInfoData);
            BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
                BatchAck newBatchAck = new BatchAck();
                newBatchAck.setConsumerGroup(consumerGroup);
                newBatchAck.setTopic(topic);
                newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
                newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
                newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
                newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
                newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
                newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
                newBatchAck.setBitSet(new BitSet());
                return newBatchAck;
            });
            bAck.getBitSet().set((int)(ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
        }
        BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
        requestBody.setBrokerName(brokerName);
        requestBody.setAcks(new ArrayList<BatchAck>(batchAckMap.values()));
        this.batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
    }

    public void batchAckMessageAsync(String addr, long timeOut, AckCallback ackCallback, BatchAckMessageRequestBody requestBody) throws RemotingException, MQBrokerException, InterruptedException {
        this.ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
    }

    protected void ackMessageAsync(String addr, long timeOut, final AckCallback ackCallback, AckMessageRequestHeader requestHeader, BatchAckMessageRequestBody requestBody) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request;
        if (requestHeader != null) {
            request = RemotingCommand.createRequestCommand(200051, requestHeader);
        } else {
            request = RemotingCommand.createRequestCommand(200151, null);
            if (requestBody != null) {
                request.setBody(requestBody.encode());
            }
        }
        this.remotingClient.invokeAsync(addr, request, timeOut, new InvokeCallback(){

            @Override
            public void operationComplete(ResponseFuture responseFuture) {
            }

            @Override
            public void operationSucceed(RemotingCommand response) {
                AckResult ackResult = new AckResult();
                if (0 == response.getCode()) {
                    ackResult.setStatus(AckStatus.OK);
                } else {
                    ackResult.setStatus(AckStatus.NO_EXIST);
                }
                ackCallback.onSuccess(ackResult);
            }

            @Override
            public void operationFail(Throwable throwable) {
                ackCallback.onException(throwable);
            }
        });
    }

    public void changeInvisibleTimeAsync(final String brokerName, String addr, final ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis, final AckCallback ackCallback) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(200053, requestHeader);
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback(){

            @Override
            public void operationComplete(ResponseFuture responseFuture) {
            }

            @Override
            public void operationSucceed(RemotingCommand response) {
                try {
                    ChangeInvisibleTimeResponseHeader responseHeader = response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
                    AckResult ackResult = new AckResult();
                    if (0 == response.getCode()) {
                        ackResult.setStatus(AckStatus.OK);
                        ackResult.setPopTime(responseHeader.getPopTime());
                        ackResult.setExtraInfo(ExtraInfoUtil.buildExtraInfo(requestHeader.getOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + " " + requestHeader.getOffset());
                    } else {
                        ackResult.setStatus(AckStatus.NO_EXIST);
                    }
                    ackCallback.onSuccess(ackResult);
                }
                catch (Exception e) {
                    ackCallback.onException(e);
                }
            }

            @Override
            public void operationFail(Throwable throwable) {
                ackCallback.onException(throwable);
            }
        });
    }

    private void pullMessageAsync(final String addr, RemotingCommand request, long timeoutMillis, final PullCallback pullCallback) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback(){

            @Override
            public void operationComplete(ResponseFuture responseFuture) {
            }

            @Override
            public void operationSucceed(RemotingCommand response) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                    pullCallback.onSuccess(pullResult);
                }
                catch (Exception e) {
                    pullCallback.onException(e);
                }
            }

            @Override
            public void operationFail(Throwable throwable) {
                pullCallback.onException(throwable);
            }
        });
    }

    private PullResult pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        return this.processPullResponse(response, addr);
    }

    private PullResult processPullResponse(RemotingCommand response, String addr) throws MQBrokerException, RemotingCommandException {
        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
        switch (response.getCode()) {
            case 0: {
                pullStatus = PullStatus.FOUND;
                break;
            }
            case 19: {
                pullStatus = PullStatus.NO_NEW_MSG;
                break;
            }
            case 20: {
                pullStatus = PullStatus.NO_MATCHED_MSG;
                break;
            }
            case 21: {
                pullStatus = PullStatus.OFFSET_ILLEGAL;
                break;
            }
            default: {
                throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
            }
        }
        PullMessageResponseHeader responseHeader = response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
        return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
    }

    private PopResult processPopResponse(String brokerName, RemotingCommand response, String topic, CommandCustomHeader requestHeader) throws MQBrokerException, RemotingCommandException {
        PopStatus popStatus = PopStatus.NO_NEW_MSG;
        List<MessageExt> msgFoundList = null;
        switch (response.getCode()) {
            case 0: {
                popStatus = PopStatus.FOUND;
                ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
                msgFoundList = MessageDecoder.decodesBatch(byteBuffer, this.clientConfig.isDecodeReadBody(), this.clientConfig.isDecodeDecompressBody(), true);
                break;
            }
            case 209: {
                popStatus = PopStatus.POLLING_FULL;
                break;
            }
            case 210: {
                popStatus = PopStatus.POLLING_NOT_FOUND;
                break;
            }
            case 19: {
                popStatus = PopStatus.POLLING_NOT_FOUND;
                break;
            }
            default: {
                throw new MQBrokerException(response.getCode(), response.getRemark());
            }
        }
        PopResult popResult = new PopResult(popStatus, msgFoundList);
        PopMessageResponseHeader responseHeader = response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
        popResult.setRestNum(responseHeader.getRestNum());
        if (popStatus != PopStatus.FOUND) {
            return popResult;
        }
        Map<String, Long> startOffsetInfo = null;
        Map<String, List<Long>> msgOffsetInfo = null;
        Map<String, Integer> orderCountInfo = null;
        if (requestHeader instanceof PopMessageRequestHeader) {
            popResult.setInvisibleTime(responseHeader.getInvisibleTime());
            popResult.setPopTime(responseHeader.getPopTime());
            startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
            msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
            orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
        }
        Map<String, List<Long>> sortMap = MQClientAPIImpl.buildQueueOffsetSortedMap(topic, msgFoundList);
        HashMap<String, String> map = new HashMap<String, String>(5);
        for (MessageExt messageExt : msgFoundList) {
            if (requestHeader instanceof PopMessageRequestHeader) {
                if (startOffsetInfo == null) {
                    String key = messageExt.getTopic() + messageExt.getQueueId();
                    if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
                        map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId()));
                    }
                    messageExt.getProperties().put("POP_CK", (String)map.get(key) + " " + messageExt.getQueueOffset());
                } else if (messageExt.getProperty("POP_CK") == null) {
                    Long msgQueueOffset;
                    int index;
                    String queueOffsetKey;
                    String queueIdKey;
                    if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(messageExt.getProperty("INNER_MULTI_DISPATCH"))) {
                        Object[] queues = messageExt.getProperty("INNER_MULTI_DISPATCH").split(",");
                        String[] queueOffsets = messageExt.getProperty("INNER_MULTI_QUEUE_OFFSET").split(",");
                        long offset = Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]);
                        queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, 0L);
                        queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, 0L, offset);
                        index = sortMap.get(queueIdKey).indexOf(offset);
                        msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
                        if (msgQueueOffset != offset) {
                            log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", (Object)msgQueueOffset, (Object)messageExt);
                        }
                        messageExt.getProperties().put("POP_CK", ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), topic, brokerName, 0, msgQueueOffset));
                    } else {
                        queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
                        queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
                        index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
                        msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
                        if (msgQueueOffset.longValue() != messageExt.getQueueOffset()) {
                            log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", (Object)msgQueueOffset, (Object)messageExt);
                        }
                        messageExt.getProperties().put("POP_CK", ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset));
                    }
                    if (((PopMessageRequestHeader)requestHeader).isOrder() && orderCountInfo != null) {
                        Integer count = orderCountInfo.get(queueOffsetKey);
                        if (count == null) {
                            count = orderCountInfo.get(queueIdKey);
                        }
                        if (count != null && count > 0) {
                            messageExt.setReconsumeTimes(count);
                        }
                    }
                }
                messageExt.getProperties().computeIfAbsent("1ST_POP_TIME", k -> String.valueOf(responseHeader.getPopTime()));
            }
            messageExt.setBrokerName(brokerName);
            messageExt.setTopic(NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()));
        }
        return popResult;
    }

    private static Map<String, List<Long>> buildQueueOffsetSortedMap(String topic, List<MessageExt> msgFoundList) {
        HashMap<String, List<Long>> sortMap = new HashMap<String, List<Long>>(16);
        for (MessageExt messageExt : msgFoundList) {
            String key;
            if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(messageExt.getProperty("INNER_MULTI_DISPATCH"))) {
                Object[] queues = messageExt.getProperty("INNER_MULTI_DISPATCH").split(",");
                String[] queueOffsets = messageExt.getProperty("INNER_MULTI_QUEUE_OFFSET").split(",");
                key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, 0L);
                sortMap.putIfAbsent(key, new ArrayList(4));
                ((List)sortMap.get(key)).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]));
                continue;
            }
            key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getProperty("POP_CK"), messageExt.getQueueId());
            if (!sortMap.containsKey(key)) {
                sortMap.put(key, new ArrayList(4));
            }
            ((List)sortMap.get(key)).add(messageExt.getQueueOffset());
        }
        return sortMap;
    }

    public MessageExt viewMessage(String addr, String topic, long phyoffset, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setOffset(phyoffset);
        RemotingCommand request = RemotingCommand.createRequestCommand(33, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
                MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
                if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
                    messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.clientConfig.getNamespace()));
                }
                return messageExt;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    @Deprecated
    public long searchOffset(String addr, String topic, int queueId, long timestamp, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setQueueId(queueId);
        requestHeader.setTimestamp(timestamp);
        RemotingCommand request = RemotingCommand.createRequestCommand(29, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                SearchOffsetResponseHeader responseHeader = response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
                return responseHeader.getOffset();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public long searchOffset(String addr, MessageQueue messageQueue, long timestamp, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        return this.searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis);
    }

    public long searchOffset(String addr, MessageQueue messageQueue, long timestamp, BoundaryType boundaryType, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
        requestHeader.setTopic(messageQueue.getTopic());
        requestHeader.setQueueId(messageQueue.getQueueId());
        requestHeader.setBrokerName(messageQueue.getBrokerName());
        requestHeader.setTimestamp(timestamp);
        requestHeader.setBoundaryType(boundaryType);
        RemotingCommand request = RemotingCommand.createRequestCommand(29, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                SearchOffsetResponseHeader responseHeader = response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
                return responseHeader.getOffset();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public long getMaxOffset(String addr, MessageQueue messageQueue, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
        requestHeader.setTopic(messageQueue.getTopic());
        requestHeader.setQueueId(messageQueue.getQueueId());
        requestHeader.setBrokerName(messageQueue.getBrokerName());
        RemotingCommand request = RemotingCommand.createRequestCommand(30, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                GetMaxOffsetResponseHeader responseHeader = response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
                return responseHeader.getOffset();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public List<String> getConsumerIdListByGroup(String addr, String consumerGroup, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
        GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(38, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (response.getBody() == null) break;
                GetConsumerListByGroupResponseBody body = GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                return body.getConsumerIdList();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public long getMinOffset(String addr, MessageQueue messageQueue, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
        requestHeader.setTopic(messageQueue.getTopic());
        requestHeader.setQueueId(messageQueue.getQueueId());
        requestHeader.setBrokerName(messageQueue.getBrokerName());
        RemotingCommand request = RemotingCommand.createRequestCommand(31, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                GetMinOffsetResponseHeader responseHeader = response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
                return responseHeader.getOffset();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public long getEarliestMsgStoretime(String addr, MessageQueue mq, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setBrokerName(mq.getBrokerName());
        RemotingCommand request = RemotingCommand.createRequestCommand(32, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                GetEarliestMsgStoretimeResponseHeader responseHeader = response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
                return responseHeader.getTimestamp();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public long queryConsumerOffset(String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(14, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                QueryConsumerOffsetResponseHeader responseHeader = response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
                return responseHeader.getOffset();
            }
            case 22: {
                throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void updateConsumerOffset(String addr, UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(15, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void updateConsumerOffsetOneway(String addr, UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(15, requestHeader);
        this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    }

    public int sendHeartbeat(String addr, HeartbeatData heartbeatData, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(34, new HeartbeatRequestHeader());
        request.setLanguage(this.clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return response.getVersion();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public HeartbeatV2Result sendHeartbeatV2(String addr, HeartbeatData heartbeatData, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(34, new HeartbeatRequestHeader());
        request.setLanguage(this.clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (response.getExtFields() != null) {
                    return new HeartbeatV2Result(response.getVersion(), Boolean.parseBoolean(response.getExtFields().get("IS_SUB_CHANGE")), Boolean.parseBoolean(response.getExtFields().get("IS_SUPPORT_HEART_BEAT_V2")));
                }
                return new HeartbeatV2Result(response.getVersion(), false, false);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void unregisterClient(String addr, String clientID, String producerGroup, String consumerGroup, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
        requestHeader.setClientID(clientID);
        requestHeader.setProducerGroup(producerGroup);
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(35, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void endTransactionOneway(String addr, EndTransactionRequestHeader requestHeader, String remark, long timeoutMillis) throws RemotingException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(37, requestHeader);
        request.setRemark(remark);
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }

    public void queryMessage(String addr, QueryMessageRequestHeader requestHeader, long timeoutMillis, InvokeCallback invokeCallback, Boolean isUnqiueKey) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(12, requestHeader);
        request.addExtField("_UNIQUE_KEY_QUERY", isUnqiueKey.toString());
        this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis, invokeCallback);
    }

    public boolean registerClient(String addr, HeartbeatData heartbeat, long timeoutMillis) throws RemotingException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(34, new HeartbeatRequestHeader());
        request.setBody(heartbeat.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        return response.getCode() == 0;
    }

    public void consumerSendMessageBack(String addr, String brokerName, MessageExt msg, String consumerGroup, int delayLevel, long timeoutMillis, int maxConsumeRetryTimes) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(36, requestHeader);
        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        requestHeader.setOffset(msg.getCommitLogOffset());
        requestHeader.setDelayLevel(delayLevel);
        requestHeader.setOriginMsgId(msg.getMsgId());
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
        requestHeader.setBrokerName(brokerName);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public Set<MessageQueue> lockBatchMQ(String addr, LockBatchRequestBody requestBody, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(41, new LockBatchMqRequestHeader());
        request.setBody(requestBody.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
                Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
                return messageQueues;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void unlockBatchMQ(String addr, UnlockBatchRequestBody requestBody, long timeoutMillis, boolean oneway) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(42, new UnlockBatchMqRequestHeader());
        request.setBody(requestBody.encode());
        if (!oneway) {
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
            switch (response.getCode()) {
                case 0: {
                    return;
                }
            }
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
        }
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }

    public TopicStatsTable getTopicStatsInfo(String addr, String topic, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(202, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
                return topicStatsTable;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public ConsumeStats getConsumeStats(String addr, String consumerGroup, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        return this.getConsumeStats(addr, consumerGroup, null, timeoutMillis);
    }

    public ConsumeStats getConsumeStats(String addr, String consumerGroup, String topic, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(208, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
                return consumeStats;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public ProducerConnection getProducerConnectionList(String addr, String producerGroup, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
        requestHeader.setProducerGroup(producerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(204, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return ProducerConnection.decode(response.getBody(), ProducerConnection.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public ProducerTableInfo getAllProducerInfo(String addr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        GetAllProducerInfoRequestHeader requestHeader = new GetAllProducerInfoRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(328, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return ProducerTableInfo.decode(response.getBody(), ProducerTableInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public ConsumerConnection getConsumerConnectionList(String addr, String consumerGroup, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(203, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public KVTable getBrokerRuntimeInfo(String addr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(28, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return KVTable.decode(response.getBody(), KVTable.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void addBroker(String addr, String brokerConfigPath, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        AddBrokerRequestHeader requestHeader = new AddBrokerRequestHeader();
        requestHeader.setConfigPath(brokerConfigPath);
        RemotingCommand request = RemotingCommand.createRequestCommand(902, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void removeBroker(String addr, String clusterName, String brokerName, long brokerId, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemoveBrokerRequestHeader requestHeader = new RemoveBrokerRequestHeader();
        requestHeader.setBrokerClusterName(clusterName);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setBrokerId(brokerId);
        RemotingCommand request = RemotingCommand.createRequestCommand(903, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void updateBrokerConfig(String addr, Properties properties, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, MQClientException, UnsupportedEncodingException {
        Validators.checkBrokerConfig(properties);
        RemotingCommand request = RemotingCommand.createRequestCommand(25, null);
        String str = MixAll.properties2String(properties);
        if (str != null && str.length() > 0) {
            request.setBody(str.getBytes("UTF-8"));
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
            switch (response.getCode()) {
                case 0: {
                    return;
                }
            }
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
        }
    }

    public Properties getBrokerConfig(String addr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        RemotingCommand request = RemotingCommand.createRequestCommand(26, null);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return MixAll.string2Properties(new String(response.getBody(), "UTF-8"));
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void updateColdDataFlowCtrGroupConfig(String addr, Properties properties, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        RemotingCommand request = RemotingCommand.createRequestCommand(2001, null);
        String str = MixAll.properties2String(properties);
        if (str != null && str.length() > 0) {
            request.setBody(str.getBytes("UTF-8"));
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
            switch (response.getCode()) {
                case 0: {
                    return;
                }
            }
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    }

    public void removeColdDataFlowCtrGroupConfig(String addr, String consumerGroup, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        RemotingCommand request = RemotingCommand.createRequestCommand(2002, null);
        if (consumerGroup != null && consumerGroup.length() > 0) {
            request.setBody(consumerGroup.getBytes("UTF-8"));
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
            switch (response.getCode()) {
                case 0: {
                    return;
                }
            }
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    }

    public String getColdDataFlowCtrInfo(String addr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        RemotingCommand request = RemotingCommand.createRequestCommand(2003, null);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (null != response.getBody() && response.getBody().length > 0) {
                    return new String(response.getBody(), "UTF-8");
                }
                return null;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public String setCommitLogReadAheadMode(String addr, String mode, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(2004, null);
        HashMap<String, String> extFields = new HashMap<String, String>();
        extFields.put("READ_AHEAD_MODE", mode);
        request.setExtFields(extFields);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (null != response.getRemark() && response.getRemark().length() > 0) {
                    return response.getRemark();
                }
                return null;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public ClusterInfo getBrokerClusterInfo(long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(106, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public TopicRouteData getDefaultTopicRouteInfoFromNameServer(long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        return this.getTopicRouteInfoFromNameServer("TBW102", timeoutMillis, false);
    }

    public TopicRouteData getTopicRouteInfoFromNameServer(String topic, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        return this.getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }

    public TopicRouteData getTopicRouteInfoFromNameServer(String topic, long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(105, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 17: {
                if (!allowTopicNotExist) break;
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", (Object)topic);
                break;
            }
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getTopicListFromNameServer(long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(206, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                return TopicList.decode(body, TopicList.class);
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public int wipeWritePermOfBroker(String namesrvAddr, String brokerName, long timeoutMillis) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
        WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
        requestHeader.setBrokerName(brokerName);
        RemotingCommand request = RemotingCommand.createRequestCommand(205, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                WipeWritePermOfBrokerResponseHeader responseHeader = response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
                return responseHeader.getWipeTopicCount();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public int addWritePermOfBroker(String nameSrvAddr, String brokerName, long timeoutMillis) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
        AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader();
        requestHeader.setBrokerName(brokerName);
        RemotingCommand request = RemotingCommand.createRequestCommand(327, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(nameSrvAddr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                AddWritePermOfBrokerResponseHeader responseHeader = response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
                return responseHeader.getAddTopicCount();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void deleteTopicInBroker(String addr, String topic, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(215, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void deleteTopicInNameServer(String addr, String topic, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        DeleteTopicFromNamesrvRequestHeader requestHeader = new DeleteTopicFromNamesrvRequestHeader();
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(216, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void deleteTopicInNameServer(String addr, String clusterName, String topic, long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        DeleteTopicFromNamesrvRequestHeader requestHeader = new DeleteTopicFromNamesrvRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setClusterName(clusterName);
        RemotingCommand request = RemotingCommand.createRequestCommand(216, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void deleteSubscriptionGroup(String addr, String groupName, boolean removeOffset, long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
        DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
        requestHeader.setGroupName(groupName);
        requestHeader.setCleanOffset(removeOffset);
        RemotingCommand request = RemotingCommand.createRequestCommand(207, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public String getKVConfigValue(String namespace, String key, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader();
        requestHeader.setNamespace(namespace);
        requestHeader.setKey(key);
        RemotingCommand request = RemotingCommand.createRequestCommand(101, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                GetKVConfigResponseHeader responseHeader = response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
                return responseHeader.getValue();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void putKVConfigValue(String namespace, String key, String value, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader();
        requestHeader.setNamespace(namespace);
        requestHeader.setKey(key);
        requestHeader.setValue(value);
        RemotingCommand request = RemotingCommand.createRequestCommand(100, requestHeader);
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            RemotingCommand errResponse = null;
            block3: for (String namesrvAddr : nameServerAddressList) {
                RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
                assert (response != null);
                switch (response.getCode()) {
                    case 0: {
                        continue block3;
                    }
                }
                errResponse = response;
            }
            if (errResponse != null) {
                throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
            }
        }
    }

    public void deleteKVConfigValue(String namespace, String key, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader();
        requestHeader.setNamespace(namespace);
        requestHeader.setKey(key);
        RemotingCommand request = RemotingCommand.createRequestCommand(102, requestHeader);
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            RemotingCommand errResponse = null;
            block3: for (String namesrvAddr : nameServerAddressList) {
                RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
                assert (response != null);
                switch (response.getCode()) {
                    case 0: {
                        continue block3;
                    }
                }
                errResponse = response;
            }
            if (errResponse != null) {
                throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
            }
        }
    }

    public KVTable getKVListByNamespace(String namespace, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader();
        requestHeader.setNamespace(namespace);
        RemotingCommand request = RemotingCommand.createRequestCommand(219, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return KVTable.decode(response.getBody(), KVTable.class);
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(String addr, String topic, String group, long timestamp, boolean isForce, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        return this.invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false);
    }

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(String addr, String topic, String group, long timestamp, int queueId, Long offset, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setQueueId(queueId);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setOffset(offset);
        RemotingCommand request = RemotingCommand.createRequestCommand(222, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                if (null == response.getBody()) break;
                return ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class).getOffsetTable();
            }
            case 1: 
            case 17: 
            case 24: {
                log.warn("Invoke broker to reset offset error code={}, remark={}", (Object)response.getCode(), (Object)response.getRemark());
                break;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(String addr, String topic, String group, long timestamp, boolean isForce, long timeoutMillis, boolean isC) throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);
        requestHeader.setOffset(-1L);
        RemotingCommand request = RemotingCommand.createRequestCommand(222, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (response.getBody() == null) break;
                ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                return body.getOffsetTable();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(String addr, String topic, String group, String clientAddr, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setClientAddr(clientAddr);
        RemotingCommand request = RemotingCommand.createRequestCommand(223, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (response.getBody() == null) break;
                GetConsumerStatusBody body = GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class);
                return body.getConsumerTable();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public GroupList queryTopicConsumeByWho(String addr, String topic, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader();
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(300, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                GroupList groupList = GroupList.decode(response.getBody(), GroupList.class);
                return groupList;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public TopicList queryTopicsByConsumer(String addr, String group, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        QueryTopicsByConsumerRequestHeader requestHeader = new QueryTopicsByConsumerRequestHeader();
        requestHeader.setGroup(group);
        RemotingCommand request = RemotingCommand.createRequestCommand(343, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                return topicList;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public SubscriptionData querySubscriptionByConsumer(String addr, String group, String topic, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        QuerySubscriptionByConsumerRequestHeader requestHeader = new QuerySubscriptionByConsumerRequestHeader();
        requestHeader.setGroup(group);
        requestHeader.setTopic(topic);
        RemotingCommand request = RemotingCommand.createRequestCommand(345, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                QuerySubscriptionResponseBody subscriptionResponseBody = QuerySubscriptionResponseBody.decode(response.getBody(), QuerySubscriptionResponseBody.class);
                return subscriptionResponseBody.getSubscriptionData();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public List<QueueTimeSpan> queryConsumeTimeSpan(String addr, String topic, String group, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        RemotingCommand request = RemotingCommand.createRequestCommand(303, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
                return consumeTimeSpanBody.getConsumeTimeSpanSet();
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public TopicList getTopicsByCluster(String cluster, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader();
        requestHeader.setCluster(cluster);
        RemotingCommand request = RemotingCommand.createRequestCommand(224, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(body, TopicList.class);
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getSystemTopicList(long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(304, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                TopicList tmp;
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                if (!(topicList.getTopicList() == null || topicList.getTopicList().isEmpty() || UtilAll.isBlank(topicList.getBrokerAddr()) || (tmp = this.getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis)).getTopicList() == null || tmp.getTopicList().isEmpty())) {
                    topicList.getTopicList().addAll(tmp.getTopicList());
                }
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getSystemTopicListFromBroker(String addr, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(305, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(body, TopicList.class);
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public boolean cleanExpiredConsumeQueue(String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(306, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return true;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public boolean deleteExpiredCommitLog(String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(329, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return true;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public boolean cleanUnusedTopicByAddr(String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(316, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case 0: {
                return true;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public ConsumerRunningInfo getConsumerRunningInfo(String addr, String consumerGroup, String clientId, boolean jstack, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        requestHeader.setClientId(clientId);
        requestHeader.setJstackEnable(jstack);
        RemotingCommand request = RemotingCommand.createRequestCommand(307, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                ConsumerRunningInfo info = ConsumerRunningInfo.decode(body, ConsumerRunningInfo.class);
                return info;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public ConsumeMessageDirectlyResult consumeMessageDirectly(String addr, String consumerGroup, String clientId, String topic, String msgId, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setConsumerGroup(consumerGroup);
        requestHeader.setClientId(clientId);
        requestHeader.setMsgId(msgId);
        RemotingCommand request = RemotingCommand.createRequestCommand(309, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class);
                return info;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public Map<Integer, Long> queryCorrectionOffset(String addr, String topic, String group, Set<String> filterGroup, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
        requestHeader.setCompareGroup(group);
        requestHeader.setTopic(topic);
        if (filterGroup != null) {
            StringBuilder sb = new StringBuilder();
            String splitor = "";
            for (String s2 : filterGroup) {
                sb.append(splitor).append(s2);
                splitor = ",";
            }
            requestHeader.setFilterGroups(sb.toString());
        }
        RemotingCommand request = RemotingCommand.createRequestCommand(308, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                if (response.getBody() == null) break;
                QueryCorrectionOffsetBody body = QueryCorrectionOffsetBody.decode(response.getBody(), QueryCorrectionOffsetBody.class);
                return body.getCorrectionOffsets();
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getUnitTopicList(boolean containRetry, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(311, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                if (!containRetry) {
                    Iterator<String> it = topicList.getTopicList().iterator();
                    while (it.hasNext()) {
                        String topic = it.next();
                        if (!topic.startsWith("%RETRY%")) continue;
                        it.remove();
                    }
                }
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getHasUnitSubTopicList(boolean containRetry, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(312, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                if (!containRetry) {
                    Iterator<String> it = topicList.getTopicList().iterator();
                    while (it.hasNext()) {
                        String topic = it.next();
                        if (!topic.startsWith("%RETRY%")) continue;
                        it.remove();
                    }
                }
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public TopicList getHasUnitSubUnUnitTopicList(boolean containRetry, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(313, null);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                if (!containRetry) {
                    Iterator<String> it = topicList.getTopicList().iterator();
                    while (it.hasNext()) {
                        String topic = it.next();
                        if (!topic.startsWith("%RETRY%")) continue;
                        it.remove();
                    }
                }
                return topicList;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void cloneGroupOffset(String addr, String srcGroup, String destGroup, String topic, boolean isOffline, long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
        CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
        requestHeader.setSrcGroup(srcGroup);
        requestHeader.setDestGroup(destGroup);
        requestHeader.setTopic(topic);
        requestHeader.setOffline(isOffline);
        RemotingCommand request = RemotingCommand.createRequestCommand(314, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        ViewBrokerStatsDataRequestHeader requestHeader = new ViewBrokerStatsDataRequestHeader();
        requestHeader.setStatsName(statsName);
        requestHeader.setStatsKey(statsKey);
        RemotingCommand request = RemotingCommand.createRequestCommand(315, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                return BrokerStatsData.decode(body, BrokerStatsData.class);
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public Set<String> getClusterList(String topic, long timeoutMillis) {
        return Collections.EMPTY_SET;
    }

    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
        requestHeader.setIsOrder(isOrder);
        RemotingCommand request = RemotingCommand.createRequestCommand(317, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                byte[] body = response.getBody();
                if (body == null) break;
                return ConsumeStatsList.decode(body, ConsumeStatsList.class);
            }
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(201, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

    public SubscriptionGroupConfig getSubscriptionGroupConfig(String brokerAddr, String group, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        GetSubscriptionGroupConfigRequestHeader header = new GetSubscriptionGroupConfigRequestHeader();
        header.setGroup(group);
        RemotingCommand request = RemotingCommand.createRequestCommand(352, header);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), SubscriptionGroupConfig.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

    public TopicConfigSerializeWrapper getAllTopicConfig(String addr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(21, null);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void updateNameServerConfig(Properties properties, List<String> nameServers, long timeoutMillis) throws UnsupportedEncodingException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        List<String> invokeNameServers;
        String str = MixAll.properties2String(properties);
        if (str == null || str.length() < 1) {
            return;
        }
        List<String> list = invokeNameServers = nameServers == null || nameServers.isEmpty() ? this.remotingClient.getNameServerAddressList() : nameServers;
        if (invokeNameServers == null || invokeNameServers.isEmpty()) {
            return;
        }
        RemotingCommand request = RemotingCommand.createRequestCommand(318, null);
        request.setBody(str.getBytes("UTF-8"));
        RemotingCommand errResponse = null;
        block3: for (String nameServer : invokeNameServers) {
            RemotingCommand response = this.remotingClient.invokeSync(nameServer, request, timeoutMillis);
            assert (response != null);
            switch (response.getCode()) {
                case 0: {
                    continue block3;
                }
            }
            errResponse = response;
        }
        if (errResponse != null) {
            throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
        }
    }

    public Map<String, Properties> getNameServerConfig(List<String> nameServers, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
        List<String> invokeNameServers;
        List<String> list = invokeNameServers = nameServers == null || nameServers.isEmpty() ? this.remotingClient.getNameServerAddressList() : nameServers;
        if (invokeNameServers == null || invokeNameServers.isEmpty()) {
            return null;
        }
        RemotingCommand request = RemotingCommand.createRequestCommand(319, null);
        HashMap<String, Properties> configMap = new HashMap<String, Properties>(4);
        for (String nameServer : invokeNameServers) {
            RemotingCommand response = this.remotingClient.invokeSync(nameServer, request, timeoutMillis);
            assert (response != null);
            if (0 == response.getCode()) {
                configMap.put(nameServer, MixAll.string2Properties(new String(response.getBody(), "UTF-8")));
                continue;
            }
            throw new MQClientException(response.getCode(), response.getRemark());
        }
        return configMap;
    }

    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setQueueId(queueId);
        requestHeader.setIndex(index);
        requestHeader.setCount(count);
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(321, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        if (0 == response.getCode()) {
            return QueryConsumeQueueResponseBody.decode(response.getBody(), QueryConsumeQueueResponseBody.class);
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    public void checkClientInBroker(String brokerAddr, String consumerGroup, String clientId, SubscriptionData subscriptionData, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        RemotingCommand request = RemotingCommand.createRequestCommand(46, null);
        CheckClientRequestBody requestBody = new CheckClientRequestBody();
        requestBody.setClientId(clientId);
        requestBody.setGroup(consumerGroup);
        requestBody.setSubscriptionData(subscriptionData);
        request.setBody(requestBody.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        if (0 != response.getCode()) {
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }

    public boolean resumeCheckHalfMessage(String addr, String topic, String msgId, long timeoutMillis) throws RemotingException, InterruptedException {
        ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setMsgId(msgId);
        RemotingCommand request = RemotingCommand.createRequestCommand(323, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return true;
            }
        }
        log.error("Failed to resume half message check logic. Remark={}", (Object)response.getRemark());
        return false;
    }

    public void setMessageRequestMode(String brokerAddr, String topic, String consumerGroup, MessageRequestMode mode, int popShareQueueNum, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        RemotingCommand request = RemotingCommand.createRequestCommand(401, null);
        SetMessageRequestModeRequestBody requestBody = new SetMessageRequestModeRequestBody();
        requestBody.setTopic(topic);
        requestBody.setConsumerGroup(consumerGroup);
        requestBody.setMode(mode);
        requestBody.setPopShareQueueNum(popShareQueueNum);
        request.setBody(requestBody.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        if (0 != response.getCode()) {
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }

    public TopicConfigAndQueueMapping getTopicConfig(String brokerAddr, String topic, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
        header.setTopic(topic);
        header.setLo(true);
        RemotingCommand request = RemotingCommand.createRequestCommand(351, header);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
            }
            case 17: {
                break;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail, boolean force, long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException {
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());
        requestHeader.setForce(force);
        RemotingCommand request = RemotingCommand.createRequestCommand(513, requestHeader);
        request.setBody(topicQueueMappingDetail.encode());
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public GroupForbidden updateAndGetGroupForbidden(String addr, UpdateGroupForbiddenRequestHeader requestHeader, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(353, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), GroupForbidden.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

    public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        ResetMasterFlushOffsetHeader requestHeader = new ResetMasterFlushOffsetHeader();
        requestHeader.setMasterFlushOffset(masterFlushOffset);
        RemotingCommand request = RemotingCommand.createRequestCommand(908, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000L);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

    public HARuntimeInfo getBrokerHAStatus(String brokerAddr, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(907, null);
        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return HARuntimeInfo.decode(response.getBody(), HARuntimeInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public GetMetaDataResponseHeader getControllerMetaData(String controllerAddress) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(1005, null);
        RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000L);
        assert (response != null);
        if (response.getCode() == 0) {
            return response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class);
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public BrokerReplicasInfo getInSyncStateData(String controllerAddress, List<String> brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingCommandException {
        GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddress);
        assert (controllerMetaData != null);
        assert (controllerMetaData.getControllerLeaderAddress() != null);
        String leaderAddress = controllerMetaData.getControllerLeaderAddress();
        RemotingCommand request = RemotingCommand.createRequestCommand(1006, null);
        byte[] body = RemotingSerializable.encode(brokers);
        request.setBody(body);
        RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000L);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), BrokerReplicasInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public EpochEntryCache getBrokerEpochCache(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(1007, null);
        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000L);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), EpochEntryCache.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public Map<String, Properties> getControllerConfig(List<String> controllerServers, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
        List<String> invokeControllerServers;
        List<String> list = invokeControllerServers = controllerServers == null || controllerServers.isEmpty() ? this.remotingClient.getNameServerAddressList() : controllerServers;
        if (invokeControllerServers == null || invokeControllerServers.isEmpty()) {
            return null;
        }
        RemotingCommand request = RemotingCommand.createRequestCommand(1010, null);
        HashMap<String, Properties> configMap = new HashMap<String, Properties>(4);
        for (String controller : invokeControllerServers) {
            RemotingCommand response = this.remotingClient.invokeSync(controller, request, timeoutMillis);
            assert (response != null);
            if (0 == response.getCode()) {
                configMap.put(controller, MixAll.string2Properties(new String(response.getBody(), "UTF-8")));
                continue;
            }
            throw new MQClientException(response.getCode(), response.getRemark());
        }
        return configMap;
    }

    public void updateControllerConfig(Properties properties, List<String> controllers, long timeoutMillis) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException {
        String str = MixAll.properties2String(properties);
        if (str.length() < 1 || controllers == null || controllers.isEmpty()) {
            return;
        }
        RemotingCommand request = RemotingCommand.createRequestCommand(1009, null);
        request.setBody(str.getBytes("UTF-8"));
        RemotingCommand errResponse = null;
        block3: for (String controller : controllers) {
            RemotingCommand response = this.remotingClient.invokeSync(controller, request, timeoutMillis);
            assert (response != null);
            switch (response.getCode()) {
                case 0: {
                    continue block3;
                }
            }
            errResponse = response;
        }
        if (errResponse != null) {
            throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
        }
    }

    public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, String brokerName, Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException {
        GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
        assert (controllerMetaData != null);
        assert (controllerMetaData.getControllerLeaderAddress() != null);
        String leaderAddress = controllerMetaData.getControllerLeaderAddress();
        ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerId);
        RemotingCommand request = RemotingCommand.createRequestCommand(1002, electRequestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000L);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                BrokerMemberGroup brokerMemberGroup = RemotingSerializable.decode(response.getBody(), BrokerMemberGroup.class);
                ElectMasterResponseHeader responseHeader = response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
                return new Pair<ElectMasterResponseHeader, BrokerMemberGroup>(responseHeader, brokerMemberGroup);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName, String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws RemotingException, InterruptedException, MQBrokerException {
        GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
        assert (controllerMetaData != null);
        assert (controllerMetaData.getControllerLeaderAddress() != null);
        String leaderAddress = controllerMetaData.getControllerLeaderAddress();
        CleanControllerBrokerDataRequestHeader cleanHeader = new CleanControllerBrokerDataRequestHeader(clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker);
        RemotingCommand request = RemotingCommand.createRequestCommand(1011, cleanHeader);
        RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000L);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void createUser(String addr, UserInfo userInfo, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        CreateUserRequestHeader requestHeader = new CreateUserRequestHeader(userInfo.getUsername());
        RemotingCommand request = RemotingCommand.createRequestCommand(3001, requestHeader);
        request.setBody(RemotingSerializable.encode(userInfo));
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void updateUser(String addr, UserInfo userInfo, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        UpdateUserRequestHeader requestHeader = new UpdateUserRequestHeader(userInfo.getUsername());
        RemotingCommand request = RemotingCommand.createRequestCommand(3002, requestHeader);
        request.setBody(RemotingSerializable.encode(userInfo));
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void deleteUser(String addr, String username, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        DeleteUserRequestHeader requestHeader = new DeleteUserRequestHeader(username);
        RemotingCommand request = RemotingCommand.createRequestCommand(3003, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public UserInfo getUser(String addr, String username, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        GetUserRequestHeader requestHeader = new GetUserRequestHeader(username);
        RemotingCommand request = RemotingCommand.createRequestCommand(3004, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), UserInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public List<UserInfo> listUser(String addr, String filter, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        ListUsersRequestHeader requestHeader = new ListUsersRequestHeader(filter);
        RemotingCommand request = RemotingCommand.createRequestCommand(3005, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decodeList(response.getBody(), UserInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void createAcl(String addr, AclInfo aclInfo, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        CreateAclRequestHeader requestHeader = new CreateAclRequestHeader(aclInfo.getSubject());
        RemotingCommand request = RemotingCommand.createRequestCommand(3006, requestHeader);
        request.setBody(RemotingSerializable.encode(aclInfo));
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void updateAcl(String addr, AclInfo aclInfo, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        UpdateAclRequestHeader requestHeader = new UpdateAclRequestHeader(aclInfo.getSubject());
        RemotingCommand request = RemotingCommand.createRequestCommand(3007, requestHeader);
        request.setBody(RemotingSerializable.encode(aclInfo));
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public void deleteAcl(String addr, String subject, String resource, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        DeleteAclRequestHeader requestHeader = new DeleteAclRequestHeader(subject, resource);
        RemotingCommand request = RemotingCommand.createRequestCommand(3008, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return;
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public AclInfo getAcl(String addr, String subject, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        GetAclRequestHeader requestHeader = new GetAclRequestHeader(subject);
        RemotingCommand request = RemotingCommand.createRequestCommand(3009, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decode(response.getBody(), AclInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    public List<AclInfo> listAcl(String addr, String subjectFilter, String resourceFilter, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        ListAclsRequestHeader requestHeader = new ListAclsRequestHeader(subjectFilter, resourceFilter);
        RemotingCommand request = RemotingCommand.createRequestCommand(3010, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
        assert (response != null);
        switch (response.getCode()) {
            case 0: {
                return RemotingSerializable.decodeList(response.getBody(), AclInfo.class);
            }
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    static {
        System.setProperty("rocketmq.remoting.version", Integer.toString(MQVersion.CURRENT_VERSION));
    }
}

