package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.metrics.BrokerMetricsConstant;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.config.BrokerRole;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.class */
/**
 * Default {@link PullMessageResultHandler}: converts the outcome of a message-store lookup into
 * the reply for a pull request.
 *
 * <p>Depending on the response code the handler either returns the response for the caller to
 * send, writes the messages to the channel itself (returning {@code null}), or parks the request
 * in the pull-request hold service (also returning {@code null}).
 */
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
    protected static final Logger log = LoggerFactory.getLogger("RocketmqBroker");

    // Response codes switched on in handle(). The names follow how each code is treated below;
    // NOTE(review): presumably these mirror the ResponseCode constants (SUCCESS, PULL_NOT_FOUND,
    // PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED) - confirm against that class.
    private static final int CODE_SUCCESS = 0;
    private static final int CODE_PULL_NOT_FOUND = 19;
    private static final int CODE_PULL_RETRY_IMMEDIATELY = 20;
    private static final int CODE_PULL_OFFSET_MOVED = 21;

    // Byte offsets/flags used to locate a timestamp inside each stored message buffer.
    // NOTE(review): these look like the sysflag position, the "born host is IPv6" flag and the
    // fixed-size prefix preceding the born-host field of the stored message layout - confirm
    // against MessageDecoder / MessageSysFlag.
    private static final int SYS_FLAG_POSITION = 36;
    private static final int BORN_HOST_V6_FLAG = 16;
    private static final int BORN_HOST_POSITION = 48;
    private static final int BORN_HOST_V4_LENGTH = 8;
    private static final int BORN_HOST_V6_LENGTH = 20;

    protected final BrokerController brokerController;

    /**
     * @param brokerController broker facade used to reach the store, stats, config and processors
     */
    public DefaultPullMessageResultHandler(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Completes a pull request once the store lookup has produced {@code getMessageResult}.
     *
     * @param getMessageResult        result of the store lookup
     * @param request                 the original pull request command
     * @param requestHeader           decoded header of {@code request}
     * @param channel                 channel the request arrived on
     * @param subscriptionData        subscription passed on when the request is suspended
     * @param subscriptionGroupConfig config of the consumer group
     * @param brokerAllowSuspend      whether this request may be suspended (long polling)
     * @param messageFilter           filter passed on when the request is suspended
     * @param response                response command to fill in; may be replaced by the
     *                                static-topic rewrite
     * @param mappingContext          static-topic mapping context used for the rewrite
     * @return the response to send, or {@code null} when nothing must be sent by the caller
     *         (messages were written to the channel directly, the channel was not writable,
     *         or the request was suspended)
     */
    @Override // org.apache.rocketmq.broker.plugin.PullMessageResultHandler
    public RemotingCommand handle(GetMessageResult getMessageResult, RemotingCommand request,
        PullMessageRequestHeader requestHeader, Channel channel, SubscriptionData subscriptionData,
        SubscriptionGroupConfig subscriptionGroupConfig, boolean brokerAllowSuspend, MessageFilter messageFilter,
        RemotingCommand response, TopicQueueMappingContext mappingContext) {
        PullMessageProcessor processor = this.brokerController.getPullMessageProcessor();
        String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
        int topicSysFlag = this.brokerController.getTopicConfigManager()
            .selectTopicConfig(requestHeader.getTopic())
            .getTopicSysFlag();
        processor.composeResponseHeader(requestHeader, getMessageResult, topicSysFlag, subscriptionGroupConfig,
            response, clientAddress);
        try {
            processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend,
                response.getCode());

            // The header is read before the rewrite, so it belongs to the response passed in even
            // when the static-topic rewrite substitutes a different command below.
            PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
            RemotingCommand rewritten = processor.rewriteResponseForStaticTopic(requestHeader, responseHeader,
                mappingContext, response.getCode());
            if (rewritten != null) {
                response = rewritten;
            }

            processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
                requestHeader.getQueueId().intValue(), requestHeader, channel, response,
                getMessageResult.getNextBeginOffset());
            processor.tryCommitOffset(brokerAllowSuspend, requestHeader, getMessageResult.getNextBeginOffset(),
                clientAddress);

            switch (response.getCode()) {
                case CODE_SUCCESS:
                    return replyWithMessages(getMessageResult, request, requestHeader, channel, response);
                case CODE_PULL_NOT_FOUND:
                    if (suspendIfRequested(request, requestHeader, channel, subscriptionData, brokerAllowSuspend,
                        messageFilter)) {
                        return null;
                    }
                    break;
                case CODE_PULL_RETRY_IMMEDIATELY:
                    break;
                case CODE_PULL_OFFSET_MOVED:
                    handleOffsetMoved(getMessageResult, requestHeader, responseHeader, subscriptionGroupConfig,
                        response);
                    break;
                default:
                    log.warn("[BUG] impossible result code of get message: {}", response.getCode());
                    assert false;
                    break;
            }
            return response;
        } catch (AbortProcessException e) {
            response.setCode(e.getResponseCode());
            response.setRemark(e.getErrorMessage());
            return response;
        }
    }

    /**
     * Handles a successful lookup: updates stats and metrics, then delivers the messages either as
     * the response body (heap transfer) or by writing a {@link ManyMessageTransfer} to the channel.
     *
     * @return {@code response} with the body set for heap transfer; {@code null} when the channel
     *         is not writable or the messages were handed to the channel directly
     */
    private RemotingCommand replyWithMessages(GetMessageResult getMessageResult, RemotingCommand request,
        PullMessageRequestHeader requestHeader, Channel channel, RemotingCommand response) {
        String group = requestHeader.getConsumerGroup();
        String topic = requestHeader.getTopic();

        this.brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, getMessageResult.getMessageCount());
        this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize());
        this.brokerController.getBrokerStatsManager().incBrokerGetNums(topic, getMessageResult.getMessageCount());

        if (!BrokerMetricsManager.isRetryOrDlqTopic(topic)) {
            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
                .put(BrokerMetricsConstant.LABEL_TOPIC, topic)
                .put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, group)
                .put(BrokerMetricsConstant.LABEL_IS_SYSTEM,
                    TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
                .build();
            BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
            BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
        }

        if (!channelIsWritable(channel, requestHeader)) {
            // Nothing will be sent, so the lookup result is released here.
            getMessageResult.release();
            return null;
        }

        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
            int queueId = requestHeader.getQueueId().intValue();
            long copyStart = this.brokerController.getMessageStore().now();
            byte[] body = readGetMessageResult(getMessageResult, group, topic, queueId);
            int copyCost = (int) (this.brokerController.getMessageStore().now() - copyStart);
            this.brokerController.getBrokerStatsManager().incGroupGetLatency(group, topic, queueId, copyCost);
            response.setBody(body);
            return response;
        }

        try {
            ManyMessageTransfer transfer =
                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
            channel.writeAndFlush(transfer).addListener(future -> {
                // The write has finished (either way), so the lookup result can be released.
                getMessageResult.release();
                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                    .put("request_code", RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
                    .put("response_code", RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
                    .put("result", RemotingMetricsManager.getWriteAndFlushResult(future))
                    .build();
                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS),
                    attributes);
                if (!future.isSuccess()) {
                    log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(),
                        future.cause());
                }
            });
        } catch (Throwable th) {
            log.error("Error occurred when transferring messages from page cache", th);
            getMessageResult.release();
        }
        return null;
    }

    /**
     * Suspends the request in the pull-request hold service when both the broker allows it and the
     * request carries the suspend flag.
     *
     * @return {@code true} if the request was suspended (no response must be sent now)
     */
    private boolean suspendIfRequested(RemotingCommand request, PullMessageRequestHeader requestHeader,
        Channel channel, SubscriptionData subscriptionData, boolean brokerAllowSuspend, MessageFilter messageFilter) {
        boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag().intValue());
        long suspendTimeoutMillis = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis().longValue() : 0L;
        if (!brokerAllowSuspend || !hasSuspendFlag) {
            return false;
        }

        long pollingTimeMills = suspendTimeoutMillis;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            // Long polling disabled: fall back to the configured short polling interval.
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), requestHeader.getQueueOffset().longValue(),
            subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(requestHeader.getTopic(),
            requestHeader.getQueueId().intValue(), pullRequest);
        return true;
    }

    /**
     * Handles the "offset moved" code. On a slave that does not check offsets, the response is
     * downgraded to "retry immediately" with a suggested broker id; otherwise the corrected offset
     * is only logged.
     */
    private void handleOffsetMoved(GetMessageResult getMessageResult, PullMessageRequestHeader requestHeader,
        PullMessageResponseHeader responseHeader, SubscriptionGroupConfig subscriptionGroupConfig,
        RemotingCommand response) {
        boolean slaveWithoutOffsetCheck =
            this.brokerController.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
                && !this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave();
        if (slaveWithoutOffsetCheck) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            response.setCode(CODE_PULL_RETRY_IMMEDIATELY);
            log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                responseHeader.getSuggestWhichBrokerId());
            return;
        }

        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setTopic(requestHeader.getTopic());
        messageQueue.setQueueId(requestHeader.getQueueId().intValue());
        messageQueue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

        // The event is assembled for logging only; it is not published from this path.
        OffsetMovedEvent event = new OffsetMovedEvent();
        event.setConsumerGroup(requestHeader.getConsumerGroup());
        event.setMessageQueue(messageQueue);
        event.setOffsetRequest(requestHeader.getQueueOffset().longValue());
        event.setOffsetNew(getMessageResult.getNextBeginOffset());
        log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(),
            event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());
    }

    /**
     * Returns whether the reply may be written to {@code channel}: always {@code true} when network
     * flow control is disabled, otherwise the channel's own writability.
     */
    private boolean channelIsWritable(Channel channel, PullMessageRequestHeader requestHeader) {
        if (!this.brokerController.getBrokerConfig().isEnableNetWorkFlowControl() || channel.isWritable()) {
            return true;
        }
        log.warn("channel {} not writable ,cid {}", channel.remoteAddress(), requestHeader.getConsumerGroup());
        return false;
    }

    /**
     * Copies every message buffer of {@code getMessageResult} into one heap array and releases the
     * result. The timestamp read from the last buffer is reported as disk fall-behind time.
     *
     * @param getMessageResult lookup result to copy; always released before returning
     * @param group            consumer group used for the stats entry
     * @param topic            topic used for the stats entry
     * @param queueId          queue id used for the stats entry
     * @return the concatenated message bytes
     */
    protected byte[] readGetMessageResult(GetMessageResult getMessageResult, String group, String topic, int queueId) {
        ByteBuffer merged = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        long lastStoreTimestamp = 0;
        boolean readAnyMessage = false;
        try {
            for (ByteBuffer messageBuffer : getMessageResult.getMessageBufferList()) {
                merged.put(messageBuffer);
                // Absolute reads: unaffected by the position advanced by put() above.
                int sysFlag = messageBuffer.getInt(SYS_FLAG_POSITION);
                int bornHostLength = (sysFlag & BORN_HOST_V6_FLAG) == 0 ? BORN_HOST_V4_LENGTH : BORN_HOST_V6_LENGTH;
                lastStoreTimestamp = messageBuffer.getLong(BORN_HOST_POSITION + bornHostLength);
                readAnyMessage = true;
            }
            // With no message there is no timestamp; recording "now - 0" would report a bogus,
            // epoch-sized fall-behind value.
            if (readAnyMessage) {
                this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
                    this.brokerController.getMessageStore().now() - lastStoreTimestamp);
            }
            return merged.array();
        } finally {
            getMessageResult.release();
        }
    }

    /**
     * Stores {@code offsetMovedEvent} as a message on the {@code OFFSET_MOVED_EVENT} topic, tagged
     * and keyed by the consumer group. Failures are logged and not propagated.
     *
     * @param offsetMovedEvent event to publish
     */
    protected void generateOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) {
        try {
            MessageExtBrokerInner eventMessage = new MessageExtBrokerInner();
            eventMessage.setTopic("OFFSET_MOVED_EVENT");
            eventMessage.setTags(offsetMovedEvent.getConsumerGroup());
            eventMessage.setDelayTimeLevel(0);
            eventMessage.setKeys(offsetMovedEvent.getConsumerGroup());
            eventMessage.setBody(offsetMovedEvent.encode());
            eventMessage.setFlag(0);
            eventMessage.setPropertiesString(MessageDecoder.messageProperties2String(eventMessage.getProperties()));
            eventMessage.setTagsCode(
                MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, eventMessage.getTags()));
            eventMessage.setQueueId(0);
            eventMessage.setSysFlag(0);
            eventMessage.setBornTimestamp(System.currentTimeMillis());
            eventMessage.setBornHost(NetworkUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            eventMessage.setStoreHost(eventMessage.getBornHost());
            eventMessage.setReconsumeTimes(0);
            this.brokerController.getMessageStore().putMessage(eventMessage);
        } catch (Exception e) {
            // Parameterized logging so the handler itself cannot throw (e.g. on a null event).
            log.warn("generateOffsetMovedEvent Exception, {}", offsetMovedEvent, e);
        }
    }
}
