package org.apache.rocketmq.broker.processor;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.NotificationRequest;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/NotificationProcessor.class */
/**
 * Broker-side processor for notification requests: it tells a consumer whether a topic
 * (or its pop-retry topic) currently has messages the consumer group has not yet taken.
 *
 * <p>When no message is available and the request carries a positive poll time, the request
 * is parked in {@link #pollingMap} instead of being answered. A parked request is re-processed
 * either when {@link #notifyMessageArriving(String, int)} is called for its topic/queue or when
 * the background "checkNotificationPolling" thread finds that it has timed out.
 */
public class NotificationProcessor implements NettyRequestProcessor {
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");
    private static final String BORN_TIME = "bornTime";

    /** Pause between two scans for timed-out parked requests, in milliseconds. */
    private static final long TIMEOUT_SCAN_INTERVAL_MILLIS = 200L;

    /*
     * Numeric response codes, kept at the literal values the original code used. The names follow
     * the remarks attached to each code below.
     * NOTE(review): these look like ResponseCode.SUCCESS / SYSTEM_ERROR / NO_PERMISSION /
     * TOPIC_NOT_EXIST / SUBSCRIPTION_GROUP_NOT_EXIST - confirm and switch to those constants.
     */
    private static final int CODE_SUCCESS = 0;
    private static final int CODE_SYSTEM_ERROR = 1;
    private static final int CODE_NO_PERMISSION = 16;
    private static final int CODE_TOPIC_NOT_EXIST = 17;
    private static final int CODE_SUBSCRIPTION_GROUP_NOT_EXIST = 26;

    private final BrokerController brokerController;
    private final Thread checkNotificationPollingThread;
    private final Random random = new Random(System.currentTimeMillis());

    /** Parked requests, keyed by {@code KeyBuilder.buildPollingNotificationKey(topic, queueId)}. */
    private final ConcurrentLinkedHashMap<String, ArrayBlockingQueue<NotificationRequest>> pollingMap =
        new ConcurrentLinkedHashMap.Builder<String, ArrayBlockingQueue<NotificationRequest>>()
            .maximumWeightedCapacity(100000)
            .build();

    /**
     * Creates the processor and starts the daemon thread that wakes up timed-out parked requests.
     *
     * @param brokerController broker facade used for configuration, stores and executors
     */
    public NotificationProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.checkNotificationPollingThread = new Thread(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
            public void run0() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(TIMEOUT_SCAN_INTERVAL_MILLIS);
                        NotificationProcessor.this.wakeUpTimedOutRequests();
                    } catch (InterruptedException e) {
                        // shutdown() interrupts this thread; keep the flag set and stop scanning.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        // A failing scan must not kill the thread; log and try again next round.
                        POP_LOGGER.error("checkNotificationPolling error", e);
                    }
                }
            }
        });
        this.checkNotificationPollingThread.setDaemon(true);
        this.checkNotificationPollingThread.setName("checkNotificationPolling");
        this.checkNotificationPollingThread.start();
    }

    /** Stops the timeout-scanning thread by interrupting it. */
    public void shutdown() {
        this.checkNotificationPollingThread.interrupt();
    }

    /**
     * Entry point for incoming requests: stamps the request with the current time under the
     * {@code bornTime} ext field and then evaluates it.
     *
     * @return the response, or {@code null} when the request was parked for long polling
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        remotingCommand.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
        return processRequest(channelHandlerContext.channel(), remotingCommand);
    }

    /** This processor never rejects requests up front. */
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Wakes up requests parked for the given topic: first those parked under queue id {@code -1},
     * then those parked on the specific queue.
     *
     * @param topic   topic that received a message
     * @param queueId queue that received the message; negative values only wake the {@code -1} pollers
     */
    public void notifyMessageArriving(String topic, int queueId) {
        notifyMessageArrivingForQueue(topic, -1);
        // Queue ids are 0-based, so queue 0 must be notified as well.
        if (queueId >= 0) {
            notifyMessageArrivingForQueue(topic, queueId);
        }
    }

    /**
     * Drains every request parked under the key for {@code topic}/{@code queueId} and wakes each one.
     *
     * @param topic   topic part of the polling key
     * @param queueId queue part of the polling key
     */
    public void notifyMessageArrivingForQueue(String topic, int queueId) {
        ArrayBlockingQueue<NotificationRequest> parked =
            this.pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId));
        if (parked == null) {
            return;
        }
        ArrayList<NotificationRequest> drained = new ArrayList<>();
        parked.drainTo(drained);
        for (NotificationRequest parkedRequest : drained) {
            POP_LOGGER.info("new msg arrive , wakeUp : {}", parkedRequest);
            wakeUp(parkedRequest);
        }
    }

    /**
     * Re-processes a parked request on the pull-message executor and writes the response, if any,
     * back to the request's channel. Nothing happens when the request is {@code null}, when
     * {@code complete()} returns {@code false}, or when the channel is no longer active.
     *
     * @param notificationRequest the parked request to wake up; may be {@code null}
     */
    public void wakeUp(NotificationRequest notificationRequest) {
        // complete() is evaluated before the channel check, matching the original order.
        if (notificationRequest == null || !notificationRequest.complete()) {
            return;
        }
        final Channel channel = notificationRequest.getChannel();
        if (!channel.isActive()) {
            return;
        }
        final RemotingCommand request = notificationRequest.getRemotingCommand();
        Runnable run = () -> {
            try {
                final RemotingCommand response = processRequest(channel, request);
                if (response == null) {
                    // The request was parked again; there is nothing to send yet.
                    return;
                }
                response.setOpaque(request.getOpaque());
                response.markResponseType();
                NettyRemotingAbstract.writeResponse(channel, request, response, future -> {
                    if (!future.isSuccess()) {
                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", channel.remoteAddress(), future.cause());
                        POP_LOGGER.error(notificationRequest.toString());
                        POP_LOGGER.error(response.toString());
                    }
                });
            } catch (RemotingCommandException e) {
                POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
            }
        };
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }

    /** Wakes up every parked request at the head of its queue that reports itself as timed out. */
    private void wakeUpTimedOutRequests() {
        for (ArrayBlockingQueue<NotificationRequest> parked : this.pollingMap.values()) {
            NotificationRequest head;
            while ((head = parked.peek()) != null && head.isTimeout()) {
                NotificationRequest expired = parked.poll();
                if (expired == null) {
                    // Another thread emptied the queue between peek() and poll().
                    break;
                }
                POP_LOGGER.info("timeout , wakeUp Notification : {}", expired);
                wakeUp(expired);
            }
        }
    }

    /**
     * Validates the request and checks whether a message is available for it.
     *
     * @return an error response, a success response carrying the {@code hasMsg} flag, or
     *         {@code null} when no message was found and the request was parked
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
        NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.readCustomHeader();
        NotificationRequestHeader requestHeader =
            (NotificationRequestHeader) remotingCommand.decodeCommandCustomHeader(NotificationRequestHeader.class);
        response.setOpaque(remotingCommand.getOpaque());

        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(CODE_NO_PERMISSION);
            response.setRemark(String.format("the broker[%s] peeking message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (topicConfig == null) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(CODE_TOPIC_NOT_EXIST);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return response;
        }
        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(CODE_NO_PERMISSION);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden");
            return response;
        }

        int queueId = requestHeader.getQueueId();
        if (queueId >= topicConfig.getReadQueueNums()) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", queueId, requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
            POP_LOGGER.warn(errorInfo);
            response.setCode(CODE_SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (subscriptionGroupConfig == null) {
            response.setCode(CODE_SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo("https://rocketmq.apache.org/docs/bestPractice/06FAQ")));
            return response;
        }
        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(CODE_NO_PERMISSION);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        // The random number picks the queue to start scanning from; when it is a multiple of 5
        // (one request in five) the retry topic is checked before the normal topic, not after.
        int randomStart = this.random.nextInt(100);
        boolean retryFirst = randomStart % 5 == 0;

        boolean hasMsg = retryFirst && hasMsgInRetryTopic(requestHeader, randomStart);
        if (!hasMsg) {
            if (queueId < 0) {
                // A negative queue id means "any queue of the topic".
                hasMsg = hasMsgInAnyQueue(false, requestHeader, topicConfig, randomStart);
            } else {
                hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
            }
            if (!retryFirst && !hasMsg) {
                hasMsg = hasMsgInRetryTopic(requestHeader, randomStart);
            }
        }

        if (!hasMsg && polling(channel, remotingCommand, requestHeader)) {
            // Parked: the response is produced later by wakeUp().
            return null;
        }
        response.setCode(CODE_SUCCESS);
        responseHeader.setHasMsg(hasMsg);
        return response;
    }

    /**
     * Checks the pop-retry topic of the request's topic/consumer group, if that topic is configured.
     *
     * @param startIndex offset used to choose the first queue to inspect
     * @return {@code true} if some queue of the retry topic has a message
     */
    private boolean hasMsgInRetryTopic(NotificationRequestHeader requestHeader, int startIndex) {
        String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
        TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
        return retryTopicConfig != null && hasMsgInAnyQueue(true, requestHeader, retryTopicConfig, startIndex);
    }

    /**
     * Inspects every read queue of {@code topicConfig} once, starting at
     * {@code startIndex % readQueueNums} and wrapping around, stopping at the first hit.
     *
     * @param isRetry whether the queues belong to the pop-retry topic
     * @return {@code true} if any queue has a message
     */
    private boolean hasMsgInAnyQueue(boolean isRetry, NotificationRequestHeader requestHeader, TopicConfig topicConfig, int startIndex) {
        int queueNums = topicConfig.getReadQueueNums();
        for (int i = 0; i < queueNums; i++) {
            if (hasMsgFromQueue(isRetry, requestHeader, (startIndex + i) % queueNums)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether one queue has messages beyond the group's pop offset.
     *
     * @param z {@code true} to look at the pop-retry topic instead of the request's topic
     * @param i queue id to inspect
     * @return {@code false} if the consumer-order manager reports the queue as blocked or the
     *         queue's max offset does not exceed the pop offset; {@code true} otherwise
     */
    private boolean hasMsgFromQueue(boolean z, NotificationRequestHeader notificationRequestHeader, int i) {
        // The block check always uses the request's own topic, even for the retry topic.
        if (this.brokerController.getConsumerOrderInfoManager().checkBlock(notificationRequestHeader.getTopic(), notificationRequestHeader.getConsumerGroup(), i, 0L)) {
            return false;
        }
        String topic = z
            ? KeyBuilder.buildPopRetryTopic(notificationRequestHeader.getTopic(), notificationRequestHeader.getConsumerGroup())
            : notificationRequestHeader.getTopic();
        long popOffset = getPopOffset(topic, notificationRequestHeader.getConsumerGroup(), i);
        long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) - popOffset;
        return restNum > 0;
    }

    /**
     * Resolves the offset the group has reached on a queue: the stored consumer offset (or the
     * queue's min offset when the stored one is negative), raised to the pop buffer merge
     * service's latest offset when that one is larger.
     *
     * @param str  topic
     * @param str2 consumer group
     * @param i    queue id
     */
    private long getPopOffset(String str, String str2, int i) {
        long offset = this.brokerController.getConsumerOffsetManager().queryOffset(str2, str, i);
        if (offset < 0) {
            offset = this.brokerController.getMessageStore().getMinOffsetInQueue(str, i);
        }
        long bufferOffset = this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(str, str2, i);
        if (bufferOffset >= 0 && bufferOffset > offset) {
            return bufferOffset;
        }
        return offset;
    }

    /**
     * Tries to park the request until a message arrives or its deadline (born time + poll time)
     * passes.
     *
     * @return {@code true} if the request was queued; {@code false} if polling is not requested,
     *         the deadline has already passed, or the queue for its key is full
     */
    private boolean polling(Channel channel, RemotingCommand remotingCommand, NotificationRequestHeader notificationRequestHeader) {
        if (notificationRequestHeader.getPollTime() <= 0) {
            return false;
        }
        NotificationRequest parkedRequest = new NotificationRequest(remotingCommand, channel, notificationRequestHeader.getBornTime() + notificationRequestHeader.getPollTime());
        boolean queued = false;
        if (!parkedRequest.isTimeout()) {
            int queueId = notificationRequestHeader.getQueueId();
            String key = KeyBuilder.buildPollingNotificationKey(notificationRequestHeader.getTopic(), queueId);
            ArrayBlockingQueue<NotificationRequest> parked = this.pollingMap.get(key);
            if (parked == null) {
                ArrayBlockingQueue<NotificationRequest> created =
                    new ArrayBlockingQueue<>(this.brokerController.getBrokerConfig().getPopPollingSize());
                // putIfAbsent keeps a queue that a concurrent poller registered first; a plain
                // put() would replace it and strand the requests already parked in it.
                ArrayBlockingQueue<NotificationRequest> existing = this.pollingMap.putIfAbsent(key, created);
                parked = existing != null ? existing : created;
            }
            queued = parked.offer(parkedRequest);
        }
        POP_LOGGER.info("polling {}, result {}", remotingCommand, queued);
        return queued;
    }
}
