package org.apache.rocketmq.broker.out;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerMemberGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.rpc.RpcClient;
import org.apache.rocketmq.remoting.rpc.RpcClientImpl;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMetrics;

/* loaded from: input_file:org/apache/rocketmq/broker/out/BrokerOuterAPI.class */
public class BrokerOuterAPI {
    // Class logger. No initializer is visible in this view — presumably assigned in a static initializer elsewhere (confirm).
    private static final Logger LOGGER;
    // Remoting client used for every outbound request made by this class.
    private final RemotingClient remotingClient;
    // Source polled by fetchNameServerAddr() for the current name server address string.
    private final TopAddressing topAddressing;
    // Last name server address string accepted by fetchNameServerAddr(); set to null by the constructor.
    private String nameSrvAddr;
    // Executor on which heartbeats, registrations and data-version queries are submitted, one task per name server.
    private BrokerFixedThreadPoolExecutor brokerOuterExecutor;
    // Metadata handed to the RpcClientImpl built in the constructor.
    private ClientMetadata clientMetadata;
    // RPC client built from clientMetadata and remotingClient.
    private RpcClient rpcClient;
    // Decompiler artifact: the `!$assertionsDisabled && x == null` checks in this class look like decompiled `assert x != null` statements.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Creates the API with a new {@link DynamicalExtFieldRPCHook} and a new {@link ClientMetadata}.
     *
     * @param nettyClientConfig configuration passed on to the private constructor, which uses it to
     *                          build the {@link NettyRemotingClient}
     */
    public BrokerOuterAPI(NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new ClientMetadata());
    }

    private BrokerOuterAPI(NettyClientConfig nettyClientConfig, RPCHook rPCHook, ClientMetadata clientMetadata) {
        this.topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
        this.nameSrvAddr = null;
        this.brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1L, TimeUnit.MINUTES, (BlockingQueue<Runnable>) new ArrayBlockingQueue(32), (ThreadFactory) new ThreadFactoryImpl("brokerOutApi_thread_", true));
        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
        this.clientMetadata = clientMetadata;
        this.remotingClient.registerRPCHook(rPCHook);
        this.rpcClient = new RpcClientImpl(this.clientMetadata, this.remotingClient);
    }

    /** Starts the underlying remoting client. */
    public void start() {
        this.remotingClient.start();
    }

    /** Shuts down the remoting client first, then the executor used for outbound tasks. */
    public void shutdown() {
        this.remotingClient.shutdown();
        this.brokerOuterExecutor.shutdown();
    }

    /**
     * Returns the name server address list as reported by the remoting client.
     *
     * @return whatever {@code RemotingClient#getNameServerAddressList()} returns (callers in this
     *         class treat {@code null} as possible)
     */
    public List<String> getNameServerAddressList() {
        return this.remotingClient.getNameServerAddressList();
    }

    /**
     * Polls the address source and, when it yields a non-blank address that differs from the one
     * currently cached, pushes the new address list to the remoting client and caches it.
     *
     * <p>Any exception raised while fetching or updating is logged and otherwise ignored.
     *
     * @return the cached name server address after the attempt (may be {@code null} if none was
     *         ever accepted)
     */
    public String fetchNameServerAddr() {
        try {
            final String latest = this.topAddressing.fetchNSAddr();
            final boolean changed = !UtilAll.isBlank(latest) && !latest.equals(this.nameSrvAddr);
            if (changed) {
                LOGGER.info("name server address changed, old: {} new: {}", this.nameSrvAddr, latest);
                updateNameServerAddressList(latest);
                this.nameSrvAddr = latest;
            }
        } catch (Exception e) {
            LOGGER.error("fetchNameServerAddr Exception", e);
        }
        return this.nameSrvAddr;
    }

    /**
     * Resolves a {@code host:port} string to one {@code ip:port} entry per address the host maps to.
     *
     * <p>The input is split at the first {@code ':'}; everything from that colon onwards is appended
     * unchanged to each resolved IP. Before resolving, the JVM-wide security property
     * {@code networkaddress.cache.ttl} is set to {@code "10"}.
     *
     * <p>Failures are logged and not rethrown. This includes resolution errors and an input that
     * contains no {@code ':'}, where {@code substring} throws. In that case the addresses collected
     * so far (usually none) are returned.
     *
     * @param str domain with port, e.g. {@code "example.org:9876"}
     * @return a mutable list of resolved {@code ip:port} strings; never {@code null}
     */
    public List<String> dnsLookupAddressByDomain(String str) {
        // Typed list instead of the raw ArrayList, which was returned through an unchecked conversion.
        List<String> resolved = new ArrayList<>();
        try {
            Security.setProperty("networkaddress.cache.ttl", "10");
            int colon = str.indexOf(":");
            String portSuffix = str.substring(colon);
            String host = str.substring(0, colon);
            for (InetAddress address : InetAddress.getAllByName(host)) {
                resolved.add(address.getHostAddress() + portSuffix);
            }
            LOGGER.info("dns lookup address by domain success, domain={}, result={}", str, resolved);
        } catch (Exception e) {
            LOGGER.error("dns lookup address by domain error, domain={}", str, e);
        }
        return resolved;
    }

    /**
     * Asks the remoting client whether the given address is reachable.
     *
     * @param str address to probe; passed through unchanged
     * @return the result of {@code RemotingClient#isAddressReachable}
     */
    public boolean checkAddressReachable(String str) {
        return this.remotingClient.isAddressReachable(str);
    }

    /**
     * Splits a {@code ';'}-separated address string and hands the resulting list to the remoting client.
     *
     * @param str name server addresses separated by {@code ';'}; must not be {@code null}
     */
    public void updateNameServerAddressList(String str) {
        // Copy into a typed, mutable list; Arrays.asList alone is a fixed-size view of the array.
        List<String> addresses = new ArrayList<>(Arrays.asList(str.split(";")));
        this.remotingClient.updateNameServerAddressList(addresses);
    }

    /**
     * Resolves the given domain via {@link #dnsLookupAddressByDomain(String)} and hands the result
     * to the remoting client as the new name server address list.
     *
     * @param str domain with port, in the form accepted by {@code dnsLookupAddressByDomain}
     */
    public void updateNameServerAddressListByDnsLookup(String str) {
        this.remotingClient.updateNameServerAddressList(dnsLookupAddressByDomain(str));
    }

    /**
     * Fetches the broker member group without the compatibility path; equivalent to calling the
     * three-argument overload with {@code false}.
     *
     * @param str  cluster name
     * @param str2 broker name
     * @return the member group produced by the three-argument overload
     */
    public BrokerMemberGroup syncBrokerMemberGroup(String str, String str2) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        return syncBrokerMemberGroup(str, str2, false);
    }

    /**
     * Fetches the broker member group, choosing between the two lookup strategies.
     *
     * @param str  cluster name
     * @param str2 broker name
     * @param z    {@code true} to use {@link #getBrokerMemberGroupCompatible(String, String)},
     *             {@code false} to use {@link #getBrokerMemberGroup(String, String)}
     * @return the member group returned by the selected lookup
     */
    public BrokerMemberGroup syncBrokerMemberGroup(String str, String str2, boolean z) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        if (z) {
            return getBrokerMemberGroupCompatible(str, str2);
        }
        return getBrokerMemberGroup(str, str2);
    }

    /**
     * Requests the member group of a broker using request code 901 and a 3000 ms timeout.
     *
     * <p>The request is sent with a {@code null} target address (presumably the remoting client then
     * picks a name server — confirm against {@code RemotingClient#invokeSync}).
     *
     * @param str  cluster name placed in the request header
     * @param str2 broker name placed in the request header
     * @return the decoded member group when the response code is 0 and a body is present; otherwise
     *         a new {@code BrokerMemberGroup(str, str2)}
     */
    public BrokerMemberGroup getBrokerMemberGroup(String str, String str2) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        BrokerMemberGroup fallback = new BrokerMemberGroup(str, str2);

        GetBrokerMemberGroupRequestHeader header = new GetBrokerMemberGroupRequestHeader();
        header.setClusterName(str);
        header.setBrokerName(str2);
        RemotingCommand request = RemotingCommand.createRequestCommand(901, header);

        RemotingCommand response = this.remotingClient.invokeSync((String) null, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }

        if (response.getCode() == 0) {
            byte[] payload = response.getBody();
            if (payload != null) {
                GetBrokerMemberGroupResponseBody decoded = (GetBrokerMemberGroupResponseBody) GetBrokerMemberGroupResponseBody.decode(payload, GetBrokerMemberGroupResponseBody.class);
                return decoded.getBrokerMemberGroup();
            }
        }
        return fallback;
    }

    /**
     * Builds the broker member group from topic route data, for peers that are queried through the
     * route-info request (code 105) on the synthetic topic {@code "rmq_sys_SYNC_BROKER_MEMBER_" + brokerName}.
     *
     * <p>The request uses a {@code null} target address and a 3000 ms timeout. When the response
     * code is 0 and carries a body, the addresses of the first route entry matching both the broker
     * name and the cluster name are copied into the result.
     *
     * @param str  cluster name to match
     * @param str2 broker name to match (also used to derive the topic)
     * @return a member group for {@code (str, str2)}; its address table is left untouched when no
     *         matching entry is found or the response is not successful
     */
    public BrokerMemberGroup getBrokerMemberGroupCompatible(String str, String str2) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        BrokerMemberGroup brokerMemberGroup = new BrokerMemberGroup(str, str2);
        GetRouteInfoRequestHeader getRouteInfoRequestHeader = new GetRouteInfoRequestHeader();
        getRouteInfoRequestHeader.setTopic("rmq_sys_SYNC_BROKER_MEMBER_" + str2);
        RemotingCommand invokeSync = this.remotingClient.invokeSync((String) null, RemotingCommand.createRequestCommand(105, getRouteInfoRequestHeader), 3000L);
        if (!$assertionsDisabled && invokeSync == null) {
            throw new AssertionError();
        }
        if (invokeSync.getCode() != 0) {
            return brokerMemberGroup;
        }
        byte[] body = invokeSync.getBody();
        if (body == null) {
            return brokerMemberGroup;
        }
        TopicRouteData topicRouteData = (TopicRouteData) TopicRouteData.decode(body, TopicRouteData.class);
        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
        if (brokerDatas == null) {
            return brokerMemberGroup;
        }
        // A bounded for-each replaces the previous `while (true)` loop, which had no exit once the
        // iterator was exhausted and therefore never returned.
        for (BrokerData brokerData : brokerDatas) {
            if (brokerData != null && brokerData.getBrokerName().equals(str2) && brokerData.getCluster().equals(str)) {
                brokerMemberGroup.getBrokerAddrs().putAll(brokerData.getBrokerAddrs());
                // Only the first matching entry is used.
                break;
            }
        }
        return brokerMemberGroup;
    }

    /**
     * Sends a one-way request with code 322, carrying the encoded data version as its body, to every
     * available name server. One task per name server is submitted to the outbound executor.
     *
     * <p>Does nothing when the remoting client reports no available name servers. Failures of
     * individual sends are logged and not propagated.
     *
     * @param str         cluster name
     * @param str2        broker address
     * @param str3        broker name
     * @param l           broker id; unboxed for the task identity, so it must not be {@code null}
     * @param i           timeout in milliseconds passed to {@code invokeOneway}
     * @param dataVersion data version whose encoding becomes the request body
     * @param z           flag forwarded to the {@code BrokerIdentity} constructor
     */
    public void sendHeartbeatViaDataVersion(String str, String str2, String str3, Long l, final int i, final DataVersion dataVersion, boolean z) {
        List<String> nameServers = this.remotingClient.getAvailableNameSrvList();
        if (nameServers != null && !nameServers.isEmpty()) {
            final QueryDataVersionRequestHeader header = new QueryDataVersionRequestHeader();
            header.setBrokerAddr(str2);
            header.setBrokerName(str3);
            header.setBrokerId(l);
            header.setClusterName(str);

            for (final String nameServer : nameServers) {
                this.brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(str, str3, l.longValue(), z)) {
                    public void run0() {
                        RemotingCommand request = RemotingCommand.createRequestCommand(322, header);
                        request.setBody(dataVersion.encode());
                        try {
                            BrokerOuterAPI.this.remotingClient.invokeOneway(nameServer, request, i);
                        } catch (Exception e) {
                            BrokerOuterAPI.LOGGER.error("sendHeartbeat Exception " + nameServer, e);
                        }
                    }
                });
            }
        }
    }

    /**
     * Sends a one-way heartbeat request (code 904) to every available name server, one executor
     * task per name server.
     *
     * <p>Does nothing when no name server is available. Failures of individual sends are logged and
     * not propagated.
     *
     * @param str  cluster name
     * @param str2 broker address
     * @param str3 broker name
     * @param l    broker id; used only for the task identity and unboxed, so it must not be {@code null}
     * @param i    timeout in milliseconds passed to {@code invokeOneway}
     * @param z    flag forwarded to the {@code BrokerIdentity} constructor
     */
    public void sendHeartbeat(String str, String str2, String str3, Long l, final int i, boolean z) {
        List<String> nameServers = this.remotingClient.getAvailableNameSrvList();
        if (nameServers == null || nameServers.isEmpty()) {
            return;
        }

        final BrokerHeartbeatRequestHeader header = new BrokerHeartbeatRequestHeader();
        header.setClusterName(str);
        header.setBrokerAddr(str2);
        header.setBrokerName(str3);

        for (final String nameServer : nameServers) {
            this.brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(str, str3, l.longValue(), z)) {
                public void run0() {
                    try {
                        RemotingCommand request = RemotingCommand.createRequestCommand(904, header);
                        BrokerOuterAPI.this.remotingClient.invokeOneway(nameServer, request, i);
                    } catch (Exception e) {
                        BrokerOuterAPI.LOGGER.error("sendHeartbeat Exception " + nameServer, e);
                    }
                }
            });
        }
    }

    /**
     * Queries HA information from the given address using request code 906 with a {@code null}
     * master HA address in the header and a 3000 ms timeout.
     *
     * @param str address the request is sent to
     * @return sync info built from the response header (master HA address, master flush offset,
     *         master address)
     * @throws MQBrokerException when the response code is not 0; carries the response code and remark
     */
    public BrokerSyncInfo retrieveBrokerHaInfo(String str) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingCommandException {
        ExchangeHAInfoRequestHeader header = new ExchangeHAInfoRequestHeader();
        header.setMasterHaAddress((String) null);
        RemotingCommand request = RemotingCommand.createRequestCommand(906, header);

        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }

        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
        ExchangeHAInfoResponseHeader haInfo = response.decodeCommandCustomHeader(ExchangeHAInfoResponseHeader.class);
        return new BrokerSyncInfo(haInfo.getMasterHaAddress(), haInfo.getMasterFlushOffset().longValue(), haInfo.getMasterAddress());
    }

    /**
     * Pushes HA information to the given address using request code 906 and a 3000 ms timeout.
     *
     * @param str  address the request is sent to
     * @param str2 master HA address placed in the header
     * @param j    master flush offset placed in the header
     * @param str3 master address placed in the header
     * @throws MQBrokerException when the response code is not 0; carries the response code and remark
     */
    public void sendBrokerHaInfo(String str, String str2, long j, String str3) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        ExchangeHAInfoRequestHeader header = new ExchangeHAInfoRequestHeader();
        header.setMasterHaAddress(str2);
        header.setMasterFlushOffset(Long.valueOf(j));
        header.setMasterAddress(str3);
        RemotingCommand request = RemotingCommand.createRequestCommand(906, header);

        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }

        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    }

    /**
     * Convenience overload: forwards every argument to the 13-argument {@code registerBrokerAll},
     * passing {@code null} for its {@code Long} parameter (the position between {@code z3} and
     * {@code brokerIdentity}).
     *
     * @return whatever the 13-argument overload returns
     */
    public List<RegisterBrokerResult> registerBrokerAll(String str, String str2, String str3, long j, String str4, TopicConfigSerializeWrapper topicConfigSerializeWrapper, List<String> list, boolean z, int i, boolean z2, boolean z3, BrokerIdentity brokerIdentity) {
        return registerBrokerAll(str, str2, str3, j, str4, topicConfigSerializeWrapper, list, z, i, z2, z3, null, brokerIdentity);
    }

    /**
     * Registers this broker with every available name server in parallel and waits up to
     * {@code i} milliseconds for all registrations to finish.
     *
     * <p>One task per name server is submitted to the outbound executor. Each task calls
     * {@link #registerBroker} and adds its non-null result to the returned list. Failures of
     * individual registrations are logged and not propagated. If the wait is interrupted, the
     * interrupt status of the calling thread is restored and the results gathered so far are returned.
     *
     * @param str                         cluster name
     * @param str2                        broker address
     * @param str3                        broker name
     * @param j                           broker id
     * @param str4                        HA server address
     * @param topicConfigSerializeWrapper topic configuration placed in the request body
     * @param list                        filter server list placed in the request body
     * @param z                           {@code true} to register one-way (no result is collected)
     * @param i                           per-request timeout and overall wait, in milliseconds
     * @param z2                          value for the header's enable-acting-master flag
     * @param z3                          flag passed to {@code RegisterBrokerBody#encode}
     * @param l                           heartbeat timeout in milliseconds; left unset when {@code null}
     * @param brokerIdentity              identity attached to each submitted task
     * @return a thread-safe list of the results received so far; empty when no name server is
     *         available; tasks still running after the wait may append to it later
     */
    public List<RegisterBrokerResult> registerBrokerAll(String str, String str2, String str3, long j, String str4, TopicConfigSerializeWrapper topicConfigSerializeWrapper, List<String> list, final boolean z, final int i, boolean z2, boolean z3, Long l, BrokerIdentity brokerIdentity) {
        final List<RegisterBrokerResult> results = new CopyOnWriteArrayList<>();
        List<String> availableNameSrvList = this.remotingClient.getAvailableNameSrvList();
        if (availableNameSrvList == null || availableNameSrvList.isEmpty()) {
            return results;
        }

        final RegisterBrokerRequestHeader registerBrokerRequestHeader = new RegisterBrokerRequestHeader();
        registerBrokerRequestHeader.setBrokerAddr(str2);
        registerBrokerRequestHeader.setBrokerId(Long.valueOf(j));
        registerBrokerRequestHeader.setBrokerName(str3);
        registerBrokerRequestHeader.setClusterName(str);
        registerBrokerRequestHeader.setHaServerAddr(str4);
        registerBrokerRequestHeader.setEnableActingMaster(Boolean.valueOf(z2));
        registerBrokerRequestHeader.setCompressed(false);
        if (l != null) {
            registerBrokerRequestHeader.setHeartbeatTimeoutMillis(l);
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
        registerBrokerBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigSerializeWrapper));
        registerBrokerBody.setFilterServerList(list);
        // The body is encoded once and shared by all tasks; its CRC travels in the header.
        final byte[] encode = registerBrokerBody.encode(z3);
        registerBrokerRequestHeader.setBodyCrc32(Integer.valueOf(UtilAll.crc32(encode)));

        final CountDownLatch countDownLatch = new CountDownLatch(availableNameSrvList.size());
        for (final String str5 : availableNameSrvList) {
            this.brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                public void run0() {
                    try {
                        RegisterBrokerResult registerBroker = BrokerOuterAPI.this.registerBroker(str5, z, i, registerBrokerRequestHeader, encode);
                        if (registerBroker != null) {
                            results.add(registerBroker);
                        }
                        BrokerOuterAPI.LOGGER.info("Registering current broker to name server completed. TargetHost={}", str5);
                    } catch (Exception e) {
                        BrokerOuterAPI.LOGGER.error("Failed to register current broker to name server. TargetHost={}", str5, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            if (!countDownLatch.await(i, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", Integer.valueOf(i));
            }
        } catch (InterruptedException e) {
            // Previously swallowed silently; keep best-effort semantics but do not lose the interrupt.
            LOGGER.warn("Interrupted while waiting for registration to name servers to complete", e);
            Thread.currentThread().interrupt();
        }
        return results;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a single register request (code 103) to one name server.
     *
     * <p>In one-way mode the request is sent without waiting for a reply, a
     * {@code RemotingTooMuchRequestException} is ignored, and {@code null} is returned. Otherwise
     * the call is synchronous and the response is converted into a {@link RegisterBrokerResult}.
     *
     * @param str                         name server address
     * @param z                           {@code true} for one-way mode
     * @param i                           timeout in milliseconds
     * @param registerBrokerRequestHeader request header
     * @param bArr                        request body
     * @return the registration result, or {@code null} in one-way mode
     * @throws MQBrokerException when a synchronous response has a non-zero code
     */
    public RegisterBrokerResult registerBroker(String str, boolean z, int i, RegisterBrokerRequestHeader registerBrokerRequestHeader, byte[] bArr) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(103, registerBrokerRequestHeader);
        request.setBody(bArr);

        if (z) {
            try {
                this.remotingClient.invokeOneway(str, request, i);
            } catch (RemotingTooMuchRequestException e) {
                // Deliberately ignored: one-way registration is best-effort.
            }
            return null;
        }

        RemotingCommand response = this.remotingClient.invokeSync(str, request, i);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }

        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), registerBrokerRequestHeader == null ? null : registerBrokerRequestHeader.getBrokerAddr());
        }

        RegisterBrokerResponseHeader responseHeader = response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
        RegisterBrokerResult result = new RegisterBrokerResult();
        result.setMasterAddr(responseHeader.getMasterAddr());
        result.setHaServerAddr(responseHeader.getHaServerAddr());
        if (response.getBody() != null) {
            result.setKvTable((KVTable) KVTable.decode(response.getBody(), KVTable.class));
        }
        return result;
    }

    /**
     * Unregisters this broker from every configured name server, one after another.
     *
     * <p>A failure against one name server is logged and does not stop the remaining attempts.
     * Does nothing when the remoting client returns a {@code null} address list.
     *
     * @param str  cluster name
     * @param str2 broker address
     * @param str3 broker name
     * @param j    broker id
     */
    public void unregisterBrokerAll(String str, String str2, String str3, long j) {
        List<String> nameServers = this.remotingClient.getNameServerAddressList();
        if (nameServers == null) {
            return;
        }
        for (String nameServer : nameServers) {
            try {
                unregisterBroker(nameServer, str, str2, str3, j);
                LOGGER.info("unregisterBroker OK, NamesrvAddr: {}", nameServer);
            } catch (Exception e) {
                LOGGER.warn("unregisterBroker Exception, NamesrvAddr: {}", nameServer, e);
            }
        }
    }

    /**
     * Sends a synchronous unregister request (code 104, 3000 ms timeout) to one name server.
     *
     * @param str  name server address
     * @param str2 cluster name
     * @param str3 broker address
     * @param str4 broker name
     * @param j    broker id
     * @throws MQBrokerException when the response code is anything other than 0; carries the
     *                           response code, the remark and the broker address
     */
    public void unregisterBroker(String str, String str2, String str3, String str4, long j) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        UnRegisterBrokerRequestHeader header = new UnRegisterBrokerRequestHeader();
        header.setBrokerAddr(str3);
        header.setBrokerId(Long.valueOf(j));
        header.setBrokerName(str4);
        header.setClusterName(str2);
        RemotingCommand request = RemotingCommand.createRequestCommand(104, header);

        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }

        // Every non-zero code is reported the same way.
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str3);
        }
    }

    /**
     * Asks every configured name server (request code 322) whether its view of this broker's data
     * version differs from the local one, and waits up to {@code i} milliseconds for the answers.
     *
     * <p>One task per name server is submitted to the outbound executor. A task appends
     * {@code Boolean.TRUE} to the result in two cases: the name server answers with code 0 and
     * either reports a change (or no flag at all) or returns a data version that differs from the
     * local one; or the query fails with an exception. Nothing is appended otherwise, so the
     * returned list only ever contains {@code TRUE} entries.
     *
     * <p>If the wait is interrupted, the interruption is logged, the calling thread's interrupt
     * status is restored and the entries gathered so far are returned.
     *
     * @param str                         cluster name
     * @param str2                        broker address
     * @param str3                        broker name
     * @param j                           broker id
     * @param topicConfigSerializeWrapper source of the local data version
     * @param i                           per-request timeout and overall wait, in milliseconds
     * @param z                           flag forwarded to the {@code BrokerIdentity} constructor
     * @return a thread-safe list with one {@code TRUE} per name server that requires re-registration;
     *         tasks still running after the wait may append to it later
     */
    public List<Boolean> needRegister(final String str, final String str2, final String str3, final long j, final TopicConfigSerializeWrapper topicConfigSerializeWrapper, final int i, boolean z) {
        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList == null || nameServerAddressList.isEmpty()) {
            return changedList;
        }

        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String str4 : nameServerAddressList) {
            this.brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(str, str3, j, z)) {
                public void run0() {
                    try {
                        QueryDataVersionRequestHeader queryDataVersionRequestHeader = new QueryDataVersionRequestHeader();
                        queryDataVersionRequestHeader.setBrokerAddr(str2);
                        queryDataVersionRequestHeader.setBrokerId(Long.valueOf(j));
                        queryDataVersionRequestHeader.setBrokerName(str3);
                        queryDataVersionRequestHeader.setClusterName(str);
                        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(322, queryDataVersionRequestHeader);
                        createRequestCommand.setBody(topicConfigSerializeWrapper.getDataVersion().encode());
                        RemotingCommand invokeSync = BrokerOuterAPI.this.remotingClient.invokeSync(str4, createRequestCommand, i);

                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        if (invokeSync.getCode() == 0) {
                            changed = invokeSync.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class).getChanged();
                            byte[] body = invokeSync.getBody();
                            if (body != null) {
                                nameServerDataVersion = (DataVersion) DataVersion.decode(body, DataVersion.class);
                                if (!topicConfigSerializeWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                    changed = true;
                                }
                            }
                            // A missing flag is treated like "changed".
                            if (changed == null || changed.booleanValue()) {
                                changedList.add(Boolean.TRUE);
                            }
                        }
                        BrokerOuterAPI.LOGGER.warn("Query data version from name server {} OK, changed {}, broker {},name server {}",
                            new Object[] {str4, changed, topicConfigSerializeWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion});
                    } catch (Exception e) {
                        // A failed query counts as "needs registration".
                        changedList.add(Boolean.TRUE);
                        BrokerOuterAPI.LOGGER.error("Query data version from name server {}  Exception, {}", str4, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(i, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("query dataversion from nameserver countDownLatch await Exception", e);
            // Restore the interrupt status so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return changedList;
    }

    /**
     * Fetches the complete topic configuration from another broker (request code 21, 3000 ms timeout).
     *
     * <p>The request goes to the address returned by {@code MixAll.brokerVIPChannel(true, str)}.
     *
     * @param str broker address
     * @return the topic configuration decoded from the response body
     * @throws MQBrokerException when the response code is not 0; carries code, remark and {@code str}
     */
    public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        String target = MixAll.brokerVIPChannel(true, str);
        RemotingCommand request = RemotingCommand.createRequestCommand(21, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(target, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        return (TopicConfigAndMappingSerializeWrapper) TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
    }

    /**
     * Fetches the timer checkpoint from another broker (request code 60, 3000 ms timeout).
     *
     * <p>The request goes to the address returned by {@code MixAll.brokerVIPChannel(true, str)}.
     *
     * @param str broker address
     * @return the checkpoint decoded from the response body wrapped in a {@link ByteBuffer}
     * @throws MQBrokerException when the response code is not 0; carries code, remark and {@code str}
     */
    public TimerCheckpoint getTimerCheckPoint(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        String target = MixAll.brokerVIPChannel(true, str);
        RemotingCommand request = RemotingCommand.createRequestCommand(60, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(target, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        ByteBuffer payload = ByteBuffer.wrap(response.getBody());
        return TimerCheckpoint.decode(payload);
    }

    /**
     * Fetches the timer metrics from a remote endpoint.
     *
     * <p>Sends request code 61 without a custom header to the address produced by
     * {@code MixAll.brokerVIPChannel(true, str)} with a 3000 ms timeout.
     *
     * @param str remote address; also attached to the {@link MQBrokerException} on failure
     * @return the response body decoded as a {@code TimerMetricsSerializeWrapper}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public TimerMetrics.TimerMetricsSerializeWrapper getTimerMetrics(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        String vipAddr = MixAll.brokerVIPChannel(true, str);
        RemotingCommand request = RemotingCommand.createRequestCommand(61, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(vipAddr, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        return (TimerMetrics.TimerMetricsSerializeWrapper) TimerMetrics.TimerMetricsSerializeWrapper.decode(response.getBody(), TimerMetrics.TimerMetricsSerializeWrapper.class);
    }

    /**
     * Fetches all consumer offsets from a remote endpoint.
     *
     * <p>Sends request code 43 without a custom header directly to {@code str}
     * (no VIP-channel translation) with a 3000 ms timeout.
     *
     * @param str remote address; also attached to the {@link MQBrokerException} on failure
     * @return the response body decoded as a {@code ConsumerOffsetSerializeWrapper}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public ConsumerOffsetSerializeWrapper getAllConsumerOffset(String str) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(43, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        return (ConsumerOffsetSerializeWrapper) ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
    }

    /**
     * Fetches the delay-offset data from a remote endpoint as text.
     *
     * <p>Sends request code 45 without a custom header to {@code str} with a 3000 ms
     * timeout and interprets the response body as UTF-8.
     *
     * @param str remote address; also attached to the {@link MQBrokerException} on failure
     * @return the response body decoded as a UTF-8 string
     * @throws MQBrokerException if the response code is anything other than 0
     * @throws UnsupportedEncodingException declared because the charset is looked up by name
     */
    public String getAllDelayOffset(String str) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
        RemotingCommand request = RemotingCommand.createRequestCommand(45, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        byte[] payload = response.getBody();
        return new String(payload, "UTF-8");
    }

    /**
     * Fetches all subscription-group configuration from a remote endpoint.
     *
     * <p>Sends request code 201 without a custom header to {@code str} with a 3000 ms timeout.
     *
     * @param str remote address; also attached to the {@link MQBrokerException} on failure
     * @return the response body decoded as a {@code SubscriptionGroupWrapper}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(String str) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(201, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        return (SubscriptionGroupWrapper) SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
    }

    /**
     * Registers an RPC hook on the underlying remoting client.
     *
     * @param rPCHook hook forwarded unchanged to {@code remotingClient.registerRPCHook}
     */
    public void registerRPCHook(RPCHook rPCHook) {
        remotingClient.registerRPCHook(rPCHook);
    }

    /**
     * Delegates to {@code remotingClient.clearRPCHook()}.
     */
    public void clearRPCHook() {
        remotingClient.clearRPCHook();
    }

    /**
     * Queries the maximum offset of one queue of a topic from a remote endpoint.
     *
     * <p>Sends request code 30 with a {@code GetMaxOffsetRequestHeader} to {@code str}
     * and waits up to 3000 ms.
     *
     * @param str  remote address the request is sent to
     * @param str2 topic placed in the request header
     * @param i    queue id placed in the request header
     * @param z    value stored in the header's {@code committed} flag
     * @param z2   not read by this method
     * @return the offset carried by the {@code GetMaxOffsetResponseHeader}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public long getMaxOffset(String str, String str2, int i, boolean z, boolean z2) throws RemotingException, MQBrokerException, InterruptedException {
        GetMaxOffsetRequestHeader header = new GetMaxOffsetRequestHeader();
        header.setTopic(str2);
        header.setQueueId(Integer.valueOf(i));
        header.setCommitted(z);
        RemotingCommand request = RemotingCommand.createRequestCommand(30, header);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
        return response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class).getOffset().longValue();
    }

    /**
     * Queries the minimum offset of one queue of a topic from a remote endpoint.
     *
     * <p>Sends request code 31 with a {@code GetMinOffsetRequestHeader} to {@code str}
     * and waits up to 3000 ms.
     *
     * @param str  remote address the request is sent to
     * @param str2 topic placed in the request header
     * @param i    queue id placed in the request header
     * @param z    not read by this method
     * @return the offset carried by the {@code GetMinOffsetResponseHeader}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public long getMinOffset(String str, String str2, int i, boolean z) throws RemotingException, MQBrokerException, InterruptedException {
        GetMinOffsetRequestHeader header = new GetMinOffsetRequestHeader();
        header.setTopic(str2);
        header.setQueueId(Integer.valueOf(i));
        RemotingCommand request = RemotingCommand.createRequestCommand(31, header);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
        return response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class).getOffset().longValue();
    }

    /**
     * Asynchronously asks a remote endpoint to lock a batch of message queues.
     *
     * <p>Sends request code 41 whose body is {@code lockBatchRequestBody.encode()}. When the
     * response arrives with code 0 the decoded lock set is passed to
     * {@code lockCallback.onSuccess}; any other code is reported through
     * {@code lockCallback.onException} as an {@link MQBrokerException}.
     *
     * <p>If no response command is available, or the callback itself throws, the callback is
     * not notified (unchanged behavior) but the event is now logged instead of being
     * silently discarded.
     *
     * @param str                  remote address the request is sent to
     * @param lockBatchRequestBody body to encode into the request
     * @param j                    timeout handed to {@code invokeAsync}
     * @param lockCallback         receiver of the outcome; when {@code null} the response is ignored
     */
    public void lockBatchMQAsync(String str, LockBatchRequestBody lockBatchRequestBody, long j, LockCallback lockCallback) throws RemotingException, InterruptedException {
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(41, (CommandCustomHeader) null);
        createRequestCommand.setBody(lockBatchRequestBody.encode());
        this.remotingClient.invokeAsync(str, createRequestCommand, j, responseFuture -> {
            if (lockCallback == null) {
                return;
            }
            try {
                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                if (responseCommand == null) {
                    // Nothing to hand to the callback; leave a trace so the failure is diagnosable.
                    LOGGER.warn("lockBatchMQAsync got no response from {}", str);
                    return;
                }
                if (responseCommand.getCode() == 0) {
                    lockCallback.onSuccess(((LockBatchResponseBody) LockBatchResponseBody.decode(responseCommand.getBody(), LockBatchResponseBody.class)).getLockOKMQSet());
                } else {
                    lockCallback.onException(new MQBrokerException(responseCommand.getCode(), responseCommand.getRemark()));
                }
            } catch (Throwable th) {
                // Still best-effort: never propagate into the remoting callback thread, but do not hide the failure.
                LOGGER.error("lockBatchMQAsync callback failed, addr={}", str, th);
            }
        });
    }

    /**
     * Asynchronously asks a remote endpoint to unlock a batch of message queues.
     *
     * <p>Sends request code 42 whose body is {@code unlockBatchRequestBody.encode()}. A response
     * with code 0 triggers {@code unlockCallback.onSuccess()}; any other code is reported through
     * {@code unlockCallback.onException} as an {@link MQBrokerException}.
     *
     * <p>If no response command is available, or the callback itself throws, the callback is
     * not notified (unchanged behavior) but the event is now logged instead of being
     * silently discarded.
     *
     * @param str                    remote address the request is sent to
     * @param unlockBatchRequestBody body to encode into the request
     * @param j                      timeout handed to {@code invokeAsync}
     * @param unlockCallback         receiver of the outcome; when {@code null} the response is ignored
     */
    public void unlockBatchMQAsync(String str, UnlockBatchRequestBody unlockBatchRequestBody, long j, UnlockCallback unlockCallback) throws RemotingException, InterruptedException {
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(42, (CommandCustomHeader) null);
        createRequestCommand.setBody(unlockBatchRequestBody.encode());
        this.remotingClient.invokeAsync(str, createRequestCommand, j, responseFuture -> {
            if (unlockCallback == null) {
                return;
            }
            try {
                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                if (responseCommand == null) {
                    // Nothing to hand to the callback; leave a trace so the failure is diagnosable.
                    LOGGER.warn("unlockBatchMQAsync got no response from {}", str);
                    return;
                }
                if (responseCommand.getCode() == 0) {
                    unlockCallback.onSuccess();
                } else {
                    unlockCallback.onException(new MQBrokerException(responseCommand.getCode(), responseCommand.getRemark()));
                }
            } catch (Throwable th) {
                // Still best-effort: never propagate into the remoting callback thread, but do not hide the failure.
                LOGGER.error("unlockBatchMQAsync callback failed, addr={}", str, th);
            }
        });
    }

    /**
     * @return the remoting client this API uses for its outbound calls
     */
    public RemotingClient getRemotingClient() {
        return remotingClient;
    }

    /**
     * Synchronously sends a message to one remote endpoint and converts the reply into a
     * {@link SendResult}.
     *
     * @param str        remote address the request is sent to
     * @param str2       broker name used when building the result's {@code MessageQueue}
     * @param messageExt message whose body and attributes are sent
     * @param str3       producer group placed in the request header
     * @param j          timeout handed to {@code invokeSync}
     * @return the result produced by {@code processSendResponse}
     * @throws MQBrokerException if the response code is not one of the accepted send codes
     */
    public SendResult sendMessageToSpecificBroker(String str, String str2, MessageExt messageExt, String str3, long j) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = buildSendMessageRequest(messageExt, str3);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, j);
        return processSendResponse(str2, messageExt, response);
    }

    /**
     * Asynchronously sends a message to one remote endpoint.
     *
     * <p>The returned future is completed:
     * <ul>
     *   <li>with the {@link SendResult} built by {@code processSendResponse} on success;</li>
     *   <li>with {@code null} when the response future carries no response command;</li>
     *   <li>exceptionally when {@code invokeAsync} itself throws or when processing the
     *       response fails for any reason.</li>
     * </ul>
     *
     * <p>Response processing catches every {@link Throwable}, not just the declared checked
     * exceptions, so that an unexpected runtime failure (for example a missing header field)
     * cannot leave the future permanently incomplete.
     *
     * @param str        remote address the request is sent to
     * @param str2       broker name used when building the result's {@code MessageQueue}
     * @param messageExt message whose body and attributes are sent
     * @param str3       producer group placed in the request header
     * @param j          timeout handed to {@code invokeAsync}
     * @return a future that always completes once the remoting callback has run
     */
    public CompletableFuture<SendResult> sendMessageToSpecificBrokerAsync(String str, String str2, MessageExt messageExt, String str3, long j) {
        RemotingCommand buildSendMessageRequest = buildSendMessageRequest(messageExt, str3);
        CompletableFuture<SendResult> completableFuture = new CompletableFuture<>();
        String msgId = messageExt.getMsgId();
        try {
            this.remotingClient.invokeAsync(str, buildSendMessageRequest, j, responseFuture -> {
                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                if (null == responseCommand) {
                    completableFuture.complete(null);
                    return;
                }
                try {
                    completableFuture.complete(processSendResponse(str2, messageExt, responseCommand));
                } catch (Throwable e) {
                    // Includes unchecked failures so the future is never left hanging.
                    LOGGER.error("processSendResponse in sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
                    completableFuture.completeExceptionally(e);
                }
            });
        } catch (Throwable th) {
            LOGGER.error("invokeAsync failed in sendMessageToSpecificBrokerAsync, msgId=" + msgId, th);
            completableFuture.completeExceptionally(th);
        }
        return completableFuture;
    }

    /**
     * Builds the request command (code 310) used to send {@code messageExt}.
     *
     * @param messageExt message supplying the header values and the body
     * @param str        producer group placed in the header
     * @return a request whose body is {@code messageExt.getBody()}
     */
    private static RemotingCommand buildSendMessageRequest(MessageExt messageExt, String str) {
        SendMessageRequestHeaderV2 header = buildSendMessageRequestHeaderV2(messageExt, str);
        RemotingCommand request = RemotingCommand.createRequestCommand(310, header);
        request.setBody(messageExt.getBody());
        return request;
    }

    /**
     * Populates a {@code SendMessageRequestHeader} from a message and converts it to the V2 form.
     *
     * <p>The default topic is fixed to {@code "TBW102"} with 8 default queues, and the
     * batch flag is always {@code false}.
     *
     * @param messageExt message whose topic, queue id, flags, timestamps and properties are copied
     * @param str        producer group
     * @return the V2 header created by {@code SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2}
     */
    private static SendMessageRequestHeaderV2 buildSendMessageRequestHeaderV2(MessageExt messageExt, String str) {
        SendMessageRequestHeader v1 = new SendMessageRequestHeader();

        // Fixed values.
        v1.setDefaultTopic("TBW102");
        v1.setDefaultTopicQueueNums(8);
        v1.setBatch(false);

        // Values supplied by the caller / the message.
        v1.setProducerGroup(str);
        v1.setTopic(messageExt.getTopic());
        v1.setQueueId(Integer.valueOf(messageExt.getQueueId()));
        v1.setSysFlag(Integer.valueOf(messageExt.getSysFlag()));
        v1.setFlag(Integer.valueOf(messageExt.getFlag()));
        v1.setBornTimestamp(Long.valueOf(messageExt.getBornTimestamp()));
        v1.setReconsumeTimes(Integer.valueOf(messageExt.getReconsumeTimes()));
        v1.setProperties(MessageDecoder.messageProperties2String(messageExt.getProperties()));

        return SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(v1);
    }

    /**
     * Converts a send-message response into a {@link SendResult}.
     *
     * <p>Response codes 0, 10, 11 and 12 map to {@code SEND_OK}, {@code FLUSH_DISK_TIMEOUT},
     * {@code SLAVE_NOT_AVAILABLE} and {@code FLUSH_SLAVE_TIMEOUT} respectively; every other
     * code is rejected with an {@link MQBrokerException}.
     *
     * @param str             broker name used for the result's {@link MessageQueue}
     * @param message         the message that was sent; for a {@link MessageBatch} the result's
     *                        message id is the separator-joined unique ids of its members
     * @param remotingCommand the response to interpret
     * @return the populated send result (trace flag defaults to on, region to "DefaultRegion")
     * @throws MQBrokerException        if the response code is not one of the four accepted codes
     * @throws RemotingCommandException if the response header cannot be decoded
     */
    private SendResult processSendResponse(String str, Message message, RemotingCommand remotingCommand) throws MQBrokerException, RemotingCommandException {
        final int code = remotingCommand.getCode();
        final SendStatus status;
        switch (code) {
            case 0:
                status = SendStatus.SEND_OK;
                break;
            case 10:
                status = SendStatus.FLUSH_DISK_TIMEOUT;
                break;
            case 11:
                status = SendStatus.SLAVE_NOT_AVAILABLE;
                break;
            case 12:
                status = SendStatus.FLUSH_SLAVE_TIMEOUT;
                break;
            default:
                throw new MQBrokerException(code, remotingCommand.getRemark());
        }

        SendMessageResponseHeader header = remotingCommand.decodeCommandCustomHeader(SendMessageResponseHeader.class);
        MessageQueue queue = new MessageQueue(message.getTopic(), str, header.getQueueId().intValue());

        String uniqueId = MessageClientIDSetter.getUniqID(message);
        if (message instanceof MessageBatch) {
            // For a batch, report the unique ids of all inner messages joined by the separator.
            StringBuilder joined = new StringBuilder();
            for (Message inner : (MessageBatch) message) {
                if (joined.length() != 0) {
                    joined.append(TransactionalMessageUtil.OFFSET_SEPARATOR);
                }
                joined.append(MessageClientIDSetter.getUniqID(inner));
            }
            uniqueId = joined.toString();
        }

        SendResult result = new SendResult(status, uniqueId, header.getMsgId(), queue, header.getQueueOffset().longValue());
        result.setTransactionId(header.getTransactionId());

        String region = (String) remotingCommand.getExtFields().get("MSG_REGION");
        String traceFlag = (String) remotingCommand.getExtFields().get("TRACE_ON");
        if (region == null || region.isEmpty()) {
            region = "DefaultRegion";
        }
        // Tracing stays on unless the response explicitly says "false".
        result.setTraceOn(!"false".equals(traceFlag));
        result.setRegionId(region);
        return result;
    }

    /**
     * @return the executor this API submits its background tasks to
     */
    public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() {
        return brokerOuterExecutor;
    }

    /**
     * Convenience overload of {@code getTopicRouteInfoFromNameServer(String, long, boolean)}
     * that always passes {@code true} as the third argument.
     *
     * @param str topic to look up
     * @param j   timeout handed to the remoting call
     * @return the decoded route data
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(String str, long j) throws RemotingException, MQBrokerException, InterruptedException {
        final boolean warnWhenMissing = true;
        return getTopicRouteInfoFromNameServer(str, j, warnWhenMissing);
    }

    /**
     * Looks up the route data of a topic.
     *
     * <p>Sends request code 105 with a {@code GetRouteInfoRequestHeader} using a {@code null}
     * address (presumably letting the remoting client choose a name server — confirm against
     * the client implementation).
     *
     * @param str topic to look up
     * @param j   timeout handed to {@code invokeSync}
     * @param z   when {@code true}, a response with code 17 is logged as a warning before failing
     * @return the decoded route data when the response code is 0 and the body is non-null
     * @throws MQBrokerException in every other case, including code 0 with a {@code null} body
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(String str, long j, boolean z) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
        header.setTopic(str);
        RemotingCommand request = RemotingCommand.createRequestCommand(105, header);
        RemotingCommand response = this.remotingClient.invokeSync((String) null, request, j);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        final int code = response.getCode();
        if (code == 0) {
            byte[] payload = response.getBody();
            if (payload != null) {
                return (TopicRouteData) TopicRouteData.decode(payload, TopicRouteData.class);
            }
        } else if (code == 17 && z) {
            LOGGER.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", str);
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    /**
     * Fetches the cluster information.
     *
     * <p>Sends request code 106 without a custom header using a {@code null} address and a
     * 3000 ms timeout.
     *
     * @return the response body decoded as {@code ClusterInfo}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public ClusterInfo getBrokerClusterInfo() throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(106, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync((String) null, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
        return (ClusterInfo) ClusterInfo.decode(response.getBody(), ClusterInfo.class);
    }

    /**
     * Forwards an already-built request asynchronously through the remoting client.
     *
     * @param str             remote address the request is sent to
     * @param remotingCommand request to forward, passed through unchanged
     * @param j               timeout handed to {@code invokeAsync}
     * @param invokeCallback  callback handed to {@code invokeAsync}
     */
    public void forwardRequest(String str, RemotingCommand remotingCommand, long j, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
        remotingClient.invokeAsync(str, remotingCommand, j, invokeCallback);
    }

    /**
     * Re-fetches the cluster information via {@code getBrokerClusterInfo()} and hands it to
     * {@code clientMetadata.refreshClusterInfo}.
     *
     * @throws Exception if fetching the cluster information or applying it fails
     */
    public void refreshMetadata() throws Exception {
        ClusterInfo latest = getBrokerClusterInfo();
        this.clientMetadata.refreshClusterInfo(latest);
    }

    /**
     * @return the client metadata instance held by this API
     */
    public ClientMetadata getClientMetadata() {
        return clientMetadata;
    }

    /**
     * @return the RPC client held by this API
     */
    public RpcClient getRpcClient() {
        return rpcClient;
    }

    /**
     * Fetches all message-request-mode settings from a remote endpoint.
     *
     * <p>Sends request code 402 without a custom header to {@code str} with a 3000 ms timeout.
     *
     * @param str remote address; also attached to the {@link MQBrokerException} on failure
     * @return the response body decoded as a {@code MessageRequestModeSerializeWrapper}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public MessageRequestModeSerializeWrapper getAllMessageRequestMode(String str) throws RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingTimeoutException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(402, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark(), str);
        }
        return (MessageRequestModeSerializeWrapper) MessageRequestModeSerializeWrapper.decode(response.getBody(), MessageRequestModeSerializeWrapper.class);
    }

    /**
     * Fetches metadata from a controller endpoint.
     *
     * <p>Sends request code 1005 without a custom header to {@code str} with a 3000 ms timeout.
     *
     * @param str remote address the request is sent to
     * @return the response's custom header decoded as {@code GetMetaDataResponseHeader}
     * @throws MQBrokerException if the response code is anything other than 0
     */
    public GetMetaDataResponseHeader getControllerMetaData(String str) throws Exception {
        RemotingCommand request = RemotingCommand.createRequestCommand(1005, (CommandCustomHeader) null);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        if (response.getCode() != 0) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
        return response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class);
    }

    /**
     * Asks a controller endpoint to alter a sync-state set.
     *
     * <p>Sends request code 1001 with an {@code AlterSyncStateSetRequestHeader(str2, str3, i)}
     * and a body of {@code new SyncStateSet(set, i2).encode()}, waiting up to 3000 ms.
     *
     * @param str  remote address the request is sent to
     * @param str2 first header argument
     * @param str3 second header argument
     * @param i    third header argument
     * @param set  members of the new sync-state set
     * @param i2   second argument of the {@code SyncStateSet} constructor
     * @return the {@code SyncStateSet} decoded from the response body
     * @throws MQBrokerException with message "Controller leader was changed" on code 2007,
     *                           or with the response remark on any other non-zero code
     */
    public SyncStateSet alterSyncStateSet(String str, String str2, String str3, int i, Set<String> set, int i2) throws Exception {
        AlterSyncStateSetRequestHeader header = new AlterSyncStateSetRequestHeader(str2, str3, i);
        RemotingCommand request = RemotingCommand.createRequestCommand(1001, header);
        request.setBody(new SyncStateSet(set, i2).encode());
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        final int code = response.getCode();
        if (code == 0) {
            if (!$assertionsDisabled && response.getBody() == null) {
                throw new AssertionError();
            }
            return (SyncStateSet) RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
        }
        if (code == 2007) {
            throw new MQBrokerException(response.getCode(), "Controller leader was changed");
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    /**
     * Registers with a controller endpoint.
     *
     * <p>Sends request code 1003 with a {@code RegisterBrokerToControllerRequestHeader} built
     * from the arguments {@code str2 … i2} in declaration order, waiting up to 3000 ms.
     *
     * @param str remote address the request is sent to
     * @return the response's custom header decoded as {@code RegisterBrokerToControllerResponseHeader}
     * @throws MQBrokerException with message "Controller leader was changed" on code 2007,
     *                           or with the response remark on any other non-zero code
     */
    public RegisterBrokerToControllerResponseHeader registerBrokerToController(String str, String str2, String str3, String str4, long j, int i, long j2, int i2) throws Exception {
        RegisterBrokerToControllerRequestHeader header = new RegisterBrokerToControllerRequestHeader(str2, str3, str4, Long.valueOf(j), i, j2, i2);
        RemotingCommand request = RemotingCommand.createRequestCommand(1003, header);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        final int code = response.getCode();
        if (code == 0) {
            return response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
        }
        if (code == 2007) {
            throw new MQBrokerException(response.getCode(), "Controller leader was changed");
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    /**
     * Fetches replica information from a controller endpoint.
     *
     * <p>Sends request code 1004 with a {@code GetReplicaInfoRequestHeader(str2, str3)},
     * waiting up to 3000 ms.
     *
     * @param str  remote address the request is sent to
     * @param str2 first header argument
     * @param str3 second header argument
     * @return the decoded response header paired with the {@code SyncStateSet} decoded from the body
     * @throws MQBrokerException with message "Controller leader was changed" on code 2007,
     *                           or with the response remark on any other non-zero code
     */
    public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(String str, String str2, String str3) throws Exception {
        GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(str2, str3);
        RemotingCommand request = RemotingCommand.createRequestCommand(1004, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(str, request, 3000L);
        if (!$assertionsDisabled && response == null) {
            throw new AssertionError();
        }
        final int code = response.getCode();
        if (code == 0) {
            // Header is decoded before the body check, matching the established order.
            GetReplicaInfoResponseHeader responseHeader = response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
            if (!$assertionsDisabled && response.getBody() == null) {
                throw new AssertionError();
            }
            SyncStateSet syncStateSet = (SyncStateSet) RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
            return new Pair<>(responseHeader, syncStateSet);
        }
        if (code == 2007) {
            throw new MQBrokerException(response.getCode(), "Controller leader was changed");
        }
        // Code 2008 and every remaining code are reported the same way, with the remark.
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    /**
     * Sends a one-way heartbeat (request code 904) to a controller endpoint on the
     * {@code brokerOuterExecutor}.
     *
     * <p>Does nothing when {@code str} is empty. Failures of the one-way send are logged and
     * not propagated.
     *
     * @param str  controller address; empty means "skip"
     * @param str2 cluster name (header and {@code BrokerIdentity})
     * @param str3 broker address placed in the header
     * @param str4 broker name (header and {@code BrokerIdentity})
     * @param l    third argument of the {@code BrokerIdentity} constructor; must not be {@code null}
     * @param i    timeout handed to {@code invokeOneway}
     * @param z    fourth argument of the {@code BrokerIdentity} constructor
     * @param i2   epoch placed in the header
     * @param j    max offset placed in the header
     * @param j2   confirm offset placed in the header
     * @param j3   heartbeat timeout (ms) placed in the header
     * @param i3   election priority placed in the header
     */
    public void sendHeartbeatToController(final String str, String str2, String str3, String str4, Long l, final int i, boolean z, int i2, long j, long j2, long j3, int i3) {
        if (StringUtils.isEmpty(str)) {
            return;
        }

        final BrokerHeartbeatRequestHeader heartbeat = new BrokerHeartbeatRequestHeader();
        heartbeat.setClusterName(str2);
        heartbeat.setBrokerName(str4);
        heartbeat.setBrokerAddr(str3);
        heartbeat.setEpoch(Integer.valueOf(i2));
        heartbeat.setElectionPriority(Integer.valueOf(i3));
        heartbeat.setMaxOffset(Long.valueOf(j));
        heartbeat.setConfirmOffset(Long.valueOf(j2));
        heartbeat.setHeartbeatTimeoutMills(Long.valueOf(j3));

        BrokerIdentity identity = new BrokerIdentity(str2, str4, l.longValue(), z);
        this.brokerOuterExecutor.execute(new AbstractBrokerRunnable(identity) {
            public void run0() {
                try {
                    RemotingCommand request = RemotingCommand.createRequestCommand(904, heartbeat);
                    BrokerOuterAPI.this.remotingClient.invokeOneway(str, request, i);
                } catch (Exception e) {
                    BrokerOuterAPI.LOGGER.error("Error happen when send heartbeat to controller {}", str, e);
                }
            }
        });
    }

    /**
     * Synchronously pulls messages from one remote endpoint.
     *
     * <p>Sends request code 11 with a {@code PullMessageRequestHeader} that subscribes to
     * {@code "*"} with expression type {@code "TAG"}, no suspend timeout, commit offset 0 and
     * an unlimited byte budget, then decodes the response through {@code processPullResponse}
     * and {@code processPullResult}.
     *
     * @param str  broker name; set on the header and stamped on each decoded message
     * @param str2 remote address the request is sent to
     * @param str3 consumer group
     * @param str4 topic
     * @param i    queue id
     * @param j    queue offset to start pulling from
     * @param i2   maximum number of messages
     * @param j2   timeout handed to {@code invokeSync}
     * @return the pull result with its message list populated when messages were found
     * @throws MQBrokerException if the response code is not one of the recognised pull codes
     */
    public PullResult pullMessageFromSpecificBroker(String str, String str2, String str3, String str4, int i, long j, int i2, long j2) throws MQBrokerException, RemotingException, InterruptedException {
        PullMessageRequestHeader header = new PullMessageRequestHeader();

        // What to pull.
        header.setBname(str);
        header.setConsumerGroup(str3);
        header.setTopic(str4);
        header.setQueueId(Integer.valueOf(i));
        header.setQueueOffset(Long.valueOf(j));
        header.setMaxMsgNums(Integer.valueOf(i2));
        header.setMaxMsgBytes(Integer.MAX_VALUE);

        // Fixed pull options.
        header.setSysFlag(Integer.valueOf(PullSysFlag.buildSysFlag(false, false, true, false)));
        header.setCommitOffset(0L);
        header.setSuspendTimeoutMillis(0L);
        header.setSubscription("*");
        header.setExpressionType("TAG");
        header.setSubVersion(Long.valueOf(System.currentTimeMillis()));

        RemotingCommand request = RemotingCommand.createRequestCommand(11, header);
        RemotingCommand response = this.remotingClient.invokeSync(str2, request, j2);
        PullResultExt result = processPullResponse(response, str2);
        processPullResult(result, str, i);
        return result;
    }

    /**
     * Asynchronously pulls messages from one remote endpoint.
     *
     * <p>Sends request code 11 with the same header settings as the synchronous variant
     * (subscription {@code "*"}, expression type {@code "TAG"}, no suspend timeout). The
     * returned future never completes exceptionally: if the response future reports a cause,
     * or processing the response throws, the future completes with an empty
     * {@code NO_MATCHED_MSG} result whose offsets are all {@code -1}. Those failures are
     * logged so they are not mistaken for a genuinely empty pull.
     *
     * @param str  broker name; set on the header and stamped on each decoded message
     * @param str2 remote address the request is sent to
     * @param str3 consumer group
     * @param str4 topic
     * @param i    queue id
     * @param j    queue offset to start pulling from
     * @param i2   maximum number of messages
     * @param j2   timeout handed to {@code invokeAsync}
     * @return a future holding the pull result or the empty fallback result
     * @throws RemotingException    if {@code invokeAsync} rejects the request
     * @throws InterruptedException if {@code invokeAsync} is interrupted
     */
    public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String str, String str2, String str3, String str4, int i, long j, int i2, long j2) throws RemotingException, InterruptedException {
        PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
        pullMessageRequestHeader.setConsumerGroup(str3);
        pullMessageRequestHeader.setTopic(str4);
        pullMessageRequestHeader.setQueueId(Integer.valueOf(i));
        pullMessageRequestHeader.setQueueOffset(Long.valueOf(j));
        pullMessageRequestHeader.setMaxMsgNums(Integer.valueOf(i2));
        pullMessageRequestHeader.setSysFlag(Integer.valueOf(PullSysFlag.buildSysFlag(false, false, true, false)));
        pullMessageRequestHeader.setCommitOffset(0L);
        pullMessageRequestHeader.setSuspendTimeoutMillis(0L);
        pullMessageRequestHeader.setSubscription("*");
        pullMessageRequestHeader.setSubVersion(Long.valueOf(System.currentTimeMillis()));
        pullMessageRequestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
        pullMessageRequestHeader.setExpressionType("TAG");
        pullMessageRequestHeader.setBname(str);
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(11, pullMessageRequestHeader);
        CompletableFuture<PullResult> completableFuture = new CompletableFuture<>();
        this.remotingClient.invokeAsync(str2, createRequestCommand, j2, responseFuture -> {
            if (responseFuture.getCause() != null) {
                // Transport-level failure: keep the lenient fallback but record why.
                LOGGER.error("pullMessageFromSpecificBrokerAsync failed, addr={}", str2, responseFuture.getCause());
                completableFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1L, -1L, -1L, new ArrayList<>()));
                return;
            }
            try {
                PullResultExt processPullResponse = processPullResponse(responseFuture.getResponseCommand(), str2);
                processPullResult(processPullResponse, str, i);
                completableFuture.complete(processPullResponse);
            } catch (Exception e) {
                // Also reached when the response command is null (e.g. no reply arrived).
                LOGGER.error("pullMessageFromSpecificBrokerAsync could not process response, addr={}", str2, e);
                completableFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1L, -1L, -1L, new ArrayList<>()));
            }
        });
        return completableFuture;
    }

    /**
     * Converts a pull-message response into a {@link PullResultExt} without decoding the
     * message payload.
     *
     * <p>Response codes 0, 19, 20 and 21 map to {@code FOUND}, {@code NO_NEW_MSG},
     * {@code NO_MATCHED_MSG} and {@code OFFSET_ILLEGAL}. The raw response body is stored as
     * the result's message binary and the message list is left {@code null}.
     *
     * @param remotingCommand response to interpret
     * @param str             address attached to the {@link MQBrokerException} on failure
     * @return the pull result carrying offsets, suggested broker id, raw body and offset delta
     * @throws MQBrokerException        if the response code is none of the four recognised codes
     * @throws RemotingCommandException if the response header cannot be decoded
     */
    private PullResultExt processPullResponse(RemotingCommand remotingCommand, String str) throws MQBrokerException, RemotingCommandException {
        final PullStatus status;
        switch (remotingCommand.getCode()) {
            case 0:
                status = PullStatus.FOUND;
                break;
            case 19:
                status = PullStatus.NO_NEW_MSG;
                break;
            case 20:
                status = PullStatus.NO_MATCHED_MSG;
                break;
            case 21:
                status = PullStatus.OFFSET_ILLEGAL;
                break;
            default:
                throw new MQBrokerException(remotingCommand.getCode(), remotingCommand.getRemark(), str);
        }

        PullMessageResponseHeader header = remotingCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
        long nextBeginOffset = header.getNextBeginOffset().longValue();
        long minOffset = header.getMinOffset().longValue();
        long maxOffset = header.getMaxOffset().longValue();
        long suggestedBrokerId = header.getSuggestWhichBrokerId().longValue();
        return new PullResultExt(status, nextBeginOffset, minOffset, maxOffset, (List) null, suggestedBrokerId, remotingCommand.getBody(), header.getOffsetDelta());
    }

    /**
     * Decodes the raw message binary of a pull result in place.
     *
     * <p>When the status is {@code FOUND}, the binary is decoded into messages; each message
     * gets its transaction id (for messages whose {@code TRAN_MSG} property parses as true),
     * the {@code MIN_OFFSET}/{@code MAX_OFFSET} properties, the given broker name and queue id,
     * and — if the result carries an offset delta — a queue offset shifted by that delta.
     * The binary is always cleared afterwards.
     *
     * @param pullResultExt result to populate; mutated and returned
     * @param str           broker name stamped on every decoded message
     * @param i             queue id stamped on every decoded message
     * @return the same {@code pullResultExt} instance
     */
    private PullResult processPullResult(PullResultExt pullResultExt, String str, int i) {
        if (pullResultExt.getPullStatus() == PullStatus.FOUND) {
            ByteBuffer raw = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> messages = MessageDecoder.decodesBatch(raw, true, true, true);
            for (MessageExt msg : messages) {
                boolean transactional = Boolean.parseBoolean(msg.getProperty("TRAN_MSG"));
                if (transactional) {
                    msg.setTransactionId(msg.getProperty("UNIQ_KEY"));
                }
                MessageAccessor.putProperty(msg, "MIN_OFFSET", Long.toString(pullResultExt.getMinOffset()));
                MessageAccessor.putProperty(msg, "MAX_OFFSET", Long.toString(pullResultExt.getMaxOffset()));
                msg.setBrokerName(str);
                msg.setQueueId(i);
                Long delta = pullResultExt.getOffsetDelta();
                if (delta != null) {
                    msg.setQueueOffset(delta.longValue() + msg.getQueueOffset());
                }
            }
            pullResultExt.setMsgFoundList(messages);
        }
        // The raw bytes are dropped whether or not anything was decoded.
        pullResultExt.setMessageBinary((byte[]) null);
        return pullResultExt;
    }

    // Class initializer: sets up the assertion flag and the shared logger.
    static {
        // True when assertions are NOT enabled for this class; the explicit
        // "!$assertionsDisabled && ..." guards in the methods above read this flag.
        $assertionsDisabled = !BrokerOuterAPI.class.desiredAssertionStatus();
        // Logger obtained under the name "RocketmqBroker".
        LOGGER = LoggerFactory.getLogger("RocketmqBroker");
    }
}
