RocketMQ NameServer 核心源码解析

那么我来为你详细讲解“RocketMQ NameServer 核心源码解析”的完整攻略。

1. 概述

在 RocketMQ 中,NameServer 是一个极为重要的组件,它充当了消息路由和负载均衡的角色,主要负责以下三个功能:
1. 维护 Broker 的路由信息
2. 维护 Consumer 的消费信息
3. 维护 Topic 的信息

在这里,我们将介绍 NameServer 的核心源码,并深入分析它的组成结构和关键实现方式。

2. NameServer 核心源码解析

2.1 NameServer 启动过程

在 RocketMQ 中,NameServer 的启动入口类为org.apache.rocketmq.namesrv.NamesrvStartup,其它依赖的类和包括的配置文件都会在该类中运行。在启动过程中,主要涉及到以下几步:
1. 加载 NameServer 配置信息
2. 启动 Netty Server(用于接收 Broker 和 Consumer 的注册请求)
3. 注册 Broker、心跳检测和负载均衡
4. 加载 Topic,处理 MQTT 消息等

2.2 NameServer 的关键类

2.2.1 NameServerController

NameServerController是 NameServer 的核心组件,它主要负责 NameServer 的启停、Broker 和 Consumer 的注册,路由信息的维护等。

2.2.2 RouteInfoManager

NameServer 维护的核心数据结构。RouteInfoManager 中存储了 Broker、Topic、Queue 等路由信息,每个 Broker 对应的发送消息队列、消费消息队列等信息也都在这里。

2.2.3 BrokerHousekeepingService

用于存储和维护 Broker 向 NameServer 注册的路由信息,通过扫描超时未更新路由信息的 Broker,进行清理和剔除。

2.3 NameServer 的关键流程

2.3.1 NameServer 注册流程

当 Broker 或 Consumer 将自己注册到 NameServer 上时,NameServer 会进行相应的处理,将相关路由信息存储到 RouteInfoManager 中。当 Broker 或 Consumer 下线时,NameServer 也会实时更新相关路由信息。

2.3.2 负载均衡

当 Broker 启动时,会向 NameServer 定时发送心跳包,以便 NameServer 及时更新 Broker 负载均衡信息。NameServer 会维护当前所有 Broker 中存储的队列信息,以便在消息发送的时候进行优化路由计算。

2.4 示例说明

2.4.1 注册 Broker

当 Broker 启动时,会通过 Netty 向 NameServer 发送注册请求。源码如下:

public static void registerBrokerAll(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
    try {
        // 获取 NameServer 控制器
        NamesrvController controller = NamesrvController.getNamesrvController();

        // 获取路由信息管理器
        RouteInfoManager routeInfoManager = controller.getRouteInfoManager();

        // 处理 Broker 注册逻辑
        BrokerData brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
        ConcurrentHashMap<String, List<String>> brokerAddrsMap = routeInfoManager.getBrokerAddrTable();

        // 如果 Broker 不存在,则添加
        List<String> brokerNames = brokerAddrsMap.get(brokerAddr);
        if (null == brokerNames) {
            brokerNames = new LinkedList<>();
            brokerAddrsMap.put(brokerAddr, brokerNames);
        }
        if (!brokerNames.contains(brokerName)) {
            brokerNames.add(brokerName);
        }

        // 更新 BrokerInfo
        BrokerInfo brokerInfo = new BrokerInfo(
            brokerId,
            System.currentTimeMillis(),
            channel,
            filterServerList,
            topicConfigWrapper);

        brokerInfo = routeInfoManager.registerBroker(brokerAddr, brokerName, brokerInfo);

        // 发送注册响应
        channel.writeAndFlush(buildRegisterBrokerResponse(brokerInfo));
    } catch (Exception e) {
        log.error("registerBrokerAll Exception", e);
    }
}

2.4.2 负载均衡

当 Broker 启动时,会定时向 NameServer 发送心跳包,以便 NameServer 维护其当前的负载均衡数据。在心跳到达 NameServer 后,NameServer 会通过计算算法分配发送消息和消费消息时的路由信息。具体处理过程如下:

public List<MessageQueue> fetchSubscribeMessageQueues(String consumerGroup, Set<String> topics) {
    // 获取所有的 Topic
    Set<String> topicSet = this.routeInfoManager.getTopicList();
    topicSet = (null == topics || topics.isEmpty()) ? topicSet : topics;

    // 按照 Broker 进行消息队列信息分组
    Map<String/* brokerAddr */, List<MessageQueue>> brokerSubscribeInfoTable = new HashMap<>(16);

    for (String topic : topicSet) {
        try {
            // 查找 Topic 绑定的所有消费者信息
            TopicRouteData topicRouteData = this.routeInfoManager.pickupTopicRouteData(topic);
            if (null == topicRouteData || null == topicRouteData.getBrokerDataList() || topicRouteData.getBrokerDataList().isEmpty()) {
                continue;
            }

            // 遍历所有的 Broker
            for (BrokerData brokerData : topicRouteData.getBrokerDataList()) {
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr != null) {
                    // 根据地址获得所有的消费者列表
                    List<String> groupList = brokerData.getBrokerAddrs().get(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup);
                    if (groupList == null || groupList.isEmpty()) {
                        groupList = brokerData.getBrokerAddrs().get(consumerGroup);
                    }
                    if (groupList != null && !groupList.isEmpty()) {
                        // 遍历所有的消费者,计算消费队列路由信息
                        for (String group : groupList) {
                            ConsumerData consumerData = this.routeInfoManager.getConsumerData(consumerGroup, group);
                            if (consumerData == null) {
                                log.error(
                                    "[NOTIFYME] consumer group: {} consumer: {} not online",
                                    consumerGroup,
                                    group);
                                continue;
                            }

                            // 计算消费者订阅的队列,返回队列列表
                            List<MessageQueue> mqSet = this.allocateMessageQueueStrategy.allocate(
                                consumerGroup,
                                addr,
                                consumerData.getSubscription(),
                                this.topicQueueTable.get(topic));
                            if (!mqSet.isEmpty()) {
                                List<MessageQueue> prev = brokerSubscribeInfoTable.putIfAbsent(addr, mqSet);
                                if (prev != null) {
                                    prev.addAll(mqSet);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Throwable ignored) {
        }
    }

    List<MessageQueue> result = new ArrayList<>();
    for (List<MessageQueue> mqs : brokerSubscribeInfoTable.values()) {
        result.addAll(mqs);
    }
    // 按照消息队列顺序排序
    Collections.sort(result);
    return result;
}

以上就是关于“RocketMQ NameServer 核心源码解析”的完整攻略,同时我还提供了两个示例以便更好地帮助你理解。如果还有其它问题,欢迎继续提问哦。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ NameServer 核心源码解析 - Python技术站

(0)
上一篇 2023年6月16日
下一篇 2023年6月16日

相关文章

  • Java正则多字符串匹配替换

    下面是Java正则多字符串匹配替换的完整攻略: 什么是Java正则匹配? Java正则匹配是Java语言内置的一种文本匹配模式,其使用正则表达式对指定的文本进行匹配和查找。正则表达式由普通字符及通配符组成,用于确定文本模式。 可以使用Java的 java.util.regex 包中的类 Pattern 和 Matcher 来进行正则匹配。 如何进行多字符串匹…

    Java 2023年5月27日
    00
  • java 获取项目文件路径实现方法

    当我们编写 Java 项目时,有时需要获取项目文件所在的路径。这里,我们介绍两种获取 Java 项目文件路径的方法。 方法一:使用 System.getProperty() 方法 Java 提供了一个 System.getProperty() 方法,它可以返回 Java 运行环境中的属性信息,其中包括“user.dir”属性,表示用户当前的工作目录。在项目运…

    Java 2023年5月31日
    00
  • Spring系列中的beanFactory与ApplicationContext

    当提到Spring框架的IoC容器时,很容易想到beanFactory和ApplicationContext,这两者都属于Spring框架中IoC容器的范畴。本篇文章将详细讲解beanFactory和ApplicationContext的特点,优缺点以及使用场景。 BeanFactory BeanFactory是Spring框架最基本的IoC容器,提供了一种…

    Java 2023年5月19日
    00
  • Sprint Boot @TransactionalEventListener使用方法详解

    在Spring Boot中,@TransactionalEventListener注解用于在事务提交后异步地处理事件。使用@TransactionalEventListener注解可以确保事件处理程序在事务提交后才会执行,从而避免了在事务未提交时处理事件可能导致的问题。本文将详细介绍@TransactionalEventListener注解的作用和使用方法,…

    Java 2023年5月5日
    00
  • Spring Boot 2.0多数据源配置方法实例详解

    Spring Boot 2.0多数据源配置方法实例详解 基础知识 在进行本文的阅读前,读者需要掌握以下知识: Spring Boot 2.0框架基础 数据源的概念和用法 Spring Boot在多数据源方面的优势和实现方式 实现过程 在多数据源的配置中,我们需要主要的是多个数据源的定义和配置。接下来,我们将给出两条实例来帮助读者更好的理解多数据源的配置。 步…

    Java 2023年5月20日
    00
  • JSP errorPage设置方法

    当在JSP页面中发生错误的时候,可以通过errorPage设置指定的错误页面来处理异常,下面是JSP errorPage设置方法的完整攻略。 1. errorPage设置方法介绍 在JSP页面中设置错误页面有两种方式,分别是: 通过page指令设置errorPage属性; 在web.xml文件中配置<error-page>元素。 1.1 通过pa…

    Java 2023年6月15日
    00
  • Java中Arraylist的最大长度

    Java中ArrayList的最大长度 简介 ArrayList是Java中非常常用的数据结构,它是可变长度的数组。ArrayList最大长度由内存大小决定。当数组长度大于内存大小时,便会抛出OutOfMemoryError异常。 ArrayList的初始化长度 初始化ArrayList时可以指定其大小,如下所示: ArrayList<String&g…

    Java 2023年5月26日
    00
  • spring mvc路径匹配原则详解

    Spring MVC 路径匹配原则详解 Spring MVC 是一种基于 Servlet 的 MVC 框架,用于创建 Java Web 应用程序。 在 Spring MVC 中,请求的 URL 将被映射到具体的控制器类和方法,这种映射是通过使用 URL Path Pattern(路径模式)实现的。路径模式指定了请求路径的规则,这些规则用于将请求映射到具体的处…

    Java 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部