RocketMQ NameServer 核心源码解析

那么我来为你详细讲解“RocketMQ NameServer 核心源码解析”的完整攻略。

1. 概述

在 RocketMQ 中,NameServer 是一个极为重要的组件,它充当了消息路由和负载均衡的角色,主要负责以下三个功能:
1. 维护 Broker 的路由信息
2. 维护 Consumer 的消费信息
3. 维护 Topic 的信息

在这里,我们将介绍 NameServer 的核心源码,并深入分析它的组成结构和关键实现方式。

2. NameServer 核心源码解析

2.1 NameServer 启动过程

在 RocketMQ 中,NameServer 的启动入口类为org.apache.rocketmq.namesrv.NamesrvStartup,其它依赖的类和包括的配置文件都会在该类中运行。在启动过程中,主要涉及到以下几步:
1. 加载 NameServer 配置信息
2. 启动 Netty Server(用于接收 Broker 和 Consumer 的注册请求)
3. 注册 Broker、心跳检测和负载均衡
4. 加载 Topic,处理 MQTT 消息等

2.2 NameServer 的关键类

2.2.1 NameServerController

NameServerController是 NameServer 的核心组件,它主要负责 NameServer 的启停、Broker 和 Consumer 的注册,路由信息的维护等。

2.2.2 RouteInfoManager

NameServer 维护的核心数据结构。RouteInfoManager 中存储了 Broker、Topic、Queue 等路由信息,每个 Broker 对应的发送消息队列、消费消息队列等信息也都在这里。

2.2.3 BrokerHousekeepingService

用于存储和维护 Broker 向 NameServer 注册的路由信息,通过扫描超时未更新路由信息的 Broker,进行清理和剔除。

2.3 NameServer 的关键流程

2.3.1 NameServer 注册流程

当 Broker 或 Consumer 将自己注册到 NameServer 上时,NameServer 会进行相应的处理,将相关路由信息存储到 RouteInfoManager 中。当 Broker 或 Consumer 下线时,NameServer 也会实时更新相关路由信息。

2.3.2 负载均衡

当 Broker 启动时,会向 NameServer 定时发送心跳包,以便 NameServer 及时更新 Broker 负载均衡信息。NameServer 会维护当前所有 Broker 中存储的队列信息,以便在消息发送的时候进行优化路由计算。

2.4 示例说明

2.4.1 注册 Broker

当 Broker 启动时,会通过 Netty 向 NameServer 发送注册请求。源码如下:

public static void registerBrokerAll(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
    try {
        // 获取 NameServer 控制器
        NamesrvController controller = NamesrvController.getNamesrvController();

        // 获取路由信息管理器
        RouteInfoManager routeInfoManager = controller.getRouteInfoManager();

        // 处理 Broker 注册逻辑
        BrokerData brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
        ConcurrentHashMap<String, List<String>> brokerAddrsMap = routeInfoManager.getBrokerAddrTable();

        // 如果 Broker 不存在,则添加
        List<String> brokerNames = brokerAddrsMap.get(brokerAddr);
        if (null == brokerNames) {
            brokerNames = new LinkedList<>();
            brokerAddrsMap.put(brokerAddr, brokerNames);
        }
        if (!brokerNames.contains(brokerName)) {
            brokerNames.add(brokerName);
        }

        // 更新 BrokerInfo
        BrokerInfo brokerInfo = new BrokerInfo(
            brokerId,
            System.currentTimeMillis(),
            channel,
            filterServerList,
            topicConfigWrapper);

        brokerInfo = routeInfoManager.registerBroker(brokerAddr, brokerName, brokerInfo);

        // 发送注册响应
        channel.writeAndFlush(buildRegisterBrokerResponse(brokerInfo));
    } catch (Exception e) {
        log.error("registerBrokerAll Exception", e);
    }
}

2.4.2 负载均衡

当 Broker 启动时,会定时向 NameServer 发送心跳包,以便 NameServer 维护其当前的负载均衡数据。在心跳到达 NameServer 后,NameServer 会通过计算算法分配发送消息和消费消息时的路由信息。具体处理过程如下:

public List<MessageQueue> fetchSubscribeMessageQueues(String consumerGroup, Set<String> topics) {
    // 获取所有的 Topic
    Set<String> topicSet = this.routeInfoManager.getTopicList();
    topicSet = (null == topics || topics.isEmpty()) ? topicSet : topics;

    // 按照 Broker 进行消息队列信息分组
    Map<String/* brokerAddr */, List<MessageQueue>> brokerSubscribeInfoTable = new HashMap<>(16);

    for (String topic : topicSet) {
        try {
            // 查找 Topic 绑定的所有消费者信息
            TopicRouteData topicRouteData = this.routeInfoManager.pickupTopicRouteData(topic);
            if (null == topicRouteData || null == topicRouteData.getBrokerDataList() || topicRouteData.getBrokerDataList().isEmpty()) {
                continue;
            }

            // 遍历所有的 Broker
            for (BrokerData brokerData : topicRouteData.getBrokerDataList()) {
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr != null) {
                    // 根据地址获得所有的消费者列表
                    List<String> groupList = brokerData.getBrokerAddrs().get(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup);
                    if (groupList == null || groupList.isEmpty()) {
                        groupList = brokerData.getBrokerAddrs().get(consumerGroup);
                    }
                    if (groupList != null && !groupList.isEmpty()) {
                        // 遍历所有的消费者,计算消费队列路由信息
                        for (String group : groupList) {
                            ConsumerData consumerData = this.routeInfoManager.getConsumerData(consumerGroup, group);
                            if (consumerData == null) {
                                log.error(
                                    "[NOTIFYME] consumer group: {} consumer: {} not online",
                                    consumerGroup,
                                    group);
                                continue;
                            }

                            // 计算消费者订阅的队列,返回队列列表
                            List<MessageQueue> mqSet = this.allocateMessageQueueStrategy.allocate(
                                consumerGroup,
                                addr,
                                consumerData.getSubscription(),
                                this.topicQueueTable.get(topic));
                            if (!mqSet.isEmpty()) {
                                List<MessageQueue> prev = brokerSubscribeInfoTable.putIfAbsent(addr, mqSet);
                                if (prev != null) {
                                    prev.addAll(mqSet);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Throwable ignored) {
        }
    }

    List<MessageQueue> result = new ArrayList<>();
    for (List<MessageQueue> mqs : brokerSubscribeInfoTable.values()) {
        result.addAll(mqs);
    }
    // 按照消息队列顺序排序
    Collections.sort(result);
    return result;
}

以上就是关于“RocketMQ NameServer 核心源码解析”的完整攻略,同时我还提供了两个示例以便更好地帮助你理解。如果还有其它问题,欢迎继续提问哦。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ NameServer 核心源码解析 - Python技术站

(0)
上一篇 2023年6月16日
下一篇 2023年6月16日

相关文章

  • MyBatis实现简单的数据表分月存储

    让我来为您详细讲解“MyBatis实现简单的数据表分月存储”的完整攻略。 1. 数据表分月存储的设计原理 数据表分月存储其实就是将大量数据按月份分散存储到不同的数据表中,可以有效减小单个数据表的数据量,提高数据访问的效率。对于需要根据时间范围查询数据的应用场景特别适用。 具体实现过程可以通过MyBatis的动态SQL实现。动态SQL可以根据数据表的名称动态生…

    Java 2023年5月20日
    00
  • java利用Ant脚本生成war包全过程

    生成war包是Java Web开发中的重要过程之一。为了优化这个过程,可以使用Ant脚本来自动化这个过程。以下是Java利用Ant脚本生成war包的详细攻略。 1. 创建Ant脚本 首先需要创建一个Ant脚本,脚本需要包含以下几个步骤: 清空目标目录,以准备生成新的war包。 将源代码和依赖库编译成Java字节码。 将字节码打包成war包。 以下是示例Ant…

    Java 2023年5月26日
    00
  • Spring Security拦截器引起Java CORS跨域失败的问题及解决

    Spring Security拦截器引起Java CORS跨域失败的问题及解决 在使用Spring Security进行接口保护的时候,经常会遇到因为跨域问题导致前端无法访问服务器接口的问题。本文将详细介绍Spring Security拦截器引起Java CORS跨域失败的问题及解决。 什么是CORS跨域 CORS(Cross-Origin Resource…

    Java 2023年5月20日
    00
  • JavaWeb Servlet中url-pattern的使用

    当我们开发JavaWeb应用程序时,Servlet是最常用的核心组件之一。而servlet和客户端交互时,url-pattern就是一个非常重要的属性。本篇文章将详细讲解JavaWeb Servlet中url-pattern的使用,包括其用途、语法、注意事项以及两个示例。 一、url-pattern的用途 url-pattern是Servlet负责处理HTT…

    Java 2023年6月15日
    00
  • JS验证身份证有效性示例

    关于 JS 验证身份证有效性示例,我们可以采取以下步骤: 1. 获取身份证号码并进行格式验证 首先,我们需要获取用户输入的身份证号码,然后判断其长度是否为 18 位,并且最后一位是否为数字或字母 X(表示校验位)。具体实现代码如下: const idCard = document.getElementById(‘idCard’).value // 长度验证 …

    Java 2023年6月15日
    00
  • Java 完美判断中文字符的方法

    Java 完美判断中文字符的方法 在Java程序中,经常需要对中文字符进行操作,例如输入、输出、比较、查找等等。因此如何正确判断中文字符就显得非常重要。下面将介绍一些常见的方法。 方法一:使用正则表达式 正则表达式可以用来判断一个字符串是否为中文字符。可以使用Unicode编码来匹配中文字符。 以下是一个示例代码: public static boolean…

    Java 2023年5月27日
    00
  • java实现图片转base64字符串 java实现base64字符串转图片

    Java实现图片转Base64字符串和Base64字符串转图片的过程可以分为两部分进行: 第一部分:图片转Base64字符串。 使用Java中的File类或者ImageIO类读取图片文件,并将其转化为BufferedImage对象,如下代码: File file = new File("test.png"); BufferedImage …

    Java 2023年5月29日
    00
  • 详解Java中格式化日期的DateFormat与SimpleDateFormat类

    详解Java中格式化日期的DateFormat与SimpleDateFormat类 在Java编程中,时间和日期的操作是比较常见的,因此学习Java中时间和日期的处理是很有必要的。在Java中,可以使用 DateFormat 和 SimpleDateFormat 类来对日期进行格式化。 DateFormat类 DateFormat 类是抽象类,提供了与日期相…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部