package org.apache.rocketmq.broker.metrics;

import com.google.common.base.Splitter;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopLongHistogram;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.MessageStore;

/* loaded from: input_file:org/apache/rocketmq/broker/metrics/BrokerMetricsManager.class */
public class BrokerMetricsManager {
    private final BrokerConfig brokerConfig;
    private final MessageStore messageStore;
    private final BrokerController brokerController;
    private final ConsumerLagCalculator consumerLagCalculator;
    private OtlpGrpcMetricExporter metricExporter;
    private PeriodicMetricReader periodicMetricReader;
    private PrometheusHttpServer prometheusHttpServer;
    private Meter brokerMeter;
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqBroker");
    private static final Map<String, String> LABEL_MAP = new HashMap();
    public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
    public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
    public static LongCounter messagesInTotal = new NopLongCounter();
    public static LongCounter messagesOutTotal = new NopLongCounter();
    public static LongCounter throughputInTotal = new NopLongCounter();
    public static LongCounter throughputOutTotal = new NopLongCounter();
    public static LongHistogram messageSize = new NopLongHistogram();
    public static ObservableLongGauge producerConnection = new NopObservableLongGauge();
    public static ObservableLongGauge consumerConnection = new NopObservableLongGauge();
    public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
    public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
    public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
    public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
    public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
    public static LongCounter sendToDlqMessages = new NopLongCounter();
    public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() { // from class: org.apache.rocketmq.broker.metrics.BrokerMetricsManager.1
        {
            add("CID_RMQ_SYS_".toLowerCase());
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.metrics.BrokerMetricsManager$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/BrokerMetricsManager$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$common$BrokerConfig$MetricsExporterType = new int[BrokerConfig.MetricsExporterType.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$common$BrokerConfig$MetricsExporterType[BrokerConfig.MetricsExporterType.OTLP_GRPC.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$common$BrokerConfig$MetricsExporterType[BrokerConfig.MetricsExporterType.PROM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public BrokerMetricsManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.brokerConfig = brokerController.getBrokerConfig();
        this.messageStore = brokerController.getMessageStore();
        this.consumerLagCalculator = new ConsumerLagCalculator(brokerController);
        init();
    }

    public static AttributesBuilder newAttributesBuilder() {
        AttributesBuilder builder = Attributes.builder();
        Map<String, String> map = LABEL_MAP;
        builder.getClass();
        map.forEach(builder::put);
        return builder;
    }

    private Attributes buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult baseCalculateResult) {
        AttributesBuilder newAttributesBuilder = newAttributesBuilder();
        newAttributesBuilder.put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, baseCalculateResult.group);
        newAttributesBuilder.put(BrokerMetricsConstant.LABEL_TOPIC, baseCalculateResult.topic);
        newAttributesBuilder.put(BrokerMetricsConstant.LABEL_IS_RETRY, baseCalculateResult.isRetry);
        newAttributesBuilder.put(BrokerMetricsConstant.LABEL_IS_SYSTEM, isSystem(baseCalculateResult.topic, baseCalculateResult.group));
        return newAttributesBuilder.build();
    }

    public static boolean isRetryOrDlqTopic(String str) {
        if (StringUtils.isBlank(str)) {
            return false;
        }
        return str.startsWith("%RETRY%") || str.startsWith("%DLQ%");
    }

    public static boolean isSystemGroup(String str) {
        if (StringUtils.isBlank(str)) {
            return false;
        }
        String lowerCase = str.toLowerCase();
        Iterator<String> it = SYSTEM_GROUP_PREFIX_LIST.iterator();
        while (it.hasNext()) {
            if (lowerCase.startsWith(it.next())) {
                return true;
            }
        }
        return false;
    }

    public static boolean isSystem(String str, String str2) {
        return TopicValidator.isSystemTopic(str) || isSystemGroup(str2);
    }

    public static TopicMessageType getMessageType(SendMessageRequestHeader sendMessageRequestHeader) {
        Map string2messageProperties = MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties());
        String str = (String) string2messageProperties.get("TRAN_MSG");
        TopicMessageType topicMessageType = TopicMessageType.NORMAL;
        if (Boolean.parseBoolean(str)) {
            topicMessageType = TopicMessageType.TRANSACTION;
        } else if (string2messageProperties.containsKey("__SHARDINGKEY")) {
            topicMessageType = TopicMessageType.FIFO;
        } else if (string2messageProperties.get("__STARTDELIVERTIME") != null || string2messageProperties.get("DELAY") != null || string2messageProperties.get("TIMER_DELIVER_MS") != null || string2messageProperties.get("TIMER_DELAY_SEC") != null) {
            topicMessageType = TopicMessageType.DELAY;
        }
        return topicMessageType;
    }

    public Meter getBrokerMeter() {
        return this.brokerMeter;
    }

    private boolean checkConfig() {
        if (this.brokerConfig == null) {
            return false;
        }
        BrokerConfig.MetricsExporterType metricsExporterType = this.brokerConfig.getMetricsExporterType();
        if (!metricsExporterType.isEnable()) {
            return false;
        }
        switch (AnonymousClass2.$SwitchMap$org$apache$rocketmq$common$BrokerConfig$MetricsExporterType[metricsExporterType.ordinal()]) {
            case 1:
                return StringUtils.isNotBlank(this.brokerConfig.getMetricsGrpcExporterTarget());
            case 2:
                return true;
            default:
                return false;
        }
    }

    private void init() {
        BrokerConfig.MetricsExporterType metricsExporterType = this.brokerConfig.getMetricsExporterType();
        if (metricsExporterType == BrokerConfig.MetricsExporterType.DISABLE) {
            return;
        }
        if (!checkConfig()) {
            LOGGER.error("check metrics config failed, will not export metrics");
            return;
        }
        String metricsLabel = this.brokerConfig.getMetricsLabel();
        if (StringUtils.isNotBlank(metricsLabel)) {
            Iterator it = Splitter.on(',').omitEmptyStrings().splitToList(metricsLabel).iterator();
            while (it.hasNext()) {
                String[] split = ((String) it.next()).split(":");
                if (split.length != 2) {
                    LOGGER.warn("metricsLabel is not valid: {}", metricsLabel);
                } else {
                    LABEL_MAP.put(split[0], split[1]);
                }
            }
        }
        if (this.brokerConfig.isMetricsInDelta()) {
            LABEL_MAP.put(BrokerMetricsConstant.LABEL_AGGREGATION, BrokerMetricsConstant.AGGREGATION_DELTA);
        }
        LABEL_MAP.put(BrokerMetricsConstant.LABEL_NODE_TYPE, BrokerMetricsConstant.NODE_TYPE_BROKER);
        LABEL_MAP.put(BrokerMetricsConstant.LABEL_CLUSTER_NAME, this.brokerConfig.getBrokerClusterName());
        LABEL_MAP.put(BrokerMetricsConstant.LABEL_NODE_ID, this.brokerConfig.getBrokerName());
        SdkMeterProviderBuilder resource = SdkMeterProvider.builder().setResource(Resource.empty());
        if (metricsExporterType == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
            String metricsGrpcExporterTarget = this.brokerConfig.getMetricsGrpcExporterTarget();
            if (!metricsGrpcExporterTarget.startsWith("http")) {
                metricsGrpcExporterTarget = "https://" + metricsGrpcExporterTarget;
            }
            OtlpGrpcMetricExporterBuilder aggregationTemporalitySelector = OtlpGrpcMetricExporter.builder().setEndpoint(metricsGrpcExporterTarget).setTimeout(this.brokerConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS).setAggregationTemporalitySelector(instrumentType -> {
                return (this.brokerConfig.isMetricsInDelta() && (instrumentType == InstrumentType.COUNTER || instrumentType == InstrumentType.OBSERVABLE_COUNTER || instrumentType == InstrumentType.HISTOGRAM)) ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE;
            });
            String metricsGrpcExporterHeader = this.brokerConfig.getMetricsGrpcExporterHeader();
            if (StringUtils.isNotBlank(metricsGrpcExporterHeader)) {
                HashMap hashMap = new HashMap();
                Iterator it2 = Splitter.on(',').omitEmptyStrings().splitToList(metricsGrpcExporterHeader).iterator();
                while (it2.hasNext()) {
                    String[] split2 = ((String) it2.next()).split(":");
                    if (split2.length != 2) {
                        LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", metricsGrpcExporterHeader);
                    } else {
                        hashMap.put(split2[0], split2[1]);
                    }
                }
                aggregationTemporalitySelector.getClass();
                hashMap.forEach(aggregationTemporalitySelector::addHeader);
            }
            this.metricExporter = aggregationTemporalitySelector.build();
            this.periodicMetricReader = PeriodicMetricReader.builder(this.metricExporter).setInterval(this.brokerConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS).build();
            resource.registerMetricReader(this.periodicMetricReader);
        }
        if (metricsExporterType == BrokerConfig.MetricsExporterType.PROM) {
            String metricsPromExporterHost = this.brokerConfig.getMetricsPromExporterHost();
            if (StringUtils.isBlank(metricsPromExporterHost)) {
                metricsPromExporterHost = this.brokerConfig.getBrokerIP1();
            }
            this.prometheusHttpServer = PrometheusHttpServer.builder().setHost(metricsPromExporterHost).setPort(this.brokerConfig.getMetricsPromExporterPort()).build();
            resource.registerMetricReader(this.prometheusHttpServer);
        }
        registerMetricsView(resource);
        this.brokerMeter = OpenTelemetrySdk.builder().setMeterProvider(resource.build()).build().getMeter(BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME);
        initStatsMetrics();
        initRequestMetrics();
        initConnectionMetrics();
        initLagAndDlqMetrics();
        initOtherMetrics();
    }

    private void registerMetricsView(SdkMeterProviderBuilder sdkMeterProviderBuilder) {
        sdkMeterProviderBuilder.registerView(InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).setName(BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE).build(), View.builder().setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(Double.valueOf(1024.0d), Double.valueOf(4096.0d), Double.valueOf(524288.0d), Double.valueOf(1048576.0d), Double.valueOf(2097152.0d), Double.valueOf(4194304.0d)))).build());
        for (Pair pair : RemotingMetricsManager.getMetricsView()) {
            sdkMeterProviderBuilder.registerView((InstrumentSelector) pair.getObject1(), (View) pair.getObject2());
        }
        for (Pair pair2 : this.messageStore.getMetricsView()) {
            sdkMeterProviderBuilder.registerView((InstrumentSelector) pair2.getObject1(), (View) pair2.getObject2());
        }
        for (Pair<InstrumentSelector, View> pair3 : PopMetricsManager.getMetricsView()) {
            sdkMeterProviderBuilder.registerView((InstrumentSelector) pair3.getObject1(), (View) pair3.getObject2());
        }
    }

    private void initStatsMetrics() {
        processorWatermark = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK).setDescription("Request processor watermark").ofLongs().buildWithCallback(observableLongMeasurement -> {
            observableLongMeasurement.record(this.brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "send").build());
            observableLongMeasurement.record(this.brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "async_put").build());
            observableLongMeasurement.record(this.brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "pull").build());
            observableLongMeasurement.record(this.brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "ack").build());
            observableLongMeasurement.record(this.brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "query_message").build());
            observableLongMeasurement.record(this.brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "client_manager").build());
            observableLongMeasurement.record(this.brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "heartbeat").build());
            observableLongMeasurement.record(this.brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "lite_pull").build());
            observableLongMeasurement.record(this.brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "transaction").build());
            observableLongMeasurement.record(this.brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "consumer_manager").build());
            observableLongMeasurement.record(this.brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "admin").build());
            observableLongMeasurement.record(this.brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_PROCESSOR, "reply").build());
        });
        brokerPermission = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_BROKER_PERMISSION).setDescription("Broker permission").ofLongs().buildWithCallback(observableLongMeasurement2 -> {
            observableLongMeasurement2.record(this.brokerConfig.getBrokerPermission(), newAttributesBuilder().build());
        });
    }

    private void initRequestMetrics() {
        messagesInTotal = this.brokerMeter.counterBuilder(BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL).setDescription("Total number of incoming messages").build();
        messagesOutTotal = this.brokerMeter.counterBuilder(BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL).setDescription("Total number of outgoing messages").build();
        throughputInTotal = this.brokerMeter.counterBuilder(BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL).setDescription("Total traffic of incoming messages").build();
        throughputOutTotal = this.brokerMeter.counterBuilder(BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL).setDescription("Total traffic of outgoing messages").build();
        messageSize = this.brokerMeter.histogramBuilder(BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE).setDescription("Incoming messages size").ofLongs().build();
    }

    private void initConnectionMetrics() {
        producerConnection = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS).setDescription("Producer connections").ofLongs().buildWithCallback(observableLongMeasurement -> {
            HashMap hashMap = new HashMap();
            this.brokerController.getProducerManager().getGroupChannelTable().values().stream().flatMap(concurrentHashMap -> {
                return concurrentHashMap.values().stream();
            }).forEach(clientChannelInfo -> {
                ProducerAttr producerAttr = new ProducerAttr(clientChannelInfo.getLanguage(), clientChannelInfo.getVersion());
                hashMap.put(producerAttr, Integer.valueOf(((Integer) hashMap.computeIfAbsent(producerAttr, producerAttr2 -> {
                    return 0;
                })).intValue() + 1));
            });
            hashMap.forEach((producerAttr, num) -> {
                observableLongMeasurement.record(num.intValue(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_LANGUAGE, producerAttr.language.name().toLowerCase()).put(BrokerMetricsConstant.LABEL_VERSION, MQVersion.getVersionDesc(producerAttr.version).toLowerCase()).put("protocol_type", "remoting").build());
            });
        });
        consumerConnection = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_CONNECTIONS).setDescription("Consumer connections").ofLongs().buildWithCallback(observableLongMeasurement2 -> {
            HashMap hashMap = new HashMap();
            this.brokerController.getConsumerManager().getConsumerTable().forEach((str, consumerGroupInfo) -> {
                if (consumerGroupInfo != null) {
                    consumerGroupInfo.getChannelInfoTable().values().forEach(clientChannelInfo -> {
                        ConsumerAttr consumerAttr = new ConsumerAttr(str, clientChannelInfo.getLanguage(), clientChannelInfo.getVersion(), consumerGroupInfo.getConsumeType());
                        hashMap.put(consumerAttr, Integer.valueOf(((Integer) hashMap.computeIfAbsent(consumerAttr, consumerAttr2 -> {
                            return 0;
                        })).intValue() + 1));
                    });
                }
            });
            hashMap.forEach((consumerAttr, num) -> {
                observableLongMeasurement2.record(num.intValue(), newAttributesBuilder().put(BrokerMetricsConstant.LABEL_CONSUMER_GROUP, consumerAttr.group).put(BrokerMetricsConstant.LABEL_LANGUAGE, consumerAttr.language.name().toLowerCase()).put(BrokerMetricsConstant.LABEL_VERSION, MQVersion.getVersionDesc(consumerAttr.version).toLowerCase()).put(BrokerMetricsConstant.LABEL_CONSUME_MODE, consumerAttr.consumeMode.getTypeCN().toLowerCase()).put("protocol_type", "remoting").put(BrokerMetricsConstant.LABEL_IS_SYSTEM, isSystemGroup(consumerAttr.group)).build());
            });
        });
    }

    private void initLagAndDlqMetrics() {
        consumerLagMessages = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES).setDescription("Consumer lag messages").ofLongs().buildWithCallback(observableLongMeasurement -> {
            this.consumerLagCalculator.calculateLag(calculateLagResult -> {
                observableLongMeasurement.record(calculateLagResult.lag, buildLagAttributes(calculateLagResult));
            });
        });
        consumerLagLatency = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_LAG_LATENCY).setDescription("Consumer lag time").setUnit("milliseconds").ofLongs().buildWithCallback(observableLongMeasurement2 -> {
            this.consumerLagCalculator.calculateLag(calculateLagResult -> {
                long j = 0;
                long currentTimeMillis = System.currentTimeMillis();
                if (calculateLagResult.earliestUnconsumedTimestamp != 0) {
                    j = currentTimeMillis - calculateLagResult.earliestUnconsumedTimestamp;
                }
                observableLongMeasurement2.record(j, buildLagAttributes(calculateLagResult));
            });
        });
        consumerInflightMessages = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_INFLIGHT_MESSAGES).setDescription("Consumer inflight messages").ofLongs().buildWithCallback(observableLongMeasurement3 -> {
            this.consumerLagCalculator.calculateInflight(calculateInflightResult -> {
                observableLongMeasurement3.record(calculateInflightResult.inFlight, buildLagAttributes(calculateInflightResult));
            });
        });
        consumerQueueingLatency = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY).setDescription("Consumer queueing time").setUnit("milliseconds").ofLongs().buildWithCallback(observableLongMeasurement4 -> {
            this.consumerLagCalculator.calculateInflight(calculateInflightResult -> {
                long j = 0;
                long currentTimeMillis = System.currentTimeMillis();
                if (calculateInflightResult.earliestUnPulledTimestamp != 0) {
                    j = currentTimeMillis - calculateInflightResult.earliestUnPulledTimestamp;
                }
                observableLongMeasurement4.record(j, buildLagAttributes(calculateInflightResult));
            });
        });
        consumerReadyMessages = this.brokerMeter.gaugeBuilder(BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES).setDescription("Consumer ready messages").ofLongs().buildWithCallback(observableLongMeasurement5 -> {
            this.consumerLagCalculator.calculateAvailable(calculateAvailableResult -> {
                observableLongMeasurement5.record(calculateAvailableResult.available, buildLagAttributes(calculateAvailableResult));
            });
        });
        sendToDlqMessages = this.brokerMeter.counterBuilder(BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL).setDescription("Consumer send to DLQ messages").build();
    }

    private void initOtherMetrics() {
        RemotingMetricsManager.initMetrics(this.brokerMeter, BrokerMetricsManager::newAttributesBuilder);
        this.messageStore.initMetrics(this.brokerMeter, BrokerMetricsManager::newAttributesBuilder);
        PopMetricsManager.initMetrics(this.brokerMeter, this.brokerController, BrokerMetricsManager::newAttributesBuilder);
    }

    public void shutdown() {
        if (this.brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
            this.periodicMetricReader.forceFlush();
            this.periodicMetricReader.shutdown();
            this.metricExporter.shutdown();
        }
        if (this.brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.PROM) {
            this.prometheusHttpServer.forceFlush();
            this.prometheusHttpServer.shutdown();
        }
    }
}
