package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestSource;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpc.RpcClientUtils;
import org.apache.rocketmq.remoting.rpc.RpcRequest;
import org.apache.rocketmq.remoting.rpc.RpcResponse;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PullMessageProcessor.class */
public class PullMessageProcessor implements NettyRequestProcessor {
    // Class-wide logger. Declared here without an initializer (decompiler output);
    // presumably assigned in a static initializer outside this view — TODO confirm.
    private static final Logger LOGGER;
    // Consume-message hooks; may be null or empty (hasConsumeMessageHook() checks both).
    private List<ConsumeMessageHook> consumeMessageHookList;
    // Strategy that converts a GetMessageResult into the response command.
    // The constructor installs a DefaultPullMessageResultHandler.
    private PullMessageResultHandler pullMessageResultHandler;
    // Broker facade used to reach configuration, managers and the message store.
    private final BrokerController brokerController;
    // Compiler-generated flag behind the decompiled `assert` checks in this class:
    // an AssertionError is only thrown where this flag is false.
    // NOTE(review): presumably initialised in the same out-of-view static initializer as LOGGER.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.processor.PullMessageProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PullMessageProcessor$1.class */
    /**
     * Compiler-generated lookup tables that back the enum {@code switch} statements of the
     * enclosing class (decompiler rendering of the synthetic {@code PullMessageProcessor$1}).
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and holds the numeric
     * {@code case} label used by the corresponding {@code switch}. Slots that are never assigned
     * keep the {@code int} default of 0, which matches no {@code case} label and therefore falls
     * through to {@code default}.
     *
     * <p>The slot values are coupled to the {@code case 1..n} labels of the switches that read
     * these tables; do not renumber them independently.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // RequestSource ordinal -> case label; allocated and filled in the static block below.
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$remoting$protocol$RequestSource;
        // GetMessageStatus ordinal -> case label (1..10); allocated here, filled in the static block.
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$store$GetMessageStatus = new int[GetMessageStatus.values().length];

        static {
            // Every assignment has its own try/catch: if an enum constant is missing at run time
            // (NoSuchFieldError), only that slot is left at 0 and the remaining ones are still filled.
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.MESSAGE_WAS_REMOVING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MESSAGE_IN_QUEUE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_MESSAGE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_FOUND_NULL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_BADLY.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_ONE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_RESET.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_TOO_SMALL.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            // Second table: PROXY_FOR_BROADCAST -> 1, PROXY_FOR_STREAM -> 2, anything else stays 0.
            $SwitchMap$org$apache$rocketmq$remoting$protocol$RequestSource = new int[RequestSource.values().length];
            try {
                $SwitchMap$org$apache$rocketmq$remoting$protocol$RequestSource[RequestSource.PROXY_FOR_BROADCAST.ordinal()] = 1;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$remoting$protocol$RequestSource[RequestSource.PROXY_FOR_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e12) {
            }
        }
    }

    public PullMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.pullMessageResultHandler = new DefaultPullMessageResultHandler(brokerController);
    }

    /**
     * Translates a pull request addressed to a logic (static-topic) queue into a request for the
     * physical queue that currently backs the requested offset, forwarding it to another broker
     * when that physical queue is not hosted locally.
     *
     * <p>The request header is mutated in place: queue id and queue offset are replaced with their
     * physical counterparts and, when the mapping item's end offset is decided, {@code maxMsgNums}
     * is capped to the item's {@code endOffset - startOffset}.
     *
     * @param pullMessageRequestHeader pull request header; rewritten in place
     * @param topicQueueMappingContext mapping context of the request; its current item is set here
     * @return {@code null} when the request should continue through the local pull path (no mapping
     *         detail, or the mapped queue lives on this broker); otherwise the response to send back:
     *         an error response with code 501 when this broker is not the leader for the logic queue,
     *         the (rewritten) response of the remote broker, or an error response with code 1 when
     *         anything fails
     */
    private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader pullMessageRequestHeader, TopicQueueMappingContext topicQueueMappingContext) {
        try {
            TopicQueueMappingDetail mappingDetail = topicQueueMappingContext.getMappingDetail();
            if (mappingDetail == null) {
                // Not a static topic: nothing to rewrite.
                return null;
            }
            if (!topicQueueMappingContext.isLeader()) {
                String topic = topicQueueMappingContext.getTopic();
                Integer globalId = topicQueueMappingContext.getGlobalId();
                // NOTE(review): 501 is presumably ResponseCode.NOT_LEADER_FOR_QUEUE — confirm.
                return RemotingCommand.buildErrorResponse(501, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
            }

            long logicOffset = pullMessageRequestHeader.getQueueOffset().longValue();
            LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(topicQueueMappingContext.getMappingItemList(), logicOffset, true);
            topicQueueMappingContext.setCurrentItem(mappingItem);

            // Resolve every physical coordinate first, then mutate the header.
            String targetBrokerName = mappingItem.getBname();
            Integer physicalQueueId = Integer.valueOf(mappingItem.getQueueId());
            Long physicalQueueOffset = Long.valueOf(mappingItem.computePhysicalQueueOffset(logicOffset));
            pullMessageRequestHeader.setQueueId(physicalQueueId);
            pullMessageRequestHeader.setQueueOffset(physicalQueueOffset);
            if (mappingItem.checkIfEndOffsetDecided() && pullMessageRequestHeader.getMaxMsgNums() != null) {
                pullMessageRequestHeader.setMaxMsgNums(Integer.valueOf((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), pullMessageRequestHeader.getMaxMsgNums().intValue())));
            }

            if (mappingDetail.getBname().equals(targetBrokerName)) {
                // The physical queue is hosted here: continue with the local pull path.
                return null;
            }

            // Forward to the owning broker; suspend and commit-offset flags are cleared on the
            // forwarded request.
            int sysFlag = pullMessageRequestHeader.getSysFlag().intValue();
            pullMessageRequestHeader.setLo(false);
            pullMessageRequestHeader.setBname(targetBrokerName);
            pullMessageRequestHeader.setSysFlag(Integer.valueOf(PullSysFlag.clearCommitOffsetFlag(PullSysFlag.clearSuspendFlag(sysFlag))));
            // NOTE(review): 11 is presumably RequestCode.PULL_MESSAGE — confirm.
            RpcResponse rpcResponse = (RpcResponse) this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(new RpcRequest(11, pullMessageRequestHeader, (Object) null), this.brokerController.getBrokerConfig().getForwardTimeout()).get();
            if (rpcResponse.getException() != null) {
                throw rpcResponse.getException();
            }
            RemotingCommand rewrittenResponse = rewriteResponseForStaticTopic(pullMessageRequestHeader, (PullMessageResponseHeader) rpcResponse.getHeader(), topicQueueMappingContext, rpcResponse.getCode());
            if (rewrittenResponse != null) {
                return rewrittenResponse;
            }
            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Waiting on the forwarded call was interrupted: keep the interrupt visible to
                // the owning thread instead of silently clearing it.
                Thread.currentThread().interrupt();
            }
            LOGGER.warn("Failed to rewrite pull request for static topic", th);
            return RemotingCommand.buildErrorResponse(1, th.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Converts the physical-queue offsets of a pull response back into logic (static-topic) queue
     * offsets and, for non-successful pulls, revises the response code according to where the
     * current mapping item sits in the mapping list (leader item, earliest item, or in between).
     *
     * <p>The numeric codes used below are the same ones {@code composeResponseHeader} emits:
     * 0 for a found result, 19 / 20 / 21 for the not-found, retry and offset-moved families.
     * NOTE(review): presumably ResponseCode.SUCCESS / PULL_NOT_FOUND / PULL_RETRY_IMMEDIATELY /
     * PULL_OFFSET_MOVED — confirm against the ResponseCode constants.
     *
     * @param pullMessageRequestHeader  request header; only its queue offset is read
     * @param pullMessageResponseHeader response header; next-begin, min and max offsets and the
     *                                  offset delta are rewritten in place
     * @param topicQueueMappingContext  mapping context carrying the leader and current items
     * @param i                         response code of the pull that produced the header
     * @return {@code null} when there is no mapping detail or {@code i} is 0 (in the latter case
     *         the header has still been rewritten in place); a response carrying the revised code
     *         and the header when {@code i} is non-zero; an error response with code 1 on any failure
     */
    public RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader pullMessageRequestHeader, PullMessageResponseHeader pullMessageResponseHeader, TopicQueueMappingContext topicQueueMappingContext, int i) {
        try {
            if (topicQueueMappingContext.getMappingDetail() == null) {
                return null;
            }
            TopicQueueMappingDetail mappingDetail = topicQueueMappingContext.getMappingDetail();
            LogicQueueMappingItem leaderItem = topicQueueMappingContext.getLeaderItem();
            LogicQueueMappingItem currentItem = topicQueueMappingContext.getCurrentItem();
            // Item found for logic offset 0, i.e. the earliest item of the mapping list.
            LogicQueueMappingItem findLogicQueueMappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(topicQueueMappingContext.getMappingItemList(), 0L, true);
            // Decompiled form of: assert currentItem.getLogicOffset() >= 0;
            if (!$assertionsDisabled && currentItem.getLogicOffset() < 0) {
                throw new AssertionError();
            }
            long longValue = pullMessageRequestHeader.getQueueOffset().longValue();    // requested offset
            long longValue2 = pullMessageResponseHeader.getNextBeginOffset().longValue(); // next begin offset
            long longValue3 = pullMessageResponseHeader.getMinOffset().longValue();     // min offset
            long longValue4 = pullMessageResponseHeader.getMaxOffset().longValue();     // max offset
            int i2 = i; // revised response code
            if (i != 0) {
                // Set when the current item is advanced to the next one, so the "middle item"
                // block below is skipped.
                boolean z = false;
                // Case 1: the current item is the leader item.
                if (leaderItem.getGen() == currentItem.getGen()) {
                    if (longValue > longValue4) {
                        if (i == 21) {
                            i2 = 21;
                            longValue2 = longValue4;
                        } else {
                            i2 = i;
                        }
                    } else if (longValue < longValue3) {
                        longValue2 = longValue3;
                        i2 = 20;
                    } else {
                        i2 = i;
                    }
                }
                // Case 2: the current item is the earliest item. It may also be the leader item,
                // in which case this block overrides what case 1 decided.
                if (findLogicQueueMappingItem.getGen() == currentItem.getGen()) {
                    if (longValue < longValue3) {
                        // Both branches yield the same result; the structure is kept as in the original.
                        if (i == 21) {
                            i2 = 21;
                            longValue2 = longValue3;
                        } else {
                            i2 = 21;
                            longValue2 = longValue3;
                        }
                    } else if (longValue >= longValue4) {
                        // Requested offset is at or past this item's max: move on to the next item.
                        LogicQueueMappingItem findNext = TopicQueueMappingUtils.findNext(topicQueueMappingContext.getMappingItemList(), currentItem, true);
                        if (findNext != null) {
                            z = true;
                            currentItem = findNext;
                            longValue2 = currentItem.getStartOffset();
                            longValue3 = currentItem.getStartOffset();
                            longValue4 = longValue3;
                            i2 = 20;
                        } else {
                            i2 = 19;
                        }
                    } else {
                        i2 = i;
                    }
                }
                // Case 3: the current item is neither the leader nor the earliest item.
                if (!z && leaderItem.getGen() != currentItem.getGen() && findLogicQueueMappingItem.getGen() != currentItem.getGen()) {
                    if (longValue < longValue3) {
                        longValue2 = longValue3;
                        i2 = 20;
                    } else if (longValue >= longValue4) {
                        LogicQueueMappingItem findNext2 = TopicQueueMappingUtils.findNext(topicQueueMappingContext.getMappingItemList(), currentItem, true);
                        if (findNext2 != null) {
                            currentItem = findNext2;
                            longValue2 = currentItem.getStartOffset();
                            longValue3 = currentItem.getStartOffset();
                            longValue4 = longValue3;
                            i2 = 20;
                        } else {
                            i2 = 19;
                        }
                    } else {
                        i2 = i;
                    }
                }
            }
            // Never point the consumer past the end of an item whose end offset is decided.
            if (currentItem.checkIfEndOffsetDecided() && longValue2 >= currentItem.getEndOffset()) {
                longValue2 = currentItem.getEndOffset();
            }
            // Translate the physical offsets into logic-queue offsets.
            pullMessageResponseHeader.setNextBeginOffset(Long.valueOf(currentItem.computeStaticQueueOffsetStrictly(longValue2)));
            pullMessageResponseHeader.setMinOffset(Long.valueOf(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), longValue3))));
            pullMessageResponseHeader.setMaxOffset(Long.valueOf(Math.max(currentItem.computeStaticQueueOffsetStrictly(longValue4), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, topicQueueMappingContext.getGlobalId()))));
            pullMessageResponseHeader.setOffsetDelta(Long.valueOf(currentItem.computeOffsetDelta()));
            if (i != 0) {
                return RemotingCommand.createResponseCommandWithHeader(i2, pullMessageResponseHeader);
            }
            return null;
        } catch (Throwable th) {
            LOGGER.warn("Failed to rewrite pull response for static topic", th);
            return RemotingCommand.buildErrorResponse(1, th.toString());
        }
    }

    /**
     * Entry point for pull requests: unwraps the channel from the handler context and delegates to
     * the channel-based overload with its boolean flag set to {@code true}.
     * NOTE(review): that flag is presumably "broker may suspend the request" — confirm.
     *
     * @param channelHandlerContext context of the channel the request arrived on
     * @param remotingCommand       the pull request
     * @return the response to write back, or {@code null} when the delegate produces none
     * @throws RemotingCommandException propagated from the delegate
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        final Channel requestChannel = channelHandlerContext.channel();
        return processRequest(requestChannel, remotingCommand, true);
    }

    /**
     * Tells whether pull requests must be rejected by this broker.
     *
     * @return {@code true} only when slave reads are disabled and this broker's role is
     *         {@link BrokerRole#SLAVE}
     */
    public boolean rejectRequest() {
        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            return false;
        }
        return BrokerRole.SLAVE == this.brokerController.getMessageStoreConfig().getBrokerRole();
    }

    /**
     * Validates a pull request, resolves the consumer's subscription and message filter, and then
     * either answers immediately or hands the lookup to the message store asynchronously.
     *
     * <p>Checks run in this order and each failure returns the prepared response with an error code
     * and remark: broker readable, lite-pull enabled (request code 361), subscription group exists
     * and is consume-enabled, topic exists and is readable, static-topic rewrite, queue id range,
     * subscription / filter-data resolution, property-filter support.
     *
     * <p>NOTE(review): the numeric response codes are presumably the {@code ResponseCode}
     * constants (1 = SYSTEM_ERROR, 16 = NO_PERMISSION, ...) — confirm before replacing them.
     *
     * @param channel         channel the request arrived on; also used to write the async response
     * @param remotingCommand the pull request
     * @param z               passed through unchanged to the result handler
     *                        (NOTE(review): presumably "broker allow suspend" — confirm)
     * @return the response to write back, or {@code null} when the store lookup was dispatched
     *         asynchronously and the response is written from the completion callback
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand, boolean z) throws RemotingCommandException {
        final String faqUrl = "https://rocketmq.apache.org/docs/bestPractice/06FAQ";
        SubscriptionData subscriptionData;
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        PullMessageResponseHeader responseHeader = response.readCustomHeader();
        PullMessageRequestHeader requestHeader = remotingCommand.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        response.setOpaque(remotingCommand.getOpaque());
        LOGGER.debug("receive PullMessage request command, {}", remotingCommand);

        // ---- broker / group / topic permission checks ----
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(16);
            response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        if (remotingCommand.getCode() == 361 && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
            return response;
        }
        SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(26);
            response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(faqUrl)));
            return response;
        }
        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(16);
            responseHeader.setForbiddenType(2);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }
        boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag().intValue());
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            LOGGER.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(17);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(faqUrl)));
            return response;
        }
        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(16);
            responseHeader.setForbiddenType(3);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
            return response;
        }

        // ---- static-topic rewrite; a non-null result is the final answer ----
        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
        if (rewriteResult != null) {
            return rewriteResult;
        }
        if (requestHeader.getQueueId().intValue() < 0 || requestHeader.getQueueId().intValue() >= topicConfig.getReadQueueNums()) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), Integer.valueOf(topicConfig.getReadQueueNums()), channel.remoteAddress());
            LOGGER.warn(errorInfo);
            response.setCode(1);
            response.setRemark(errorInfo);
            return response;
        }

        // ---- compensate basic consumer info according to where the request came from ----
        ConsumerManager consumerManager = this.brokerController.getConsumerManager();
        switch (RequestSource.parseInteger(requestHeader.getRequestSource())) {
            case PROXY_FOR_BROADCAST:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING);
                break;
            case PROXY_FOR_STREAM:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING);
                break;
            default:
                consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING);
                break;
        }

        // ---- resolve subscription data and, for non-tag expressions, the filter data ----
        ConsumerFilterData consumerFilterData = null;
        if (hasSubscriptionFlag) {
            // The request carries its own subscription: build it from the header.
            try {
                subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
                consumerManager.compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);
                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion().longValue());
                    // Decompiled form of: assert consumerFilterData != null;
                    if (!$assertionsDisabled && consumerFilterData == null) {
                        throw new AssertionError();
                    }
                }
            } catch (Exception e) {
                // The trailing throwable is logged with its stack trace so the parse failure can be diagnosed.
                LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), requestHeader.getConsumerGroup(), e);
                response.setCode(23);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            // No subscription in the request: use what the broker has registered for the group.
            ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(24);
                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(faqUrl));
                return response;
            }
            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                response.setCode(16);
                responseHeader.setForbiddenType(4);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return response;
            }
            if (this.brokerController.getSubscriptionGroupManager().getForbidden(subscriptionGroupConfig.getGroupName(), requestHeader.getTopic(), 2)) {
                response.setCode(16);
                responseHeader.setForbiddenType(5);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] is forbidden for topic[" + requestHeader.getTopic() + "]");
                return response;
            }
            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
                LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
                response.setCode(24);
                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(faqUrl));
                return response;
            }
            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion().longValue()) {
                LOGGER.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), subscriptionData.getSubString());
                response.setCode(25);
                response.setRemark("the consumer's subscription not latest");
                return response;
            }
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
                if (consumerFilterData == null) {
                    response.setCode(27);
                    response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                    return response;
                }
                if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion().longValue()) {
                    LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", new Object[]{requestHeader.getConsumerGroup(), requestHeader.getTopic(), Long.valueOf(consumerFilterData.getClientVersion()), requestHeader.getSubVersion()});
                    response.setCode(28);
                    response.setRemark("the consumer's consumer filter data not latest");
                    return response;
                }
            }
        }
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
            response.setCode(1);
            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
            return response;
        }

        // ---- build the filter and fetch ----
        ExpressionMessageFilter messageFilter = this.brokerController.getBrokerConfig().isFilterSupportRetry() ? new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager()) : new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
        MessageStore messageStore = this.brokerController.getMessageStore();
        boolean useServerSideResetOffset = this.brokerController.getBrokerConfig().isUseServerSideResetOffset();
        String topic = requestHeader.getTopic();
        String group = requestHeader.getConsumerGroup();
        int queueId = requestHeader.getQueueId().intValue();
        // Queried (and erased) unconditionally, exactly as before, even when the feature is off.
        Long resetOffset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, Integer.valueOf(queueId));
        GetMessageResult getMessageResult = null;
        if (useServerSideResetOffset && null != resetOffset) {
            // A server-side reset is pending: answer with OFFSET_RESET instead of reading the store.
            getMessageResult = new GetMessageResult();
            getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
            getMessageResult.setNextBeginOffset(resetOffset.longValue());
            getMessageResult.setMinOffset(messageStore.getMinOffsetInQueue(topic, queueId));
            getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
            getMessageResult.setSuggestPullingFromSlave(false);
        } else {
            long broadcastInitOffset = queryBroadcastPullInitOffset(topic, group, queueId, requestHeader, channel);
            if (broadcastInitOffset >= 0) {
                getMessageResult = new GetMessageResult();
                getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
                getMessageResult.setNextBeginOffset(broadcastInitOffset);
            } else {
                // Alias so the lambda captures a variable that is unambiguously final.
                final SubscriptionData finalSubscriptionData = subscriptionData;
                messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset().longValue(), requestHeader.getMaxMsgNums().intValue(), messageFilter).thenApply(storeResult -> {
                    if (null == storeResult) {
                        response.setCode(1);
                        response.setRemark("store getMessage return null");
                        return response;
                    }
                    return this.pullMessageResultHandler.handle(storeResult, remotingCommand, requestHeader, channel, finalSubscriptionData, subscriptionGroupConfig, z, messageFilter, response, mappingContext);
                }).thenAccept(pullResponse -> {
                    NettyRemotingAbstract.writeResponse(channel, remotingCommand, pullResponse);
                }).exceptionally(throwable -> {
                    // Without this stage a failure in the store or the handler would disappear
                    // silently; no response is written here, so response behavior is unchanged.
                    LOGGER.error("Failed to process pull request asynchronously, group: {}, topic: {}, queueId: {}", new Object[]{group, topic, Integer.valueOf(queueId), throwable});
                    return null;
                });
            }
        }
        if (getMessageResult != null) {
            return this.pullMessageResultHandler.handle(getMessageResult, remotingCommand, requestHeader, channel, subscriptionData, subscriptionGroupConfig, z, messageFilter, response, mappingContext);
        }
        // Async path: the response is written from the completion callback above.
        return null;
    }

    /**
     * Reports whether at least one consume-message hook is registered.
     *
     * @return {@code true} when the hook list is non-null and non-empty
     */
    public boolean hasConsumeMessageHook() {
        if (this.consumeMessageHookList == null) {
            return false;
        }
        return !this.consumeMessageHookList.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fills the pull response header and response code from a store lookup result.
     *
     * <p>Copies the offsets and sys flags into the response's custom header, translates the
     * {@code GetMessageStatus} into a numeric response code, and finally decides which broker id
     * the client is told to pull from next.
     *
     * <p>NOTE(review): the numeric codes 0 / 19 / 20 / 21 are presumably the inlined
     * {@code ResponseCode} constants (success / pull-not-found / retry-immediately / offset-moved)
     * — confirm against the original source.
     *
     * @param pullMessageRequestHeader the decoded pull request; read for the requested offset and for logging
     * @param getMessageResult the store lookup result (status, offsets, slave suggestion)
     * @param i value written to the response as the topic sys flag
     * @param subscriptionGroupConfig group config supplying the group sys flag and the broker ids to suggest
     * @param remotingCommand the response command being populated (remark, code and custom header)
     * @param str client identifier used only in log messages (logged as "consumer" / "clientIp")
     */
    public void composeResponseHeader(PullMessageRequestHeader pullMessageRequestHeader, GetMessageResult getMessageResult, int i, SubscriptionGroupConfig subscriptionGroupConfig, RemotingCommand remotingCommand, String str) {
        PullMessageResponseHeader readCustomHeader = remotingCommand.readCustomHeader();
        remotingCommand.setRemark(getMessageResult.getStatus().name());
        readCustomHeader.setNextBeginOffset(Long.valueOf(getMessageResult.getNextBeginOffset()));
        readCustomHeader.setMinOffset(Long.valueOf(getMessageResult.getMinOffset()));
        readCustomHeader.setMaxOffset(Long.valueOf(getMessageResult.getMaxOffset()));
        readCustomHeader.setTopicSysFlag(Integer.valueOf(i));
        readCustomHeader.setGroupSysFlag(Integer.valueOf(subscriptionGroupConfig.getGroupSysFlag()));
        // Decompiled enum switch: the case labels are the values stored in the synthetic $SwitchMap
        // array (declared in AnonymousClass1, outside this view), not GetMessageStatus ordinals.
        switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$store$GetMessageStatus[getMessageResult.getStatus().ordinal()]) {
            case 1:
                remotingCommand.setCode(0);
                break;
            case 2:
                remotingCommand.setCode(20);
                break;
            case 3:
            case 4:
                // A request starting at offset 0 gets code 19; any other offset gets code 21 and
                // the corrected offset is logged.
                if (0 == pullMessageRequestHeader.getQueueOffset().longValue()) {
                    remotingCommand.setCode(19);
                    break;
                } else {
                    remotingCommand.setCode(21);
                    LOGGER.info("the broker stores no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", new Object[]{pullMessageRequestHeader.getQueueOffset(), Long.valueOf(getMessageResult.getNextBeginOffset()), pullMessageRequestHeader.getTopic(), pullMessageRequestHeader.getQueueId(), pullMessageRequestHeader.getConsumerGroup()});
                    break;
                }
            case 5:
                remotingCommand.setCode(20);
                break;
            case 6:
                remotingCommand.setCode(19);
                break;
            case 7:
                remotingCommand.setCode(21);
                LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}", new Object[]{pullMessageRequestHeader.getQueueOffset(), Long.valueOf(getMessageResult.getNextBeginOffset()), Long.valueOf(getMessageResult.getMaxOffset()), str});
                break;
            case 8:
                remotingCommand.setCode(19);
                break;
            case 9:
                remotingCommand.setCode(21);
                LOGGER.info("The queue under pulling was previously reset to start from {}", Long.valueOf(getMessageResult.getNextBeginOffset()));
                break;
            case 10:
                remotingCommand.setCode(21);
                LOGGER.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", new Object[]{pullMessageRequestHeader.getConsumerGroup(), pullMessageRequestHeader.getTopic(), pullMessageRequestHeader.getQueueOffset(), Long.valueOf(getMessageResult.getMinOffset()), str});
                break;
            default:
                // Unmapped status: throws only when assertions are enabled, otherwise the code is left untouched.
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                break;
        }
        // Pick the broker id to suggest: 0 when slave reads are disabled or the broker runs in a
        // container; otherwise the group's "consume slowly" broker if the store suggests pulling
        // from a slave, else the group's configured broker id.
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable() || this.brokerController.getBrokerConfig().isInBrokerContainer()) {
            readCustomHeader.setSuggestWhichBrokerId(0L);
        } else if (getMessageResult.isSuggestPullingFromSlave()) {
            readCustomHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
        } else {
            readCustomHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
        }
        // The redirect below applies only when this broker's id is non-zero, the store did not
        // suggest a slave, and the minimum broker id in the group is 0.
        if (this.brokerController.getBrokerConfig().getBrokerId() == 0 || getMessageResult.isSuggestPullingFromSlave() || this.brokerController.getMinBrokerIdInGroup() != 0) {
            return;
        }
        LOGGER.debug("slave redirect pullRequest to master, topic: {}, queueId: {}, consumer group: {}, next: {}, min: {}, max: {}", new Object[]{pullMessageRequestHeader.getTopic(), pullMessageRequestHeader.getQueueId(), pullMessageRequestHeader.getConsumerGroup(), readCustomHeader.getNextBeginOffset(), readCustomHeader.getMinOffset(), readCustomHeader.getMaxOffset()});
        readCustomHeader.setSuggestWhichBrokerId(0L);
        // A FOUND result keeps its code; every other status is overwritten with code 20.
        if (getMessageResult.getStatus().equals(GetMessageStatus.FOUND)) {
            return;
        }
        remotingCommand.setCode(20);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a {@link ConsumeMessageContext} for the pull and passes it to every registered
     * hook's {@code consumeMessageBefore}. Does nothing when no hook is registered.
     *
     * <p>Statistics are filled according to the response code {@code i}: code 0 records the
     * received messages; codes 20 and 21, and code 19 when {@code z} is {@code false}, record an
     * empty pull; code 19 with {@code z == true} records no statistics. Any other code throws an
     * {@link AssertionError} only when assertions are enabled.
     *
     * <p>Hook invocation is best-effort: a hook that throws is logged and the remaining hooks
     * still run.
     *
     * @param remotingCommand the request; its ext fields supply the owner and auth attributes
     * @param pullMessageRequestHeader the decoded pull request (group, topic, queue id)
     * @param getMessageResult the store lookup result; read only for response code 0
     * @param z when {@code true}, a code-19 response records no statistics
     *          (presumably "the broker may suspend the request" — confirm against callers)
     * @param i the response code the statistics are derived from
     */
    public void executeConsumeMessageHookBefore(RemotingCommand remotingCommand, PullMessageRequestHeader pullMessageRequestHeader, GetMessageResult getMessageResult, boolean z, int i) {
        if (!hasConsumeMessageHook()) {
            return;
        }
        String owner = (String) remotingCommand.getExtFields().get("Owner");
        String authType = (String) remotingCommand.getExtFields().get("AUTH_TYPE");
        String ownerParent = (String) remotingCommand.getExtFields().get("OWNER_PARENT");
        String ownerSelf = (String) remotingCommand.getExtFields().get("OWNER_SELF");
        ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(pullMessageRequestHeader.getConsumerGroup());
        consumeMessageContext.setTopic(pullMessageRequestHeader.getTopic());
        consumeMessageContext.setQueueId(pullMessageRequestHeader.getQueueId());
        consumeMessageContext.setAccountAuthType(authType);
        consumeMessageContext.setAccountOwnerParent(ownerParent);
        consumeMessageContext.setAccountOwnerSelf(ownerSelf);
        consumeMessageContext.setNamespace(NamespaceUtil.getNamespaceFromResource(pullMessageRequestHeader.getTopic()));
        switch (i) {
            case 0:
                int commercialTimes = getMessageResult.getMsgCount4Commercial() * this.brokerController.getBrokerConfig().getCommercialBaseCount();
                consumeMessageContext.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                consumeMessageContext.setCommercialRcvTimes(commercialTimes);
                consumeMessageContext.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                consumeMessageContext.setCommercialOwner(owner);
                consumeMessageContext.setRcvStat(BrokerStatsManager.StatsType.RCV_SUCCESS);
                consumeMessageContext.setRcvMsgNum(getMessageResult.getMessageCount());
                consumeMessageContext.setRcvMsgSize(getMessageResult.getBufferTotalSize());
                consumeMessageContext.setCommercialRcvMsgNum(getMessageResult.getMsgCount4Commercial());
                break;
            case 19:
                // With z == true this code records no statistics at all.
                if (!z) {
                    recordEmptyPullStats(consumeMessageContext, owner);
                }
                break;
            case 20:
            case 21:
                recordEmptyPullStats(consumeMessageContext, owner);
                break;
            default:
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                break;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageBefore(consumeMessageContext);
            } catch (Throwable th) {
                // Best-effort: a failing hook must not break the pull or skip the remaining
                // hooks, but the failure is logged instead of being dropped silently.
                LOGGER.warn("consumeMessageBefore hook failed, consumer group: {}, topic: {}", new Object[]{pullMessageRequestHeader.getConsumerGroup(), pullMessageRequestHeader.getTopic(), th});
            }
        }
    }

    /** Fills the statistics fields of the context for a pull that delivered no messages. */
    private void recordEmptyPullStats(ConsumeMessageContext consumeMessageContext, String owner) {
        consumeMessageContext.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
        consumeMessageContext.setCommercialRcvTimes(1);
        consumeMessageContext.setCommercialOwner(owner);
        consumeMessageContext.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
        consumeMessageContext.setRcvMsgNum(0);
        consumeMessageContext.setRcvMsgSize(0);
        consumeMessageContext.setCommercialRcvMsgNum(0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the pull offset for the requested queue and, when permitted, also commits the
     * consumer-supplied offset.
     *
     * @param z gate for the second commit; the client offset is committed only when this is
     *          {@code true} and the request's sys flag carries the commit-offset flag
     * @param pullMessageRequestHeader the decoded pull request (group, topic, queue id, sys flag, commit offset)
     * @param j the offset passed to {@code commitPullOffset}
     * @param str client identifier forwarded to the offset manager
     */
    public void tryCommitOffset(boolean z, PullMessageRequestHeader pullMessageRequestHeader, long j, String str) {
        final String group = pullMessageRequestHeader.getConsumerGroup();
        final String topic = pullMessageRequestHeader.getTopic();
        final int queueId = pullMessageRequestHeader.getQueueId().intValue();

        this.brokerController.getConsumerOffsetManager().commitPullOffset(str, group, topic, queueId, j);

        if (!z) {
            return;
        }
        if (!PullSysFlag.hasCommitOffsetFlag(pullMessageRequestHeader.getSysFlag().intValue())) {
            return;
        }
        final long clientCommitOffset = pullMessageRequestHeader.getCommitOffset().longValue();
        this.brokerController.getConsumerOffsetManager().commitOffset(str, group, topic, queueId, clientCommitOffset);
    }

    /**
     * Re-processes a pull request on the pull-message executor and writes the response back to
     * the channel.
     *
     * <p>The request is processed with the third {@code processRequest} argument set to
     * {@code false}. A {@code null} response means nothing is written. Write failures are logged
     * together with the request and the response.
     *
     * @param channel the channel the response is written to
     * @param remotingCommand the original pull request
     */
    public void executeRequestWhenWakeup(Channel channel, RemotingCommand remotingCommand) {
        final Runnable wakeupTask = new RequestTask(() -> {
            try {
                final RemotingCommand response = processRequest(channel, remotingCommand, false);
                if (response == null) {
                    return;
                }
                response.setOpaque(remotingCommand.getOpaque());
                response.markResponseType();
                try {
                    NettyRemotingAbstract.writeResponse(channel, remotingCommand, response, future -> {
                        if (!future.isSuccess()) {
                            LOGGER.error("processRequestWrapper response to {} failed", channel.remoteAddress(), future.cause());
                            LOGGER.error(remotingCommand.toString());
                            LOGGER.error(response.toString());
                        }
                    });
                } catch (Throwable writeFailure) {
                    LOGGER.error("processRequestWrapper process request over, but response failed", writeFailure);
                    LOGGER.error(remotingCommand.toString());
                    LOGGER.error(response.toString());
                }
            } catch (RemotingCommandException commandException) {
                LOGGER.error("excuteRequestWhenWakeup run", commandException);
            }
        }, channel, remotingCommand);
        this.brokerController.getPullMessageExecutor().submit(wakeupTask);
    }

    /**
     * Replaces the list of consume-message hooks used by this processor.
     *
     * <p>The list is stored by reference, not copied. Passing {@code null} or an empty list
     * makes {@link #hasConsumeMessageHook()} return {@code false}.
     *
     * @param list the hooks to invoke before a pull response is returned
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        this.consumeMessageHookList = list;
    }

    /**
     * Replaces the handler that turns a store lookup result into the pull response.
     *
     * @param pullMessageResultHandler the handler to delegate to; stored as-is without a null check
     */
    public void setPullMessageResultHandler(PullMessageResultHandler pullMessageResultHandler) {
        this.pullMessageResultHandler = pullMessageResultHandler;
    }

    /**
     * Decides whether a pull should be treated as a broadcast pull.
     *
     * @param z {@code true} forces a broadcast result regardless of the group info
     * @param consumerGroupInfo the consumer group's registration; may be {@code null}
     * @return {@code true} when {@code z} is set, or when the group is registered with the
     *         BROADCASTING message model and the CONSUME_PASSIVELY consume type
     */
    private boolean isBroadcast(boolean z, ConsumerGroupInfo consumerGroupInfo) {
        if (z) {
            return true;
        }
        if (consumerGroupInfo == null) {
            return false;
        }
        if (!MessageModel.BROADCASTING.equals(consumerGroupInfo.getMessageModel())) {
            return false;
        }
        return ConsumeType.CONSUME_PASSIVELY.equals(consumerGroupInfo.getConsumeType());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the pulled offset of a broadcast consumer in the broadcast offset manager.
     *
     * <p>Nothing happens when there is no response, when the broadcast offset store is disabled,
     * when the pull is not a broadcast pull, or when a non-proxy client's channel is not
     * registered in the group.
     *
     * @param str topic passed to the offset manager
     * @param str2 consumer group, used to look up the group info and passed to the offset manager
     * @param i queue id passed to the offset manager
     * @param pullMessageRequestHeader the decoded pull request (request source, queue offset, forwarded client id)
     * @param channel the requesting channel, used to resolve the client id for non-proxy requests
     * @param remotingCommand the response; when its code is 21 the offset {@code j} is stored
     *                        instead of the requested offset
     * @param j replacement offset stored when the response code is 21
     */
    public void updateBroadcastPulledOffset(String str, String str2, int i, PullMessageRequestHeader pullMessageRequestHeader, Channel channel, RemotingCommand remotingCommand, long j) {
        if (remotingCommand == null) {
            return;
        }
        if (!this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore()) {
            return;
        }
        final boolean fromProxy = Objects.equals(Integer.valueOf(RequestSource.PROXY_FOR_BROADCAST.getValue()), pullMessageRequestHeader.getRequestSource());
        final ConsumerGroupInfo groupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(str2);
        if (!isBroadcast(fromProxy, groupInfo)) {
            return;
        }

        final long requestedOffset = pullMessageRequestHeader.getQueueOffset().longValue();
        final long offsetToStore = (remotingCommand.getCode() == 21) ? j : requestedOffset;

        final String clientId;
        if (fromProxy) {
            clientId = pullMessageRequestHeader.getProxyFrowardClientId();
        } else {
            // isBroadcast() returned true without the proxy flag, so groupInfo is non-null here.
            final ClientChannelInfo registered = groupInfo.findChannel(channel);
            if (registered == null) {
                return;
            }
            clientId = registered.getClientId();
        }
        this.brokerController.getBroadcastOffsetManager().updateOffset(str, str2, i, offsetToStore, clientId, fromProxy);
    }

    /**
     * Looks up the initial pull offset of a broadcast consumer in the broadcast offset manager.
     *
     * @param str topic passed to the offset manager
     * @param str2 consumer group, used to look up the group info and passed to the offset manager
     * @param i queue id passed to the offset manager
     * @param pullMessageRequestHeader the decoded pull request (request source, queue offset, forwarded client id)
     * @param channel the requesting channel, used to resolve the client id for non-proxy requests
     * @return the offset reported by the broadcast offset manager, or {@code -1} when the
     *         broadcast offset store is disabled, the pull is not a broadcast pull, or a
     *         non-proxy client's channel is not registered in the group
     */
    protected long queryBroadcastPullInitOffset(String str, String str2, int i, PullMessageRequestHeader pullMessageRequestHeader, Channel channel) {
        final boolean storeEnabled = this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore();
        if (!storeEnabled) {
            return -1L;
        }

        final ConsumerGroupInfo groupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(str2);
        final boolean fromProxy = Objects.equals(Integer.valueOf(RequestSource.PROXY_FOR_BROADCAST.getValue()), pullMessageRequestHeader.getRequestSource());
        if (!isBroadcast(fromProxy, groupInfo)) {
            return -1L;
        }

        final String clientId;
        if (!fromProxy) {
            // isBroadcast() returned true without the proxy flag, so groupInfo is non-null here.
            final ClientChannelInfo registered = groupInfo.findChannel(channel);
            if (registered == null) {
                return -1L;
            }
            clientId = registered.getClientId();
        } else {
            clientId = pullMessageRequestHeader.getProxyFrowardClientId();
        }

        final long requestedOffset = pullMessageRequestHeader.getQueueOffset().longValue();
        return this.brokerController.getBroadcastOffsetManager().queryInitOffset(str, str2, i, clientId, requestedOffset, fromProxy).longValue();
    }

    // Static initialiser as emitted by the decompiler.
    static {
        // Flag read by the `if (!$assertionsDisabled) throw new AssertionError()` checks in this
        // class: true when assertions are NOT enabled for PullMessageProcessor.
        $assertionsDisabled = !PullMessageProcessor.class.desiredAssertionStatus();
        // Logger obtained under the "RocketmqBroker" name.
        LOGGER = LoggerFactory.getLogger("RocketmqBroker");
    }
}
