package org.apache.rocketmq.broker.failover;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

/* loaded from: input_file:org/apache/rocketmq/broker/failover/EscapeBridge.class */
public class EscapeBridge {
    // Shared logger for this class, obtained under the logger name "RocketmqBroker".
    protected static final Logger LOG = LoggerFactory.getLogger("RocketmqBroker");
    // Timeout handed to the broker outer API when sending (escaping) a message to a remote broker.
    // NOTE(review): presumably milliseconds, like DEFAULT_PULL_TIMEOUT_MILLIS — confirm against BrokerOuterAPI.
    private static final long SEND_TIMEOUT = 3000;
    // Timeout handed to the broker outer API when pulling a single message from a remote broker.
    private static final long DEFAULT_PULL_TIMEOUT_MILLIS = 10000;
    // Fallback producer group used when a message carries no "PGROUP" property;
    // built in the constructor as "InnerProducerGroup_<brokerName>_<brokerId>".
    private final String innerProducerGroupName;
    // Consumer group used for all local and remote reads issued by this bridge;
    // built in the constructor as "InnerConsumerGroup_<brokerName>_<brokerId>".
    private final String innerConsumerGroupName;
    // Controller of the broker that owns this bridge; source of config, routing and outer API access.
    private final BrokerController brokerController;
    // Executor on which asynchronous send results are transformed; stays null unless start()
    // finds both enableSlaveActingMaster and enableRemoteEscape switched on.
    private ExecutorService defaultAsyncSenderExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.failover.EscapeBridge$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/failover/EscapeBridge$1.class */
    /*
     * Lookup table that translates SendStatus ordinals into the small integers 1..4:
     * SEND_OK -> 1, SLAVE_NOT_AVAILABLE -> 2, FLUSH_DISK_TIMEOUT -> 3, FLUSH_SLAVE_TIMEOUT -> 4.
     * Entries for any other SendStatus constant keep the array default of 0.
     *
     * NOTE(review): this has the shape of a compiler-generated enum switch map recovered by the
     * decompiler (see the "synthetic" markers); it is not hand-written logic.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by SendStatus.ordinal(); sized to the number of SendStatus constants at class-init time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$client$producer$SendStatus = new int[SendStatus.values().length];

        static {
            // Each constant is registered in its own try block so that a NoSuchFieldError raised for
            // one constant is ignored and does not prevent the remaining entries from being filled in.
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.SEND_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.SLAVE_NOT_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.FLUSH_DISK_TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.FLUSH_SLAVE_TIMEOUT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public EscapeBridge(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.innerProducerGroupName = "InnerProducerGroup_" + brokerController.getBrokerConfig().getBrokerName() + "_" + brokerController.getBrokerConfig().getBrokerId();
        this.innerConsumerGroupName = "InnerConsumerGroup_" + brokerController.getBrokerConfig().getBrokerName() + "_" + brokerController.getBrokerConfig().getBrokerId();
    }

    /**
     * Creates the executor used for asynchronous escaping, but only when both
     * {@code enableSlaveActingMaster} and {@code enableRemoteEscape} are switched on.
     * Otherwise this method does nothing and the executor stays {@code null}.
     *
     * @throws Exception declared for callers; nothing in this method throws a checked exception itself
     */
    public void start() throws Exception {
        final boolean remoteEscapeEnabled = this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
            && this.brokerController.getBrokerConfig().isEnableRemoteEscape();
        if (!remoteEscapeEnabled) {
            return;
        }

        // Fixed-size pool (core == max == CPU count) fed by a queue bounded at 50000 tasks.
        final int poolSize = Runtime.getRuntime().availableProcessors();
        final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>(50000);
        final ThreadFactory threadFactory =
            new ThreadFactoryImpl("AsyncEscapeBridgeExecutor_", this.brokerController.getBrokerIdentity());
        this.defaultAsyncSenderExecutor =
            new ThreadPoolExecutor(poolSize, poolSize, 60000L, TimeUnit.MILLISECONDS, pendingTasks, threadFactory);
        LOG.info("init executor for escaping messages asynchronously success.");
    }

    /**
     * Requests an orderly shutdown of the asynchronous sender executor, if one was created.
     * Does not wait for queued tasks to finish.
     */
    public void shutdown() {
        final ExecutorService executor = this.defaultAsyncSenderExecutor;
        if (executor == null) {
            // start() never created an executor, so there is nothing to stop.
            return;
        }
        executor.shutdown();
    }

    /**
     * Stores a message, escaping it to a remote broker when no master broker can be peeked.
     *
     * <ul>
     *   <li>If {@code peekMasterBroker()} yields a broker, the message goes to that broker's store.</li>
     *   <li>Otherwise, if both {@code enableSlaveActingMaster} and {@code enableRemoteEscape} are on,
     *       the message is sent to a remote broker.</li>
     *   <li>Otherwise {@code SERVICE_NOT_AVAILABLE} is returned.</li>
     * </ul>
     *
     * @param messageExtBrokerInner message to store; its wait-store flag is cleared before a remote send
     * @return the store result, or a {@code PUT_TO_REMOTE_BROKER_FAIL} result if the remote send throws
     */
    public PutMessageResult putMessage(MessageExtBrokerInner messageExtBrokerInner) {
        final BrokerController masterBroker = this.brokerController.peekMasterBroker();
        if (masterBroker != null) {
            return masterBroker.getMessageStore().putMessage(messageExtBrokerInner);
        }

        final boolean remoteEscapeEnabled = this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
            && this.brokerController.getBrokerConfig().isEnableRemoteEscape();
        if (remoteEscapeEnabled) {
            try {
                messageExtBrokerInner.setWaitStoreMsgOK(false);
                final SendResult remoteResult = putMessageToRemoteBroker(messageExtBrokerInner);
                return transformSendResult2PutResult(remoteResult);
            } catch (Exception e) {
                LOG.error("sendMessageInFailover to remote failed", e);
                return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
            }
        }

        LOG.warn("Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
            this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(),
            this.brokerController.getBrokerConfig().isEnableRemoteEscape());
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null);
    }

    /**
     * Synchronously sends a message to a remote broker chosen from the topic's publish route.
     *
     * <p>A message on the transactional half topic is first rebuilt into a transactional message
     * via {@code TransactionalMessageUtil}. The method returns {@code null} for every failure:
     * missing route info, no resolvable broker address, a non-{@code SEND_OK} result,
     * a remoting/broker exception, or interruption.
     *
     * @param messageExtBrokerInner message to escape
     * @return the send result when the remote broker answered {@code SEND_OK}, otherwise {@code null}
     */
    private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExtBrokerInner) {
        MessageExtBrokerInner messageToPut = messageExtBrokerInner;
        if (TransactionalMessageUtil.buildHalfTopic().equals(messageExtBrokerInner.getTopic())) {
            messageToPut = TransactionalMessageUtil.buildTransactionalMessageFromHalfMessage(messageExtBrokerInner);
        }

        final TopicPublishInfo publishInfo =
            this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageToPut.getTopic());
        if (null == publishInfo || !publishInfo.ok()) {
            LOG.warn("putMessageToRemoteBroker: no route info of topic {} when escaping message, msgId={}",
                messageToPut.getTopic(), messageToPut.getMsgId());
            return null;
        }

        final MessageQueue targetQueue = publishInfo.selectOneMessageQueue();
        messageToPut.setQueueId(targetQueue.getQueueId());
        final String brokerName = targetQueue.getBrokerName();
        final String brokerAddress = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerName);
        if (null == brokerAddress) {
            // Without an address there is no broker to send to; fail fast instead of
            // handing a null address to the remoting layer.
            LOG.warn("putMessageToRemoteBroker: no remote broker address for topic {} when escaping message, msgId={}, broker={}",
                messageToPut.getTopic(), messageToPut.getMsgId(), brokerName);
            return null;
        }

        final long beginTimestamp = System.currentTimeMillis();
        try {
            final SendResult sendResult = this.brokerController.getBrokerOuterAPI()
                .sendMessageToSpecificBroker(brokerAddress, brokerName, messageToPut, getProducerGroup(messageToPut), SEND_TIMEOUT);
            if (null != sendResult && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                return sendResult;
            }
            LOG.error("Escaping failed! cost {}ms, Topic: {}, MsgId: {}, Broker: {}",
                System.currentTimeMillis() - beginTimestamp, messageExtBrokerInner.getTopic(),
                messageExtBrokerInner.getMsgId(), brokerName);
            return null;
        } catch (RemotingException | MQBrokerException e) {
            LOG.error(String.format("putMessageToRemoteBroker exception, MsgId: %s, RT: %sms, Broker: %s",
                messageToPut.getMsgId(), System.currentTimeMillis() - beginTimestamp, targetQueue), e);
            return null;
        } catch (InterruptedException e) {
            LOG.error(String.format("putMessageToRemoteBroker interrupted, MsgId: %s, RT: %sms, Broker: %s",
                messageToPut.getMsgId(), System.currentTimeMillis() - beginTimestamp, targetQueue), e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Asynchronous counterpart of {@link #putMessage(MessageExtBrokerInner)}.
     *
     * <ul>
     *   <li>If {@code peekMasterBroker()} yields a broker, the message goes to that broker's store.</li>
     *   <li>Otherwise, if both {@code enableSlaveActingMaster} and {@code enableRemoteEscape} are on,
     *       the message is sent asynchronously to a remote broker selected from the topic's publish
     *       route, and the send result is mapped to a {@link PutMessageResult} on the async sender
     *       executor.</li>
     *   <li>Otherwise a completed future carrying {@code SERVICE_NOT_AVAILABLE} is returned.</li>
     * </ul>
     *
     * <p>The returned future never completes exceptionally because of a remote-send failure; such
     * failures are reported as {@code PUT_TO_REMOTE_BROKER_FAIL}.
     *
     * @param messageExtBrokerInner message to store; its wait-store flag is cleared before a remote send
     * @return future of the store result
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner messageExtBrokerInner) {
        final BrokerController masterBroker = this.brokerController.peekMasterBroker();
        if (masterBroker != null) {
            return masterBroker.getMessageStore().asyncPutMessage(messageExtBrokerInner);
        }
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
            || !this.brokerController.getBrokerConfig().isEnableRemoteEscape()) {
            LOG.warn("Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
                this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(),
                this.brokerController.getBrokerConfig().isEnableRemoteEscape());
            return CompletableFuture.completedFuture(
                new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null));
        }
        try {
            messageExtBrokerInner.setWaitStoreMsgOK(false);

            final TopicPublishInfo publishInfo =
                this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExtBrokerInner.getTopic());
            if (null == publishInfo || !publishInfo.ok()) {
                // Same guard as the synchronous path: report the missing route explicitly
                // instead of relying on a NullPointerException being caught below.
                LOG.warn("asyncPutMessage: no route info of topic {} when escaping message, msgId={}",
                    messageExtBrokerInner.getTopic(), messageExtBrokerInner.getMsgId());
                return CompletableFuture.completedFuture(
                    new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true));
            }

            final String producerGroup = getProducerGroup(messageExtBrokerInner);
            final MessageQueue targetQueue = publishInfo.selectOneMessageQueue();
            messageExtBrokerInner.setQueueId(targetQueue.getQueueId());

            final String brokerName = targetQueue.getBrokerName();
            final String brokerAddress = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerName);
            if (null == brokerAddress) {
                LOG.warn("asyncPutMessage: no remote broker address for topic {} when escaping message, msgId={}, broker={}",
                    messageExtBrokerInner.getTopic(), messageExtBrokerInner.getMsgId(), brokerName);
                return CompletableFuture.completedFuture(
                    new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true));
            }

            return this.brokerController.getBrokerOuterAPI()
                .sendMessageToSpecificBrokerAsync(brokerAddress, brokerName, messageExtBrokerInner, producerGroup, SEND_TIMEOUT)
                .exceptionally(throwable -> {
                    // A failed send is turned into a null result, which maps to PUT_TO_REMOTE_BROKER_FAIL.
                    LOG.warn("asyncPutMessage: escaping to remote broker failed, topic={}, msgId={}, broker={}",
                        messageExtBrokerInner.getTopic(), messageExtBrokerInner.getMsgId(), brokerName, throwable);
                    return null;
                })
                .thenApplyAsync(this::transformSendResult2PutResult, this.defaultAsyncSenderExecutor)
                // Last line of defence: any failure while transforming still yields a failure result.
                .exceptionally(throwable -> transformSendResult2PutResult(null));
        } catch (Exception e) {
            LOG.error("sendMessageInFailover to remote failed", e);
            return CompletableFuture.completedFuture(
                new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true));
        }
    }

    /**
     * Resolves the producer group to use when escaping a message.
     *
     * @param messageExtBrokerInner message whose {@code "PGROUP"} property is consulted; may be {@code null}
     * @return the message's {@code "PGROUP"} property when present and non-empty,
     *         otherwise this bridge's inner producer group name
     */
    private String getProducerGroup(MessageExtBrokerInner messageExtBrokerInner) {
        if (messageExtBrokerInner != null) {
            final String declaredGroup = messageExtBrokerInner.getProperty("PGROUP");
            if (!StringUtils.isEmpty(declaredGroup)) {
                return declaredGroup;
            }
        }
        return this.innerProducerGroupName;
    }

    /**
     * Stores a message, escaping it to a deterministically chosen remote queue when no master
     * broker can be peeked.
     *
     * <p>On the remote path the target queue is picked from the topic's message queue list by
     * hashing the concatenation of the message's topic and store host, so the same topic/store-host
     * pair maps to the same index for a given list size.
     *
     * @param messageExtBrokerInner message to store; its wait-store flag is cleared before a remote send
     * @return the store result; {@code SERVICE_NOT_AVAILABLE} when remote escaping is disabled, or
     *         {@code PUT_TO_REMOTE_BROKER_FAIL} when no route, queue or broker address is available
     *         or the remote send throws
     */
    public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner messageExtBrokerInner) {
        final BrokerController masterBroker = this.brokerController.peekMasterBroker();
        if (masterBroker != null) {
            return masterBroker.getMessageStore().putMessage(messageExtBrokerInner);
        }
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
            || !this.brokerController.getBrokerConfig().isEnableRemoteEscape()) {
            LOG.warn("Put message to specific queue failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
                this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(),
                this.brokerController.getBrokerConfig().isEnableRemoteEscape());
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null);
        }
        try {
            messageExtBrokerInner.setWaitStoreMsgOK(false);

            final TopicPublishInfo publishInfo =
                this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExtBrokerInner.getTopic());
            // Treat a missing route exactly like an empty queue list instead of tripping a NullPointerException.
            final List<MessageQueue> queues = publishInfo == null ? null : publishInfo.getMessageQueueList();
            if (null == queues || queues.isEmpty()) {
                LOG.warn("putMessageToSpecificQueue: no message queue of topic {} when escaping message, msgId={}",
                    messageExtBrokerInner.getTopic(), messageExtBrokerInner.getMsgId());
                return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
            }

            // floorMod keeps the index non-negative even when hashCode() is negative.
            final String routingKey = messageExtBrokerInner.getTopic() + messageExtBrokerInner.getStoreHost();
            final MessageQueue targetQueue = queues.get(Math.floorMod(routingKey.hashCode(), queues.size()));
            messageExtBrokerInner.setQueueId(targetQueue.getQueueId());

            final String brokerName = targetQueue.getBrokerName();
            final String brokerAddress = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerName);
            if (null == brokerAddress) {
                LOG.warn("putMessageToSpecificQueue: no remote broker address for topic {} when escaping message, msgId={}, broker={}",
                    messageExtBrokerInner.getTopic(), messageExtBrokerInner.getMsgId(), brokerName);
                return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
            }

            final SendResult sendResult = this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
                brokerAddress, brokerName, messageExtBrokerInner, getProducerGroup(messageExtBrokerInner), SEND_TIMEOUT);
            return transformSendResult2PutResult(sendResult);
        } catch (Exception e) {
            LOG.error("sendMessageInFailover to remote failed", e);
            return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
    }

    /**
     * Maps the result of a remote send onto the equivalent {@link PutMessageResult}.
     *
     * <p>{@code SEND_OK}, {@code SLAVE_NOT_AVAILABLE}, {@code FLUSH_DISK_TIMEOUT} and
     * {@code FLUSH_SLAVE_TIMEOUT} map to the put status of the corresponding name; everything
     * else, including a {@code null} result or a {@code null} send status, maps to
     * {@code PUT_TO_REMOTE_BROKER_FAIL}. Every result is created with a {@code null}
     * append result and the third constructor argument set to {@code true}.
     *
     * @param sendResult result of the remote send; may be {@code null}
     * @return the corresponding put result, never {@code null}
     */
    private PutMessageResult transformSendResult2PutResult(SendResult sendResult) {
        final SendStatus sendStatus = sendResult == null ? null : sendResult.getSendStatus();
        if (sendStatus == null) {
            // Switching on a null enum would throw; a missing status is simply a failed send.
            return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
        // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
        switch (sendStatus) {
            case SEND_OK:
                return new PutMessageResult(PutMessageStatus.PUT_OK, (AppendMessageResult) null, true);
            case SLAVE_NOT_AVAILABLE:
                return new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, (AppendMessageResult) null, true);
            case FLUSH_DISK_TIMEOUT:
                return new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, (AppendMessageResult) null, true);
            case FLUSH_SLAVE_TIMEOUT:
                return new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, (AppendMessageResult) null, true);
            default:
                return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
    }

    /**
     * Blocking variant of {@link #getMessageAsync(String, long, int, String, boolean)}:
     * starts the asynchronous lookup and waits for it to complete.
     *
     * @param str  topic to read from
     * @param j    queue offset of the message
     * @param i    queue id
     * @param str2 name of the broker holding the message
     * @param z    flag forwarded unchanged to the asynchronous lookup
     * @return the lookup status paired with the message (the message may be {@code null})
     */
    public Pair<GetMessageStatus, MessageExt> getMessage(String str, long j, int i, String str2, boolean z) {
        final CompletableFuture<Pair<GetMessageStatus, MessageExt>> pendingLookup = getMessageAsync(str, j, i, str2, z);
        return pendingLookup.join();
    }

    /**
     * Asynchronously fetches a single message, preferring a message store that is reachable
     * in-process and falling back to a remote pull.
     *
     * <p>When {@code getMessageStoreByBrokerName(str2)} returns a store, one message is read from
     * it and decoded; the returned status is the store's own status, or
     * {@code MESSAGE_WAS_REMOVING} when the store yields no result at all. When no such store
     * exists, the message is pulled remotely and reported as {@code FOUND}, or as
     * {@code MESSAGE_WAS_REMOVING} when the pull produced nothing.
     *
     * @param str  topic to read from
     * @param j    queue offset of the message
     * @param i    queue id
     * @param str2 name of the broker holding the message
     * @param z    flag passed as the last argument of {@code MessageDecoder.decode}
     *             (NOTE(review): presumably "decompress body" — confirm against MessageDecoder)
     * @return future of the lookup status paired with the message (the message may be {@code null})
     */
    public CompletableFuture<Pair<GetMessageStatus, MessageExt>> getMessageAsync(String str, long j, int i, String str2, boolean z) {
        final MessageStore localStore = this.brokerController.getMessageStoreByBrokerName(str2);

        if (localStore == null) {
            // No in-process store for that broker: pull the message over the network instead.
            return getMessageFromRemoteAsync(str, j, i, str2).thenApply(remoteMessage -> {
                if (remoteMessage == null) {
                    return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
                }
                return new Pair<>(GetMessageStatus.FOUND, remoteMessage);
            });
        }

        return localStore.getMessageAsync(this.innerConsumerGroupName, str, i, j, 1, (MessageFilter) null).thenApply(storeResult -> {
            if (storeResult == null) {
                LOG.warn("getMessageResult is null , innerConsumerGroupName {}, topic {}, offset {}, queueId {}",
                    this.innerConsumerGroupName, str, j, i);
                return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
            }
            final List<MessageExt> decoded = decodeMsgList(storeResult, z);
            if (decoded == null || decoded.isEmpty()) {
                LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", str, j, i, storeResult);
                return new Pair<>(storeResult.getStatus(), null);
            }
            return new Pair<>(storeResult.getStatus(), decoded.get(0));
        });
    }

    /**
     * Decodes every buffer of a store read result into a message and then releases the result.
     *
     * <p>Buffers that are {@code null} or fail to decode are logged and skipped. Each decoded
     * message receives the queue offset found at the same index of the result's offset list.
     * The result is released in all cases, including when decoding throws.
     *
     * @param getMessageResult store read result to decode; released before this method returns
     * @param z                flag passed as the last argument of {@code MessageDecoder.decode}
     * @return the decoded messages in buffer order; empty when there is nothing to decode
     */
    protected List<MessageExt> decodeMsgList(GetMessageResult getMessageResult, boolean z) {
        final List<MessageExt> decodedMessages = new ArrayList<>();
        try {
            final List<ByteBuffer> buffers = getMessageResult.getMessageBufferList();
            if (buffers == null) {
                return decodedMessages;
            }
            for (int index = 0; index < buffers.size(); index++) {
                final ByteBuffer buffer = buffers.get(index);
                if (buffer == null) {
                    LOG.error("bb is null {}", getMessageResult);
                    continue;
                }
                final MessageExt message = MessageDecoder.decode(buffer, true, z);
                if (message == null) {
                    LOG.error("decode msgExt is null {}", getMessageResult);
                    continue;
                }
                // Offsets are looked up by the buffer's index, so skipped buffers do not shift them.
                final List<Long> queueOffsets = getMessageResult.getMessageQueueOffset();
                message.setQueueOffset(queueOffsets.get(index));
                decodedMessages.add(message);
            }
            return decodedMessages;
        } finally {
            getMessageResult.release();
        }
    }

    /**
     * Blocking variant of {@link #getMessageFromRemoteAsync(String, long, int, String)}:
     * starts the remote pull and waits for it to complete.
     *
     * @param str  topic to read from
     * @param j    queue offset of the message
     * @param i    queue id
     * @param str2 name of the broker to pull from
     * @return the pulled message, or {@code null} when none was obtained
     */
    protected MessageExt getMessageFromRemote(String str, long j, int i, String str2) {
        final CompletableFuture<MessageExt> pendingPull = getMessageFromRemoteAsync(str, j, i, str2);
        return pendingPull.join();
    }

    /**
     * Asynchronously pulls a single message from the named remote broker.
     *
     * <p>The broker address is looked up in the subscribe route; if it is unknown, the topic's
     * route is refreshed from the name server once and the lookup is retried. The future completes
     * with {@code null} when no address can be found, when the pull status is not {@code FOUND},
     * when the found list is empty, or when setting up the pull throws.
     *
     * @param str  topic to read from
     * @param j    queue offset of the message
     * @param i    queue id
     * @param str2 name of the broker to pull from
     * @return future of the first pulled message, or of {@code null} when none was obtained
     */
    protected CompletableFuture<MessageExt> getMessageFromRemoteAsync(String str, long j, int i, String str2) {
        try {
            String brokerAddress = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(str2, 0L, false);
            if (brokerAddress == null) {
                // Route may be stale: refresh it from the name server, then look the address up again.
                this.brokerController.getTopicRouteInfoManager().updateTopicRouteInfoFromNameServer(str, true, false);
                brokerAddress = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(str2, 0L, false);
            }
            if (brokerAddress == null) {
                LOG.warn("can't find broker address for topic {}", str);
                return CompletableFuture.completedFuture(null);
            }

            return this.brokerController.getBrokerOuterAPI()
                .pullMessageFromSpecificBrokerAsync(str2, brokerAddress, this.innerConsumerGroupName, str, i, j, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
                .thenApply(pullResult -> {
                    if (pullResult.getPullStatus().equals(PullStatus.FOUND) && !pullResult.getMsgFoundList().isEmpty()) {
                        return (MessageExt) pullResult.getMsgFoundList().get(0);
                    }
                    return null;
                });
        } catch (Exception e) {
            LOG.error("Get message from remote failed.", e);
            return CompletableFuture.completedFuture(null);
        }
    }
}
