package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.metrics.BrokerMetricsConstant;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor.class */
public class PopMessageProcessor implements NettyRequestProcessor {
    // Dedicated logger for pop-consumption traffic (logger name "RocketmqPop").
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");
    // Owning broker; every lookup of config, stores and managers goes through it.
    private final BrokerController brokerController;
    // Revive topic name, derived in the constructor from the broker cluster name.
    String reviveTopic;
    // Ext-field key under which processRequest(ChannelHandlerContext, ...) stamps the arrival time.
    private static final String BORN_TIME = "bornTime";
    // Status codes compared against the result of polling(...) in processRequest:
    // POLLING_SUC -> no response is returned now, POLLING_FULL -> response code 209,
    // any other value -> response code 210.
    // NOTE(review): the decompiled code also reuses these constants as the plain
    // numbers 0/1/3 (array indices, counters); those uses are unrelated to polling.
    private static final int POLLING_SUC = 0;
    private static final int POLLING_FULL = 1;
    private static final int POLLING_TIMEOUT = 2;
    private static final int NOT_POLLING = 3;
    // topic -> (consumer group -> marker byte). The inner keys are checked against the
    // subscription group table during cleanup. Meaning of the byte value is not visible here.
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
    // polling key (see KeyBuilder.buildPollingKey) -> pending long-polling requests.
    // Capacity is bounded by the broker's popPollingMapSize setting (see constructor).
    private ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
    // Created in the constructor; also consulted for the latest popped offset of a queue.
    private PopBufferMergeService popBufferMergeService;
    // Time-seeded RNG; processRequest draws nextInt(100) from it for each request.
    private final Random random = new Random(System.currentTimeMillis());
    // Running count of parked polling requests; decremented when requests are removed and
    // periodically overwritten with the measured total by PopLongPollingService.
    private AtomicLong totalPollingNum = new AtomicLong(0);
    // Background thread that times out parked polling requests.
    private PopLongPollingService popLongPollingService = new PopLongPollingService();
    // Per-queue try-lock registry with periodic cleanup of stale entries.
    private QueueLockManager queueLockManager = new QueueLockManager();
    // Monotonic counter; processRequest takes it modulo reviveQueueNum for non-order pops
    // (presumably to pick the revive queue -- confirm against popMsgFromQueue).
    private AtomicLong ckMessageNumber = new AtomicLong();

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$PopLongPollingService.class */
    /**
     * Background service that periodically scans {@code pollingMap}, wakes up
     * long-polling pop requests that have timed out, reconciles the polling
     * counter, and purges entries that refer to topics or subscription groups
     * that no longer exist.
     *
     * <p>When the service is stopped, every request still parked in
     * {@code pollingMap} is woken up once before the thread exits.
     */
    public class PopLongPollingService extends ServiceThread {
        /** Wall-clock time (ms) of the last {@link #cleanUnusedResource()} pass; 0 until the first pass. */
        private long lastCleanTime = 0;

        public PopLongPollingService() {
        }

        /**
         * Returns the thread/service name.
         *
         * @return the simple class name, prefixed with the broker identifier when
         *         the broker runs inside a broker container
         */
        public String getServiceName() {
            String simpleName = PopLongPollingService.class.getSimpleName();
            if (PopMessageProcessor.this.brokerController.getBrokerConfig().isInBrokerContainer()) {
                return PopMessageProcessor.this.brokerController.getBrokerIdentity().getIdentifier() + simpleName;
            }
            return simpleName;
        }

        /**
         * Drops entries from {@code topicCidMap} and {@code pollingMap} whose topic
         * has no topic config or whose consumer group is absent from the
         * subscription group table. Any failure is logged and does not propagate;
         * {@code lastCleanTime} is refreshed in every case.
         */
        private void cleanUnusedResource() {
            try {
                Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicIterator =
                    PopMessageProcessor.this.topicCidMap.entrySet().iterator();
                while (topicIterator.hasNext()) {
                    Map.Entry<String, ConcurrentHashMap<String, Byte>> topicEntry = topicIterator.next();
                    String topic = topicEntry.getKey();
                    if (PopMessageProcessor.this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
                        topicIterator.remove();
                        continue;
                    }
                    Iterator<Map.Entry<String, Byte>> cidIterator = topicEntry.getValue().entrySet().iterator();
                    while (cidIterator.hasNext()) {
                        String cid = cidIterator.next().getKey();
                        if (!PopMessageProcessor.this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
                            PopMessageProcessor.POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic);
                            cidIterator.remove();
                        }
                    }
                }

                Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingIterator =
                    PopMessageProcessor.this.pollingMap.entrySet().iterator();
                while (pollingIterator.hasNext()) {
                    String pollingKey = pollingIterator.next().getKey();
                    if (pollingKey == null) {
                        continue;
                    }
                    // Only keys made of exactly three separator-delimited parts are inspected;
                    // the first part is the topic and the second the consumer group.
                    String[] keyParts = pollingKey.split(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
                    if (keyParts.length != 3) {
                        continue;
                    }
                    String topic = keyParts[0];
                    String cid = keyParts[1];
                    if (PopMessageProcessor.this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
                        pollingIterator.remove();
                    } else if (!PopMessageProcessor.this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
                        PopMessageProcessor.POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic);
                        pollingIterator.remove();
                    }
                }
            } catch (Throwable th) {
                PopMessageProcessor.POP_LOGGER.error("cleanUnusedResource", th);
            }
            this.lastCleanTime = System.currentTimeMillis();
        }

        /**
         * Main loop: every 20 ms (or earlier when woken) pops timed-out requests
         * off each polling queue and wakes them up. On every 100th scan that finds
         * a non-empty map it logs queue sizes and resets {@code totalPollingNum}
         * to the measured total. Stale resources are cleaned on the first
         * non-empty scan and then at most once every 5 minutes.
         */
        public void run() {
            int scanCount = 0;
            while (!this.stopped) {
                try {
                    waitForRunning(20L);
                    scanCount++;
                    if (PopMessageProcessor.this.pollingMap.isEmpty()) {
                        continue;
                    }
                    boolean reportDue = scanCount >= 100;
                    long measuredTotal = 0;
                    for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : PopMessageProcessor.this.pollingMap.entrySet()) {
                        String pollingKey = entry.getKey();
                        ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                        if (queue == null) {
                            continue;
                        }
                        PopRequest first;
                        while ((first = queue.pollFirst()) != null) {
                            if (!first.isTimeout()) {
                                // Head is still alive: put it back and stop scanning this queue.
                                if (queue.add(first)) {
                                    break;
                                }
                                // Re-insertion failed; fall through and wake it up instead of losing it.
                                PopMessageProcessor.POP_LOGGER.info("polling, add fail again: {}", first);
                            }
                            if (PopMessageProcessor.this.brokerController.getBrokerConfig().isEnablePopLog()) {
                                PopMessageProcessor.POP_LOGGER.info("timeout , wakeUp polling : {}", first);
                            }
                            PopMessageProcessor.this.totalPollingNum.decrementAndGet();
                            PopMessageProcessor.this.wakeUp(first);
                        }
                        if (reportDue) {
                            long queueSize = queue.size();
                            measuredTotal += queueSize;
                            if (queueSize > 100) {
                                PopMessageProcessor.POP_LOGGER.info("polling queue {} , size={} ", pollingKey, queueSize);
                            }
                        }
                    }
                    if (reportDue) {
                        long counted = PopMessageProcessor.this.totalPollingNum.get();
                        PopMessageProcessor.POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
                            PopMessageProcessor.this.pollingMap.size(), measuredTotal, counted, Math.abs(counted - measuredTotal));
                        PopMessageProcessor.this.totalPollingNum.set(measuredTotal);
                        scanCount = 0;
                    }
                    if (this.lastCleanTime == 0 || System.currentTimeMillis() - this.lastCleanTime > 300000) {
                        cleanUnusedResource();
                    }
                } catch (Throwable th) {
                    PopMessageProcessor.POP_LOGGER.error("checkPolling error", th);
                }
            }

            // Service stopped: wake every parked request once. The inner loop must
            // terminate when a queue is drained, otherwise this thread never exits.
            try {
                for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : PopMessageProcessor.this.pollingMap.entrySet()) {
                    ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                    if (queue == null) {
                        continue;
                    }
                    PopRequest pending;
                    while ((pending = queue.pollFirst()) != null) {
                        PopMessageProcessor.this.wakeUp(pending);
                    }
                }
            } catch (Throwable th) {
                // Best effort during shutdown: record the failure rather than hiding it.
                PopMessageProcessor.POP_LOGGER.warn("wake up polling requests on shutdown error", th);
            }
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$QueueLockManager.class */
    /**
     * Registry of per-queue {@link TimedLock}s keyed by topic, consumer group and
     * queue id. As a service thread it wakes roughly once a minute and evicts
     * locks whose last acquisition is older than one minute.
     */
    public class QueueLockManager extends ServiceThread {
        /** Lock key -> lock. Entries are created lazily by {@link #tryLock(String)}. */
        private ConcurrentHashMap<String, TimedLock> expiredLocalCache = new ConcurrentHashMap<>(100000);

        public QueueLockManager() {
        }

        /**
         * Builds the lock key by joining the three parts with
         * {@code ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR}.
         *
         * @param str  first key part
         * @param str2 second key part
         * @param i    third key part (numeric)
         * @return the joined key
         */
        public String buildLockKey(String str, String str2, int i) {
            return String.join(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR, str, str2, String.valueOf(i));
        }

        /** Convenience overload: builds the key and delegates to {@link #tryLock(String)}. */
        public boolean tryLock(String str, String str2, int i) {
            String lockKey = buildLockKey(str, str2, i);
            return tryLock(lockKey);
        }

        /**
         * Attempts to take the lock registered under the given key, creating the
         * lock on first use.
         *
         * @param str the lock key
         * @return {@code true} if the lock was acquired; {@code false} if it is held,
         *         if another thread registered the lock concurrently, or if the
         *         entry vanished before it could be used
         */
        public boolean tryLock(String str) {
            TimedLock existing = this.expiredLocalCache.get(str);
            if (existing != null) {
                return existing.tryLock();
            }
            // Lost the race to register the lock: report failure without retrying.
            if (this.expiredLocalCache.putIfAbsent(str, new TimedLock()) != null) {
                return false;
            }
            TimedLock registered = this.expiredLocalCache.get(str);
            return registered != null && registered.tryLock();
        }

        /**
         * Removes every lock whose last acquisition time is older than the given age.
         *
         * @param j maximum age in milliseconds
         * @return the number of entries visited, including the ones removed
         */
        public int cleanUnusedLock(long j) {
            int visited = 0;
            for (Iterator<Map.Entry<String, TimedLock>> cursor = this.expiredLocalCache.entrySet().iterator(); cursor.hasNext(); visited++) {
                Map.Entry<String, TimedLock> candidate = cursor.next();
                TimedLock timedLock = candidate.getValue();
                long idleMillis = System.currentTimeMillis() - timedLock.getLockTime();
                if (idleMillis > j) {
                    cursor.remove();
                    PopMessageProcessor.POP_LOGGER.info("Remove unused queue lock: {}, {}, {}", new Object[]{candidate.getKey(), Long.valueOf(timedLock.getLockTime()), Boolean.valueOf(timedLock.isLock())});
                }
            }
            return visited;
        }

        /** Convenience overload: builds the key and delegates to {@link #unLock(String)}. */
        public void unLock(String str, String str2, int i) {
            String lockKey = buildLockKey(str, str2, i);
            unLock(lockKey);
        }

        /**
         * Releases the lock registered under the given key; a missing entry is ignored.
         *
         * @param str the lock key
         */
        public void unLock(String str) {
            TimedLock timedLock = this.expiredLocalCache.get(str);
            if (timedLock == null) {
                return;
            }
            timedLock.unLock();
        }

        /**
         * Returns the thread/service name.
         *
         * @return the simple class name, prefixed with the broker identifier when
         *         the broker runs inside a broker container
         */
        public String getServiceName() {
            if (PopMessageProcessor.this.brokerController.getBrokerConfig().isInBrokerContainer()) {
                return PopMessageProcessor.this.brokerController.getBrokerIdentity().getIdentifier() + QueueLockManager.class.getSimpleName();
            }
            return QueueLockManager.class.getSimpleName();
        }

        /** Waits up to 60 s per cycle, then evicts stale locks and logs the visited count. */
        public void run() {
            while (!isStopped()) {
                try {
                    waitForRunning(60000L);
                    int visited = cleanUnusedLock(60000L);
                    PopMessageProcessor.POP_LOGGER.info("QueueLockSize={}", Integer.valueOf(visited));
                } catch (Exception e) {
                    PopMessageProcessor.POP_LOGGER.error("QueueLockManager run error", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PopMessageProcessor$TimedLock.class */
    /**
     * Non-reentrant try-lock that remembers when it was last acquired.
     *
     * <p>The internal flag is {@code true} while the lock is free and
     * {@code false} while it is held.
     */
    public static class TimedLock {
        /** {@code true} = free, {@code false} = held. */
        private final AtomicBoolean available = new AtomicBoolean(true);
        /** Time (ms) of creation or of the most recent successful {@link #tryLock()}. */
        private volatile long lockTime = System.currentTimeMillis();

        /**
         * Tries to take the lock without blocking.
         *
         * @return {@code true} if the lock was free and is now held by the caller
         */
        public boolean tryLock() {
            boolean acquired = this.available.compareAndSet(true, false);
            if (acquired) {
                this.lockTime = System.currentTimeMillis();
            }
            return acquired;
        }

        /** Marks the lock as free, regardless of who holds it. */
        public void unLock() {
            this.available.set(true);
        }

        /** @return {@code true} while the lock is held */
        public boolean isLock() {
            boolean free = this.available.get();
            return !free;
        }

        /** @return the time (ms) recorded at creation or at the last successful acquisition */
        public long getLockTime() {
            return this.lockTime;
        }
    }

    public PopMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
        this.topicCidMap = new ConcurrentHashMap<>(this.brokerController.getBrokerConfig().getPopPollingMapSize());
        this.pollingMap = new ConcurrentLinkedHashMap.Builder().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
        this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this);
    }

    /**
     * @return the background service that times out parked long-polling requests
     */
    public PopLongPollingService getPopLongPollingService() {
        return popLongPollingService;
    }

    /**
     * @return the buffer merge service created together with this processor
     */
    public PopBufferMergeService getPopBufferMergeService() {
        return popBufferMergeService;
    }

    /**
     * @return the per-queue lock registry owned by this processor
     */
    public QueueLockManager getQueueLockManager() {
        return queueLockManager;
    }

    /**
     * Builds the unique id of an ack message: topic, queue id, ack offset,
     * consumer group, pop time and broker name, followed by the literal
     * {@code "ack"}, all joined with
     * {@code ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR}.
     *
     * @param ackMsg the ack message to identify
     * @return the composed id
     */
    public static String genAckUniqueId(AckMsg ackMsg) {
        final String sep = ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
        StringBuilder id = new StringBuilder();
        id.append(ackMsg.getTopic()).append(sep);
        id.append(ackMsg.getQueueId()).append(sep);
        id.append(ackMsg.getAckOffset()).append(sep);
        id.append(ackMsg.getConsumerGroup()).append(sep);
        id.append(ackMsg.getPopTime()).append(sep);
        id.append(ackMsg.getBrokerName()).append(sep);
        id.append("ack");
        return id.toString();
    }

    /**
     * Builds the unique id of a pop checkpoint: topic, queue id, start offset,
     * consumer id, pop time and broker name, followed by the literal
     * {@code "ck"}, all joined with
     * {@code ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR}.
     *
     * @param popCheckPoint the checkpoint to identify
     * @return the composed id
     */
    public static String genCkUniqueId(PopCheckPoint popCheckPoint) {
        final String sep = ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
        StringBuilder id = new StringBuilder();
        id.append(popCheckPoint.getTopic()).append(sep);
        id.append(popCheckPoint.getQueueId()).append(sep);
        id.append(popCheckPoint.getStartOffset()).append(sep);
        id.append(popCheckPoint.getCId()).append(sep);
        id.append(popCheckPoint.getPopTime()).append(sep);
        id.append(popCheckPoint.getBrokerName()).append(sep);
        id.append("ck");
        return id.toString();
    }

    /**
     * Entry point for an incoming pop request: stamps the request with its
     * arrival time under the {@code bornTime} ext field, then hands it to the
     * channel-based overload.
     *
     * @param channelHandlerContext context whose channel the request arrived on
     * @param remotingCommand       the pop request
     * @return whatever the channel-based overload returns
     * @throws RemotingCommandException propagated from the channel-based overload
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        final String arrivalMillis = String.valueOf(System.currentTimeMillis());
        remotingCommand.addExtField(BORN_TIME, arrivalMillis);
        final Channel channel = channelHandlerContext.channel();
        return processRequest(channel, remotingCommand);
    }

    /**
     * Tells the caller whether incoming requests should be rejected.
     *
     * @return always {@code false}
     */
    public boolean rejectRequest() {
        // This processor never rejects a request.
        return false;
    }

    /**
     * @return the live map of polling key to parked long-polling requests
     *         (not a copy)
     */
    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
        return pollingMap;
    }

    /**
     * Wakes a parked pop request when the queue holds messages beyond what the
     * group has already popped or committed.
     *
     * <p>The queue's max offset is compared with the larger of the buffer merge
     * service's latest offset and the stored consumer offset. If it is greater,
     * a request waiting on any queue (queue id -1) is notified first, falling
     * back to one waiting on this specific queue; the notification processor is
     * informed as well.
     *
     * @param str  topic
     * @param str2 consumer group
     * @param i    queue id
     */
    public void notifyLongPollingRequestIfNeed(String str, String str2, int i) {
        long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i);
        long latestPopped = this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(str, str2, i);
        long committed = this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i);
        if (maxOffset <= Math.max(latestPopped, committed)) {
            // Nothing new to deliver.
            return;
        }

        boolean notified = this.brokerController.getPopMessageProcessor().notifyMessageArriving(str, str2, -1);
        if (!notified) {
            notified = this.brokerController.getPopMessageProcessor().notifyMessageArriving(str, str2, i);
        }
        this.brokerController.getNotificationProcessor().notifyMessageArriving(str, i);

        if (!this.brokerController.getBrokerConfig().isEnablePopLog()) {
            return;
        }
        POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}", new Object[]{str, str2, Integer.valueOf(i), Boolean.valueOf(notified)});
    }

    /**
     * Notifies every consumer group registered for the topic that a message has
     * arrived. For a concrete queue id (&gt;= 0) each group is notified twice:
     * once for requests waiting on any queue (-1) and once for the given queue.
     *
     * @param str topic
     * @param i   queue id
     */
    public void notifyMessageArriving(String str, int i) {
        ConcurrentHashMap<String, Byte> groups = this.topicCidMap.get(str);
        if (groups != null) {
            for (String group : groups.keySet()) {
                if (i >= 0) {
                    notifyMessageArriving(str, group, -1);
                }
                notifyMessageArriving(str, group, i);
            }
        }
    }

    /**
     * Wakes up the first parked request for the given topic, consumer group and
     * queue whose channel is still active. Requests with inactive channels are
     * discarded on the way; every request taken out of the queue decrements
     * {@code totalPollingNum}.
     *
     * @param str  topic
     * @param str2 consumer group
     * @param i    queue id
     * @return {@code true} if a request was found and successfully handed to
     *         {@link #wakeUp(PopRequest)}; {@code false} otherwise
     */
    public boolean notifyMessageArriving(String str, String str2, int i) {
        final String pollingKey = KeyBuilder.buildPollingKey(str, str2, i);
        final ConcurrentSkipListSet<PopRequest> waiting = this.pollingMap.get(pollingKey);
        if (waiting == null || waiting.isEmpty()) {
            return false;
        }

        // Skip over requests whose connection has gone away.
        PopRequest candidate = waiting.pollFirst();
        while (candidate != null && !candidate.getChannel().isActive()) {
            this.totalPollingNum.decrementAndGet();
            candidate = waiting.pollFirst();
        }
        if (candidate == null) {
            return false;
        }

        this.totalPollingNum.decrementAndGet();
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", candidate);
        }
        return wakeUp(candidate);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Re-runs a parked pop request on the pull-message executor and writes the
     * response back to its channel.
     *
     * <p>Nothing is scheduled when the request is {@code null}, when
     * {@code popRequest.complete()} returns {@code false} (presumably because it
     * was already completed -- confirm in {@code PopRequest}), or when the
     * channel is no longer active.
     *
     * @param popRequest the parked request, may be {@code null}
     * @return {@code true} if the request was submitted for re-processing
     */
    public boolean wakeUp(PopRequest popRequest) {
        if (popRequest == null) {
            return false;
        }
        // complete() is evaluated before the channel check, as in the short-circuit chain.
        if (!popRequest.complete()) {
            return false;
        }
        if (!popRequest.getChannel().isActive()) {
            return false;
        }

        Runnable reprocess = () -> {
            try {
                RemotingCommand response = processRequest(popRequest.getChannel(), popRequest.getRemotingCommand());
                if (response == null) {
                    // No response to send at this point.
                    return;
                }
                response.setOpaque(popRequest.getRemotingCommand().getOpaque());
                response.markResponseType();
                NettyRemotingAbstract.writeResponse(popRequest.getChannel(), popRequest.getRemotingCommand(), response, writeResult -> {
                    if (!writeResult.isSuccess()) {
                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", popRequest.getChannel().remoteAddress(), writeResult.cause());
                        POP_LOGGER.error(popRequest.toString());
                        POP_LOGGER.error(response.toString());
                    }
                });
            } catch (RemotingCommandException e) {
                POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
            }
        };
        this.brokerController.getPullMessageExecutor().submit((Runnable) new RequestTask(reprocess, popRequest.getChannel(), popRequest.getRemotingCommand()));
        return true;
    }

    private RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand) throws RemotingCommandException {
        TopicConfig selectTopicConfig;
        TopicConfig selectTopicConfig2;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
        PopMessageResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        PopMessageRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(PopMessageRequestHeader.class);
        StringBuilder sb = new StringBuilder(64);
        StringBuilder sb2 = new StringBuilder(64);
        StringBuilder sb3 = POLLING_SUC;
        if (decodeCommandCustomHeader.isOrder()) {
            sb3 = new StringBuilder(64);
        }
        this.brokerController.getConsumerManager().compensateBasicConsumerInfo(decodeCommandCustomHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("receive PopMessage request command, {}", remotingCommand);
        }
        if (decodeCommandCustomHeader.isTimeoutTooMuch()) {
            createResponseCommand.setCode(210);
            createResponseCommand.setRemark(String.format("the broker[%s] poping message is timeout too much", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark(String.format("the broker[%s] poping message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (decodeCommandCustomHeader.getMaxMsgNums() > 32) {
            createResponseCommand.setCode(POLLING_FULL);
            createResponseCommand.setRemark(String.format("the broker[%s] poping message's num is greater than 32", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        TopicConfig selectTopicConfig3 = this.brokerController.getTopicConfigManager().selectTopicConfig(decodeCommandCustomHeader.getTopic());
        if (POLLING_SUC == selectTopicConfig3) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", decodeCommandCustomHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            createResponseCommand.setCode(17);
            createResponseCommand.setRemark(String.format("topic[%s] not exist, apply first please! %s", decodeCommandCustomHeader.getTopic(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!PermName.isReadable(selectTopicConfig3.getPerm())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the topic[" + decodeCommandCustomHeader.getTopic() + "] peeking message is forbidden");
            return createResponseCommand;
        }
        if (decodeCommandCustomHeader.getQueueId().intValue() >= selectTopicConfig3.getReadQueueNums()) {
            String format = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", decodeCommandCustomHeader.getQueueId(), decodeCommandCustomHeader.getTopic(), Integer.valueOf(selectTopicConfig3.getReadQueueNums()), channel.remoteAddress());
            POP_LOGGER.warn(format);
            createResponseCommand.setCode(POLLING_FULL);
            createResponseCommand.setRemark(format);
            return createResponseCommand;
        }
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(decodeCommandCustomHeader.getConsumerGroup());
        if (POLLING_SUC == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark(String.format("subscription group [%s] does not exist, %s", decodeCommandCustomHeader.getConsumerGroup(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return createResponseCommand;
        }
        if (!findSubscriptionGroupConfig.isConsumeEnable()) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("subscription group no permission, " + decodeCommandCustomHeader.getConsumerGroup());
            return createResponseCommand;
        }
        ExpressionMessageFilter expressionMessageFilter = POLLING_SUC;
        if (decodeCommandCustomHeader.getExp() == null || decodeCommandCustomHeader.getExp().length() <= 0) {
            try {
                this.brokerController.getConsumerManager().compensateSubscribeData(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), FilterAPI.build(decodeCommandCustomHeader.getTopic(), "*", "TAG"));
                String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup());
                this.brokerController.getConsumerManager().compensateSubscribeData(decodeCommandCustomHeader.getConsumerGroup(), buildPopRetryTopic, FilterAPI.build(buildPopRetryTopic, "*", "TAG"));
            } catch (Exception e) {
                POP_LOGGER.warn("Build default subscription error, group: {}", decodeCommandCustomHeader.getConsumerGroup());
            }
        } else {
            try {
                SubscriptionData build = FilterAPI.build(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getExp(), decodeCommandCustomHeader.getExpType());
                this.brokerController.getConsumerManager().compensateSubscribeData(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), build);
                String buildPopRetryTopic2 = KeyBuilder.buildPopRetryTopic(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup());
                this.brokerController.getConsumerManager().compensateSubscribeData(decodeCommandCustomHeader.getConsumerGroup(), buildPopRetryTopic2, FilterAPI.build(buildPopRetryTopic2, "*", decodeCommandCustomHeader.getExpType()));
                ConsumerFilterData consumerFilterData = POLLING_SUC;
                if (!ExpressionType.isTagType(build.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getExp(), decodeCommandCustomHeader.getExpType(), System.currentTimeMillis());
                    if (consumerFilterData == null) {
                        POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", decodeCommandCustomHeader.getExp(), decodeCommandCustomHeader.getConsumerGroup());
                        createResponseCommand.setCode(23);
                        createResponseCommand.setRemark("parse the consumer's subscription failed");
                        return createResponseCommand;
                    }
                }
                expressionMessageFilter = new ExpressionMessageFilter(build, consumerFilterData, this.brokerController.getConsumerFilterManager());
            } catch (Exception e2) {
                POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", decodeCommandCustomHeader.getExp(), decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(23);
                createResponseCommand.setRemark("parse the consumer's subscription failed");
                return createResponseCommand;
            }
        }
        int nextInt = this.random.nextInt(100);
        int abs = decodeCommandCustomHeader.isOrder() ? 999 : (int) Math.abs(this.ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
        GetMessageResult getMessageResult = new GetMessageResult(this.brokerController.getBrokerConfig().getCommercialSizePerMsg());
        ExpressionMessageFilter expressionMessageFilter2 = expressionMessageFilter;
        StringBuilder sb4 = sb3;
        boolean z = nextInt % 5 == 0;
        long currentTimeMillis = System.currentTimeMillis();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(0L);
        if (z && !decodeCommandCustomHeader.isOrder() && (selectTopicConfig2 = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup()))) != null) {
            for (int i = POLLING_SUC; i < selectTopicConfig2.getReadQueueNums(); i += POLLING_FULL) {
                int readQueueNums = (nextInt + i) % selectTopicConfig2.getReadQueueNums();
                int i2 = abs;
                completedFuture = completedFuture.thenCompose(l -> {
                    return popMsgFromQueue(true, getMessageResult, decodeCommandCustomHeader, readQueueNums, l.longValue(), i2, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb4);
                });
            }
        }
        if (decodeCommandCustomHeader.getQueueId().intValue() < 0) {
            for (int i3 = POLLING_SUC; i3 < selectTopicConfig3.getReadQueueNums(); i3 += POLLING_FULL) {
                int readQueueNums2 = (nextInt + i3) % selectTopicConfig3.getReadQueueNums();
                int i4 = abs;
                completedFuture = completedFuture.thenCompose(l2 -> {
                    return popMsgFromQueue(false, getMessageResult, decodeCommandCustomHeader, readQueueNums2, l2.longValue(), i4, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb4);
                });
            }
        } else {
            int intValue = decodeCommandCustomHeader.getQueueId().intValue();
            int i5 = abs;
            completedFuture = completedFuture.thenCompose(l3 -> {
                return popMsgFromQueue(false, getMessageResult, decodeCommandCustomHeader, intValue, l3.longValue(), i5, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb4);
            });
        }
        if (!z && getMessageResult.getMessageMapedList().size() < decodeCommandCustomHeader.getMaxMsgNums() && !decodeCommandCustomHeader.isOrder() && (selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup()))) != null) {
            for (int i6 = POLLING_SUC; i6 < selectTopicConfig.getReadQueueNums(); i6 += POLLING_FULL) {
                int readQueueNums3 = (nextInt + i6) % selectTopicConfig.getReadQueueNums();
                int i7 = abs;
                completedFuture = completedFuture.thenCompose(l4 -> {
                    return popMsgFromQueue(true, getMessageResult, decodeCommandCustomHeader, readQueueNums3, l4.longValue(), i7, channel, currentTimeMillis, expressionMessageFilter2, sb, sb2, sb4);
                });
            }
        }
        int i8 = abs;
        completedFuture.thenApply(l5 -> {
            if (getMessageResult.getMessageBufferList().isEmpty()) {
                int polling = polling(channel, remotingCommand, decodeCommandCustomHeader);
                if (POLLING_SUC == polling) {
                    return null;
                }
                if (POLLING_FULL == polling) {
                    createResponseCommand.setCode(209);
                } else {
                    createResponseCommand.setCode(210);
                }
                getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
            } else {
                createResponseCommand.setCode(POLLING_SUC);
                getMessageResult.setStatus(GetMessageStatus.FOUND);
                if (l5.longValue() > 0) {
                    notifyMessageArriving(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getQueueId().intValue());
                }
            }
            readCustomHeader.setInvisibleTime(decodeCommandCustomHeader.getInvisibleTime());
            readCustomHeader.setPopTime(currentTimeMillis);
            readCustomHeader.setReviveQid(i8);
            readCustomHeader.setRestNum(l5.longValue());
            readCustomHeader.setStartOffsetInfo(sb.toString());
            readCustomHeader.setMsgOffsetInfo(sb2.toString());
            if (decodeCommandCustomHeader.isOrder() && sb4 != null) {
                readCustomHeader.setOrderCountInfo(sb4.toString());
            }
            createResponseCommand.setRemark(getMessageResult.getStatus().name());
            switch (createResponseCommand.getCode()) {
                case POLLING_SUC /* 0 */:
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        long now = this.brokerController.getMessageStore().now();
                        byte[] readGetMessageResult = readGetMessageResult(getMessageResult, decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), (int) (this.brokerController.getMessageStore().now() - now));
                        createResponseCommand.setBody(readGetMessageResult);
                        return createResponseCommand;
                    }
                    try {
                        channel.writeAndFlush(new ManyMessageTransfer(createResponseCommand.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult)).addListener(channelFuture -> {
                            getMessageResult.release();
                            RemotingMetricsManager.rpcLatency.record(remotingCommand.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), RemotingMetricsManager.newAttributesBuilder().put("request_code", RemotingMetricsManager.getRequestCodeDesc(remotingCommand.getCode())).put("response_code", RemotingMetricsManager.getResponseCodeDesc(createResponseCommand.getCode())).put("result", RemotingMetricsManager.getWriteAndFlushResult(channelFuture)).build());
                            if (channelFuture.isSuccess()) {
                                return;
                            }
                            POP_LOGGER.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), channelFuture.cause());
                        });
                        return null;
                    } catch (Throwable th) {
                        POP_LOGGER.error("Error occurred when transferring messages from page cache", th);
                        getMessageResult.release();
                        return null;
                    }
                default:
                    return createResponseCommand;
            }
        }).thenAccept(remotingCommand2 -> {
            NettyRemotingAbstract.writeResponse(channel, remotingCommand, remotingCommand2);
        });
        return null;
    }

    /**
     * Pops messages from a single queue and appends them to the shared {@code getMessageResult}.
     *
     * <p>The queue is guarded by {@code queueLockManager}; if the lock cannot be taken, or an early-exit
     * condition is hit, no messages are read and only the rest counter is updated.
     *
     * @param z                       {@code true} to read from the pop retry topic built from the request's
     *                                topic and consumer group, {@code false} to read from the request's topic
     * @param getMessageResult        accumulator shared across queues; fetched buffers are added to it
     * @param popMessageRequestHeader the decoded pop request header
     * @param i                       queue id to pop from
     * @param j                       rest count accumulated so far by earlier queues in the chain
     * @param i2                      revive queue id (callers also write this value to the response's reviveQid)
     * @param channel                 client channel; its remote address is used when committing offsets
     * @param j2                      pop time (callers pass the timestamp taken at the start of the request)
     * @param expressionMessageFilter filter handed to the message store
     * @param sb                      buffer that receives start-offset info for the response
     * @param sb2                     buffer that receives message-offset info for the response
     * @param sb3                     buffer passed to the consumer order info manager for ordered pops
     * @return a future holding {@code j} plus the number of offsets between the position reached in this
     *         queue and the queue's max offset; on a caught exception it holds {@code j} unchanged
     */
    private CompletableFuture<Long> popMsgFromQueue(boolean z, GetMessageResult getMessageResult, PopMessageRequestHeader popMessageRequestHeader, int i, long j, int i2, Channel channel, long j2, ExpressionMessageFilter expressionMessageFilter, StringBuilder sb, StringBuilder sb2, StringBuilder sb3) {
        String buildPopRetryTopic = z ? KeyBuilder.buildPopRetryTopic(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup()) : popMessageRequestHeader.getTopic();
        // Key of the form topic + separator + group + separator + queueId; used both as the lock key
        // and as the key handed to getPopOffset.
        String str = buildPopRetryTopic + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + popMessageRequestHeader.getConsumerGroup() + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + i;
        boolean isOrder = popMessageRequestHeader.isOrder();
        // Offset lookup before the lock is held: both the "commit initial offset" and
        // "check reset offset" flags are off.
        long popOffset = getPopOffset(buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInitMode(), false, str, false);
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        // Lock not obtained: do not pop, only add this queue's remaining range to the rest count.
        if (!this.queueLockManager.tryLock(str)) {
            completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(buildPopRetryTopic, i) - popOffset) + j));
            return completableFuture;
        }
        try {
            // Every early-return path below completes this future, which releases the lock.
            // The async path returns a different future and unlocks in its own whenComplete.
            completableFuture.whenComplete((l, th) -> {
                this.queueLockManager.unLock(str);
            });
            // Re-read the offset under the lock, this time with both flags on.
            long popOffset2 = getPopOffset(buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInitMode(), true, str, true);
            // Ordered pop: skip this queue while the order info manager reports it as blocked.
            if (isOrder && this.brokerController.getConsumerOrderInfoManager().checkBlock(buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i, popMessageRequestHeader.getInvisibleTime())) {
                completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(buildPopRetryTopic, i) - popOffset2) + j));
                return completableFuture;
            }
            if (isOrder) {
                this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i);
            }
            // Earlier queues already supplied as many messages as the request asked for.
            if (getMessageResult.getMessageMapedList().size() >= popMessageRequestHeader.getMaxMsgNums()) {
                completableFuture.complete(Long.valueOf((this.brokerController.getMessageStore().getMaxOffsetInQueue(buildPopRetryTopic, i) - popOffset2) + j));
                return completableFuture;
            }
            // Mutable holders so the lambdas below can update the rest count and the offset in use.
            AtomicLong atomicLong = new AtomicLong(j);
            AtomicLong atomicLong2 = new AtomicLong(popOffset2);
            // Ask the store only for the number of messages still missing from the shared result.
            return this.brokerController.getMessageStore().getMessageAsync(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, popOffset2, popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter).thenCompose(getMessageResult2 -> {
                if (getMessageResult2 == null) {
                    return CompletableFuture.completedFuture(null);
                }
                // Any status other than the three offset-problem statuses is passed through unchanged.
                if (!GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageResult2.getStatus()) && !GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageResult2.getStatus()) && !GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageResult2.getStatus())) {
                    return CompletableFuture.completedFuture(getMessageResult2);
                }
                // Offset rejected by the store: commit the store's suggested next offset and retry once from there.
                POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}", new Object[]{str, Long.valueOf(atomicLong2.get()), Long.valueOf(getMessageResult2.getNextBeginOffset())});
                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, getMessageResult2.getNextBeginOffset());
                atomicLong2.set(getMessageResult2.getNextBeginOffset());
                return this.brokerController.getMessageStore().getMessageAsync(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, atomicLong2.get(), popMessageRequestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), expressionMessageFilter);
            }).thenApply(getMessageResult3 -> {
                // No result from the store: count everything from the current offset to the queue max as remaining.
                if (getMessageResult3 == null) {
                    atomicLong.set((this.brokerController.getMessageStore().getMaxOffsetInQueue(buildPopRetryTopic, i) - atomicLong2.get()) + atomicLong.get());
                    return Long.valueOf(atomicLong.get());
                }
                if (!getMessageResult3.getMessageMapedList().isEmpty()) {
                    // Messages were found: update broker statistics and metrics.
                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(popMessageRequestHeader.getTopic(), getMessageResult3.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetNums(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, getMessageResult3.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetSize(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, getMessageResult3.getBufferTotalSize());
                    Attributes build = BrokerMetricsManager.newAttributesBuilder().put(BrokerMetricsConstant.LABEL_TOPIC, popMessageRequestHeader.getTopic()).put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, popMessageRequestHeader.getConsumerGroup()).put(BrokerMetricsConstant.LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(popMessageRequestHeader.getTopic()) || MixAll.isSysConsumerGroup(popMessageRequestHeader.getConsumerGroup())).put(BrokerMetricsConstant.LABEL_IS_RETRY, z).build();
                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult3.getMessageCount(), build);
                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult3.getBufferTotalSize(), build);
                    // NOTE(review): the calls below use popOffset2, not the value atomicLong2 holds after
                    // an offset-correction retry above — confirm this is intended.
                    if (isOrder) {
                        // Ordered pop: record the popped offsets in the order info manager and commit the offset.
                        this.brokerController.getConsumerOrderInfoManager().update(z, buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i, j2, popMessageRequestHeader.getInvisibleTime(), getMessageResult3.getMessageQueueOffset(), sb3);
                        this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, popOffset2);
                    } else {
                        // Non-ordered pop: record a checkpoint for the popped messages.
                        appendCheckPoint(popMessageRequestHeader, buildPopRetryTopic, i2, i, popOffset2, getMessageResult3, j2, this.brokerController.getBrokerConfig().getBrokerName());
                    }
                    ExtraInfoUtil.buildStartOffsetInfo(sb, z, i, popOffset2);
                    ExtraInfoUtil.buildMsgOffsetInfo(sb2, z, i, getMessageResult3.getMessageQueueOffset());
                } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(getMessageResult3.getStatus()) || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageResult3.getStatus()) || GetMessageStatus.MESSAGE_WAS_REMOVING.equals(getMessageResult3.getStatus()) || GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(getMessageResult3.getStatus())) && getMessageResult3.getNextBeginOffset() > -1) {
                    // Nothing deliverable, but the store reported a valid next offset: register a mock checkpoint
                    // (presumably so the offset can still advance — confirm against PopBufferMergeService).
                    this.popBufferMergeService.addCkMock(popMessageRequestHeader.getConsumerGroup(), buildPopRetryTopic, i, popOffset2, popMessageRequestHeader.getInvisibleTime(), j2, i2, getMessageResult3.getNextBeginOffset(), this.brokerController.getBrokerConfig().getBrokerName());
                }
                // Add what is still left in this queue after this fetch to the running rest count.
                atomicLong.set((getMessageResult3.getMaxOffset() - getMessageResult3.getNextBeginOffset()) + atomicLong.get());
                String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
                for (SelectMappedBufferResult selectMappedBufferResult : getMessageResult3.getMessageMapedList()) {
                    if (z) {
                        // Retry-topic messages are decoded, rewritten to carry the original topic, and re-encoded
                        // into a heap buffer; the mapped buffer is released as soon as it has been decoded.
                        List<MessageExt> decodesBatch = MessageDecoder.decodesBatch(selectMappedBufferResult.getByteBuffer(), true, false, true);
                        selectMappedBufferResult.release();
                        for (MessageExt messageExt : decodesBatch) {
                            try {
                                // Attach the pop extra info under "POP_CK" unless the message already has that property.
                                messageExt.getProperties().putIfAbsent("POP_CK", ExtraInfoUtil.buildExtraInfo(popOffset2, j2, popMessageRequestHeader.getInvisibleTime(), i2, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset()));
                                messageExt.setTopic(popMessageRequestHeader.getTopic());
                                messageExt.setStoreSize(POLLING_SUC);
                                byte[] encode = MessageDecoder.encode(messageExt, false);
                                getMessageResult.addMessage(new SelectMappedBufferResult(selectMappedBufferResult.getStartOffset(), ByteBuffer.wrap(encode), encode.length, (MappedFile) null));
                            } catch (Exception e) {
                                // A message that fails to re-encode is logged and left out of the result.
                                POP_LOGGER.error("Exception in recode retry message buffer, topic={}", buildPopRetryTopic, e);
                            }
                        }
                    } else {
                        // Normal-topic messages are handed over as-is.
                        getMessageResult.addMessage(selectMappedBufferResult);
                    }
                }
                this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(buildPopRetryTopic, popMessageRequestHeader.getConsumerGroup(), i, getMessageResult3.getMessageCount());
                return Long.valueOf(atomicLong.get());
            }).whenComplete((l2, th2) -> {
                // Async path: log any failure and release the queue lock whether or not it succeeded.
                if (th2 != null) {
                    POP_LOGGER.error("Pop message error, {}", str, th2);
                }
                this.queueLockManager.unLock(str);
            });
        } catch (Exception e) {
            // Synchronous failure after the lock was taken: completing the future triggers the unlock
            // callback registered at the top of the try block.
            POP_LOGGER.error("Exception in popMsgFromQueue", e);
            completableFuture.complete(Long.valueOf(j));
            return completableFuture;
        }
    }

    /**
     * Works out the offset a pop on the given queue should start from.
     *
     * <p>The consumer's stored offset is used when it is non-negative. Otherwise the offset is seeded
     * from the queue's minimum offset (when {@code i2} equals {@code POLLING_SUC}) or from the queue's
     * max offset minus one, floored at zero. A pending reset offset, when requested via {@code z2},
     * overrides everything. Otherwise the result is never lower than the latest offset tracked by
     * the pop buffer merge service for {@code str3}.
     *
     * @param str  topic (callers may pass the pop retry topic)
     * @param str2 consumer group
     * @param i    queue id
     * @param i2   init mode taken from the request header
     * @param z    whether an offset seeded from the max offset should also be committed
     * @param str3 key used to look up the latest offset in the pop buffer merge service
     * @param z2   whether a pending reset offset should be looked up (and applied) first
     * @return the offset to start popping from
     */
    private long getPopOffset(String str, String str2, int i, int i2, boolean z, String str3, boolean z2) {
        long startOffset = this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i);
        if (startOffset < 0) {
            if (POLLING_SUC != i2) {
                // No stored offset: begin at the last message of the queue, never below zero.
                startOffset = Math.max(this.brokerController.getMessageStore().getMaxOffsetInQueue(str, i) - 1, 0L);
                if (z) {
                    this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset", str2, str, i, startOffset);
                }
            } else {
                startOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(str, i);
            }
        }

        if (z2) {
            Long resetOffset = resetPopOffset(str, str2, i);
            if (resetOffset != null) {
                return resetOffset.longValue();
            }
        }

        long bufferedOffset = this.popBufferMergeService.getLatestOffset(str3);
        if (bufferedOffset < 0) {
            return startOffset;
        }
        return Math.max(bufferedOffset, startOffset);
    }

    /**
     * Attempts to park a pop request so it can be answered later instead of immediately.
     *
     * @param channel                 channel the request arrived on
     * @param remotingCommand         the original request; marked suspended when parking succeeds
     * @param popMessageRequestHeader decoded header supplying poll time, born time, topic, group and queue id
     * @return {@code NOT_POLLING} when the request has no positive poll time or the long-polling service
     *         is stopped; {@code POLLING_FULL} when the global or per-key limit is hit or the request
     *         could not be added; {@code POLLING_TIMEOUT} when the request has already timed out;
     *         {@code POLLING_SUC} when the request was parked
     */
    private int polling(Channel channel, RemotingCommand remotingCommand, PopMessageRequestHeader popMessageRequestHeader) {
        if (popMessageRequestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
            return NOT_POLLING;
        }

        // Remember that this consumer group has polled on this topic.
        ConcurrentHashMap<String, Byte> groupsOfTopic = this.topicCidMap.get(popMessageRequestHeader.getTopic());
        if (groupsOfTopic == null) {
            ConcurrentHashMap<String, Byte> created = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, Byte> raced = this.topicCidMap.putIfAbsent(popMessageRequestHeader.getTopic(), created);
            groupsOfTopic = raced != null ? raced : created;
        }
        groupsOfTopic.putIfAbsent(popMessageRequestHeader.getConsumerGroup(), Byte.MIN_VALUE);

        PopRequest popRequest = new PopRequest(remotingCommand, channel, popMessageRequestHeader.getBornTime() + popMessageRequestHeader.getPollTime());

        // Broker-wide cap on parked requests.
        if (this.totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize()) {
            POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, Long.valueOf(this.totalPollingNum.get()));
            return POLLING_FULL;
        }

        if (popRequest.isTimeout()) {
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("polling {}, result POLLING_TIMEOUT", remotingCommand);
            }
            return POLLING_TIMEOUT;
        }

        String pollingKey = KeyBuilder.buildPollingKey(popMessageRequestHeader.getTopic(), popMessageRequestHeader.getConsumerGroup(), popMessageRequestHeader.getQueueId().intValue());
        ConcurrentSkipListSet waiting = (ConcurrentSkipListSet) this.pollingMap.get(pollingKey);
        if (waiting != null) {
            // Per-key cap; only checked when a set already exists for this key.
            int waitingCount = waiting.size();
            if (waitingCount > this.brokerController.getBrokerConfig().getPopPollingSize()) {
                POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, Integer.valueOf(waitingCount));
                return POLLING_FULL;
            }
        } else {
            ConcurrentSkipListSet created = new ConcurrentSkipListSet(PopRequest.COMPARATOR);
            ConcurrentSkipListSet raced = (ConcurrentSkipListSet) this.pollingMap.putIfAbsent(pollingKey, created);
            waiting = raced != null ? raced : created;
        }

        if (waiting.add(popRequest)) {
            remotingCommand.setSuspended(true);
            this.totalPollingNum.incrementAndGet();
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
            }
            return POLLING_SUC;
        }

        POP_LOGGER.info("polling {}, result POLLING_FULL, add fail, {}", popRequest, waiting);
        return POLLING_FULL;
    }

    /**
     * Wraps a pop checkpoint in a broker-internal message addressed to the revive topic.
     *
     * <p>The checkpoint is serialised to JSON as the message body, the message is tagged {@code "ck"},
     * and its deliver time is the checkpoint's revive time minus {@code PopAckConstants.ackTimeInterval}.
     *
     * @param popCheckPoint checkpoint to serialise
     * @param i             queue id to set on the message
     * @return the populated message
     */
    public final MessageExtBrokerInner buildCkMsg(PopCheckPoint popCheckPoint, int i) {
        MessageExtBrokerInner ckMessage = new MessageExtBrokerInner();

        // Destination and payload.
        ckMessage.setTopic(this.reviveTopic);
        byte[] ckPayload = JSON.toJSONString(popCheckPoint).getBytes(DataConverter.charset);
        ckMessage.setBody(ckPayload);
        ckMessage.setQueueId(i);
        ckMessage.setTags("ck");

        // Origin metadata: the broker itself is both born host and store host.
        ckMessage.setBornTimestamp(System.currentTimeMillis());
        ckMessage.setBornHost(this.brokerController.getStoreHost());
        ckMessage.setStoreHost(this.brokerController.getStoreHost());

        long deliverAt = popCheckPoint.getReviveTime() - PopAckConstants.ackTimeInterval;
        ckMessage.setDeliverTimeMs(deliverAt);

        // The properties string is rebuilt last so it includes the unique key added here.
        ckMessage.getProperties().put("UNIQ_KEY", genCkUniqueId(popCheckPoint));
        ckMessage.setPropertiesString(MessageDecoder.messageProperties2String(ckMessage.getProperties()));
        return ckMessage;
    }

    /**
     * Builds a checkpoint describing the messages just popped and hands it to the pop buffer merge service.
     *
     * <p>If {@code addCk} declines the checkpoint, it is submitted through {@code addCkJustOffset} instead.
     *
     * @param popMessageRequestHeader request header supplying invisible time and consumer group
     * @param str                     topic recorded in the checkpoint
     * @param i                       revive queue id passed to the merge service
     * @param i2                      queue id recorded in the checkpoint
     * @param j                       start offset; each message offset is stored as a difference from it
     * @param getMessageResult        result whose message list and queue offsets are recorded
     * @param j2                      pop time recorded in the checkpoint
     * @param str2                    broker name recorded in the checkpoint
     */
    private void appendCheckPoint(PopMessageRequestHeader popMessageRequestHeader, String str, int i, int i2, long j, GetMessageResult getMessageResult, long j2, String str2) {
        PopCheckPoint checkPoint = new PopCheckPoint();
        checkPoint.setBitMap(POLLING_SUC);
        int poppedCount = getMessageResult.getMessageMapedList().size();
        checkPoint.setNum((byte) poppedCount);
        checkPoint.setPopTime(j2);
        checkPoint.setInvisibleTime(popMessageRequestHeader.getInvisibleTime());
        checkPoint.setStartOffset(j);
        checkPoint.setCId(popMessageRequestHeader.getConsumerGroup());
        checkPoint.setTopic(str);
        checkPoint.setQueueId(i2);
        checkPoint.setBrokerName(str2);

        // Store each message's queue offset relative to the start offset.
        for (Object queueOffset : getMessageResult.getMessageQueueOffset()) {
            long distance = ((Long) queueOffset).longValue() - j;
            checkPoint.addDiff((int) distance);
        }

        boolean accepted = this.popBufferMergeService.addCk(checkPoint, i, -1L, getMessageResult.getNextBeginOffset());
        if (!accepted) {
            this.popBufferMergeService.addCkJustOffset(checkPoint, i, -1L, getMessageResult.getNextBeginOffset());
        }
    }

    /**
     * Consumes a pending reset offset for the queue, if there is one, and applies it.
     *
     * <p>Applying means clearing the ordered-consumption block, clearing the pop buffer's offset queue
     * for the topic/group/queue key, and committing the reset offset as the consumer offset.
     *
     * @param str  topic
     * @param str2 consumer group
     * @param i    queue id
     * @return the reset offset that was applied, or {@code null} when none was pending
     */
    private Long resetPopOffset(String str, String str2, int i) {
        Long pendingReset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(str, str2, Integer.valueOf(i));
        if (pendingReset == null) {
            return null;
        }

        this.brokerController.getConsumerOrderInfoManager().clearBlock(str, str2, i);

        String offsetQueueKey = str + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + str2 + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + i;
        getPopBufferMergeService().clearOffsetQueue(offsetQueueKey);

        this.brokerController.getConsumerOffsetManager().commitOffset("ResetPopOffset", str2, str, i, pendingReset.longValue());
        return pendingReset;
    }

    /**
     * Copies every message buffer of the result into one heap array and always releases the result.
     *
     * <p>Also records a disk-fall-behind statistic: the store's current time minus the long read at
     * byte index 56 of the last buffer copied.
     *
     * @param getMessageResult result whose buffers are copied; released before this method returns
     * @param str              consumer group used for the statistic (as passed by the caller)
     * @param str2             topic used for the statistic (as passed by the caller)
     * @param i                queue id used for the statistic
     * @return the backing array holding the concatenated message bytes
     */
    private byte[] readGetMessageResult(GetMessageResult getMessageResult, String str, String str2, int i) {
        ByteBuffer heapCopy = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        try {
            long lastTimestamp = 0;
            for (ByteBuffer messageBuffer : getMessageResult.getMessageBufferList()) {
                heapCopy.put(messageBuffer);
                // Absolute read, so it is unaffected by the position advanced by put().
                // Presumably index 56 is the message's store timestamp — confirm against MessageDecoder.
                lastTimestamp = messageBuffer.getLong(56);
            }
            this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(str, str2, i, this.brokerController.getMessageStore().now() - lastTimestamp);
            return heapCopy.array();
        } finally {
            getMessageResult.release();
        }
    }
}
