package com.aliyun.openservices.ons.api.impl.rocketmq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Constants;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSUnitUtils;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.spi.DefaultInvocationContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageConst;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.NamespaceUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/BatchConsumerImpl.class */
/**
 * {@link BatchConsumer} implementation backed by the push consumer held in
 * {@code defaultMQPushConsumer} (inherited from {@link ONSConsumerAbstract}).
 *
 * <p>Listeners are registered per topic through
 * {@link #subscribe(String, String, BatchMessageListener)}. When a batch is delivered, the
 * listener is looked up by the topic of the first message in that batch.
 */
public class BatchConsumerImpl extends ONSConsumerAbstract implements BatchConsumer {
    /** Upper bound applied to the configured {@code ConsumeMessageBatchMaxSize}. */
    private static final int MAX_BATCH_SIZE = 1024;
    /** Lower bound applied to the configured {@code ConsumeMessageBatchMaxSize}. */
    private static final int MIN_BATCH_SIZE = 1;
    /** Topic to listener mapping maintained by {@code subscribe} / {@code unsubscribe}. */
    private final ConcurrentMap<String, BatchMessageListener> subscribeTable;

    /**
     * Adapter that receives batches from the underlying push consumer, converts them to ONS
     * {@link Message}s and hands them to the {@link BatchMessageListener} registered for the
     * batch's topic, running the configured interceptors before and after.
     */
    class BatchMessageListenerImpl implements MessageListenerConcurrently {
        private final ConcurrentMap<String, BatchMessageListener> subscribeTable;

        /**
         * @param concurrentMap topic to listener mapping consulted on every delivered batch
         */
        public BatchMessageListenerImpl(ConcurrentMap<String, BatchMessageListener> concurrentMap) {
            this.subscribeTable = concurrentMap;
        }

        /**
         * Converts the delivered batch, dispatches it to the listener registered for the topic
         * of its first message and maps the returned {@link Action} to a consume status.
         *
         * <p>Whatever the outcome (normal return or exception), the acknowledge index of the
         * {@link ConsumeContext} is copied into {@code consumeConcurrentlyContext} and the
         * post-handle step runs exactly once.
         *
         * @param list                       delivered messages; the first element is read
         *                                   unconditionally, so the list is assumed to be
         *                                   non-empty (TODO confirm against the caller)
         * @param consumeConcurrentlyContext context that receives the acknowledge index
         * @return the status derived from the interceptor-supplied action when pre-handling
         *         vetoes consumption ({@code CONSUME_SUCCESS} if none was supplied),
         *         {@code RECONSUME_LATER} when the listener returns {@code null}, otherwise the
         *         status mapped from the listener's action
         * @throws ONSClientException if no listener is registered for the batch's topic
         */
        @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            List<Message> messages = new ArrayList<>(list.size());
            for (MessageExt messageExt : list) {
                Message message = ONSUtil.msgConvert(messageExt);
                message.setMsgID(messageExt.getMsgId());
                // A transaction id, when present, takes precedence over the plain message id.
                Map<String, String> properties = messageExt.getProperties();
                String transactionId = properties == null ? null : properties.get(Constants.TRANSACTION_ID);
                if (transactionId != null) {
                    message.setMsgID(transactionId);
                }
                messages.add(message);
            }

            BatchMessageListener listener = this.subscribeTable.get(messages.get(0).getTopic());
            if (null == listener) {
                throw new ONSClientException("BatchMessageListener is null");
            }

            ConsumeContext consumeContext = new ConsumeContext();
            DefaultInvocationContext invocationContext = new DefaultInvocationContext();
            invocationContext.setNamespaceId(BatchConsumerImpl.this.defaultMQPushConsumer.getNamespace());
            invocationContext.setConsumerGroup(NamespaceUtil.withoutNamespace(BatchConsumerImpl.this.defaultMQPushConsumer.getConsumerGroup()));
            invocationContext.setMessages(messages);

            // Left raw on purpose: the element type is dictated by preHandle/executePostHandle,
            // which are declared outside this file.
            ArrayList executedInterceptors = new ArrayList();
            boolean proceed = BatchConsumerImpl.this.preHandle(BatchConsumerImpl.this.interceptors, invocationContext, executedInterceptors);

            // Copy a non-blank MSHA unit-retry marker from each converted message back onto the
            // original message at the same index.
            for (int i = 0; i < messages.size(); i++) {
                String unitRetry = ONSUnitUtils.getMSHAUnitRetry(messages.get(i));
                if (StringUtils.isNotBlank(unitRetry)) {
                    MessageAccessor.putProperty(list.get(i), MessageConst.PROPERTY_TRANSIENT_MSHA_RETRY, unitRetry);
                }
            }

            try {
                if (!proceed) {
                    // Pre-handling vetoed consumption: honour the action it supplied, if any.
                    if (invocationContext.getAction().isPresent()) {
                        return BatchConsumerImpl.this.action2Status(invocationContext.getAction().get());
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                Action action = listener.consume(messages, consumeContext);
                invocationContext.setAction(action);
                if (action == null) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return BatchConsumerImpl.this.action2Status(action);
            } finally {
                // Single exit path: previously these two calls were duplicated before every
                // return and again in a catch block, so a failure here ran them twice.
                consumeConcurrentlyContext.setAckIndex(consumeContext.getAcknowledgeIndex());
                BatchConsumerImpl.this.executePostHandle(executedInterceptors);
            }
        }
    }

    /**
     * Creates the consumer and applies the batch-related settings found in {@code properties}.
     *
     * <ul>
     *   <li>{@code PostSubscriptionWhenPull} - parsed as a boolean, default {@code false}.</li>
     *   <li>{@code MessageModel} - default {@code CLUSTERING}.</li>
     *   <li>{@code ConsumeMessageBatchMaxSize} - when not blank, clamped to
     *       [{@value #MIN_BATCH_SIZE}, {@value #MAX_BATCH_SIZE}].</li>
     *   <li>{@code BatchConsumeMaxAwaitDurationInSeconds} - when not blank, applied in seconds;
     *       an invalid value is logged and otherwise ignored.</li>
     * </ul>
     *
     * @param properties consumer configuration
     * @throws NumberFormatException if {@code ConsumeMessageBatchMaxSize} is set but is not a
     *                               parseable integer
     */
    public BatchConsumerImpl(Properties properties) {
        super(properties);
        this.subscribeTable = new ConcurrentHashMap<>();
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false")));
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING)));

        String batchMaxSize = properties.getProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize);
        if (!UtilAll.isBlank(batchMaxSize)) {
            int requested = Integer.parseInt(batchMaxSize);
            int clamped = Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, requested));
            this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(clamped);
        }

        String awaitSeconds = properties.getProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds);
        if (!UtilAll.isBlank(awaitSeconds)) {
            try {
                this.defaultMQPushConsumer.setMaxBatchConsumeWaitTime(Long.parseLong(awaitSeconds), TimeUnit.SECONDS);
            } catch (MQClientException e) {
                LOGGER.error("Invalid value for BatchConsumeMaxAwaitDurationInSeconds", e);
            } catch (NumberFormatException e) {
                LOGGER.error("Number format error", e);
            }
        }
    }

    /**
     * Registers the batch-dispatching listener on the underlying push consumer, then delegates
     * to the superclass start-up.
     */
    @Override // com.aliyun.openservices.ons.api.impl.rocketmq.ONSConsumerAbstract, com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract, com.aliyun.openservices.ons.api.Admin
    public void start() {
        this.defaultMQPushConsumer.registerMessageListener(new BatchMessageListenerImpl(this.subscribeTable));
        super.start();
    }

    /**
     * Registers {@code batchMessageListener} for a topic and subscribes to it. A listener
     * previously registered for the same topic is replaced.
     *
     * @param str                  topic to subscribe to; must not be {@code null}
     * @param str2                 second argument forwarded unchanged to the superclass
     *                             {@code subscribe} (presumably the subscription/filter
     *                             expression - confirm against {@link BatchConsumer})
     * @param batchMessageListener listener that receives batches for the topic; must not be
     *                             {@code null}
     * @throws ONSClientException if the topic or the listener is {@code null}
     */
    @Override // com.aliyun.openservices.ons.api.batch.BatchConsumer
    public void subscribe(String str, String str2, BatchMessageListener batchMessageListener) {
        if (null == str) {
            throw new ONSClientException("topic is null");
        }
        if (null == batchMessageListener) {
            throw new ONSClientException("listener is null");
        }
        // The listener is stored before the superclass subscription is made.
        this.subscribeTable.put(str, batchMessageListener);
        super.subscribe(str, str2);
    }

    /**
     * Removes the listener registered for a topic and unsubscribes from it. A {@code null}
     * topic is ignored.
     *
     * @param str topic to unsubscribe from
     */
    @Override // com.aliyun.openservices.ons.api.impl.rocketmq.ONSConsumerAbstract, com.aliyun.openservices.ons.api.batch.BatchConsumer
    public void unsubscribe(String str) {
        if (null == str) {
            return;
        }
        this.subscribeTable.remove(str);
        super.unsubscribe(str);
    }
}
