RocketMQ NameServer 核心源码解析

yizhihongxing

那么我来为你详细讲解“RocketMQ NameServer 核心源码解析”的完整攻略。

1. 概述

在 RocketMQ 中,NameServer 是一个极为重要的组件,它充当了消息路由和负载均衡的角色,主要负责以下三个功能:
1. 维护 Broker 的路由信息
2. 维护 Consumer 的消费信息
3. 维护 Topic 的信息

在这里,我们将介绍 NameServer 的核心源码,并深入分析它的组成结构和关键实现方式。

2. NameServer 核心源码解析

2.1 NameServer 启动过程

在 RocketMQ 中,NameServer 的启动入口类为org.apache.rocketmq.namesrv.NamesrvStartup,其它依赖的类和包括的配置文件都会在该类中运行。在启动过程中,主要涉及到以下几步:
1. 加载 NameServer 配置信息
2. 启动 Netty Server(用于接收 Broker 和 Consumer 的注册请求)
3. 注册 Broker、心跳检测和负载均衡
4. 加载 Topic,处理 MQTT 消息等

2.2 NameServer 的关键类

2.2.1 NameServerController

NameServerController是 NameServer 的核心组件,它主要负责 NameServer 的启停、Broker 和 Consumer 的注册,路由信息的维护等。

2.2.2 RouteInfoManager

NameServer 维护的核心数据结构。RouteInfoManager 中存储了 Broker、Topic、Queue 等路由信息,每个 Broker 对应的发送消息队列、消费消息队列等信息也都在这里。

2.2.3 BrokerHousekeepingService

用于存储和维护 Broker 向 NameServer 注册的路由信息,通过扫描超时未更新路由信息的 Broker,进行清理和剔除。

2.3 NameServer 的关键流程

2.3.1 NameServer 注册流程

当 Broker 或 Consumer 将自己注册到 NameServer 上时,NameServer 会进行相应的处理,将相关路由信息存储到 RouteInfoManager 中。当 Broker 或 Consumer 下线时,NameServer 也会实时更新相关路由信息。

2.3.2 负载均衡

当 Broker 启动时,会向 NameServer 定时发送心跳包,以便 NameServer 及时更新 Broker 负载均衡信息。NameServer 会维护当前所有 Broker 中存储的队列信息,以便在消息发送的时候进行优化路由计算。

2.4 示例说明

2.4.1 注册 Broker

当 Broker 启动时,会通过 Netty 向 NameServer 发送注册请求。源码如下:

public static void registerBrokerAll(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
    try {
        // 获取 NameServer 控制器
        NamesrvController controller = NamesrvController.getNamesrvController();

        // 获取路由信息管理器
        RouteInfoManager routeInfoManager = controller.getRouteInfoManager();

        // 处理 Broker 注册逻辑
        BrokerData brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
        ConcurrentHashMap<String, List<String>> brokerAddrsMap = routeInfoManager.getBrokerAddrTable();

        // 如果 Broker 不存在,则添加
        List<String> brokerNames = brokerAddrsMap.get(brokerAddr);
        if (null == brokerNames) {
            brokerNames = new LinkedList<>();
            brokerAddrsMap.put(brokerAddr, brokerNames);
        }
        if (!brokerNames.contains(brokerName)) {
            brokerNames.add(brokerName);
        }

        // 更新 BrokerInfo
        BrokerInfo brokerInfo = new BrokerInfo(
            brokerId,
            System.currentTimeMillis(),
            channel,
            filterServerList,
            topicConfigWrapper);

        brokerInfo = routeInfoManager.registerBroker(brokerAddr, brokerName, brokerInfo);

        // 发送注册响应
        channel.writeAndFlush(buildRegisterBrokerResponse(brokerInfo));
    } catch (Exception e) {
        log.error("registerBrokerAll Exception", e);
    }
}

2.4.2 负载均衡

当 Broker 启动时,会定时向 NameServer 发送心跳包,以便 NameServer 维护其当前的负载均衡数据。在心跳到达 NameServer 后,NameServer 会通过计算算法分配发送消息和消费消息时的路由信息。具体处理过程如下:

public List<MessageQueue> fetchSubscribeMessageQueues(String consumerGroup, Set<String> topics) {
    // 获取所有的 Topic
    Set<String> topicSet = this.routeInfoManager.getTopicList();
    topicSet = (null == topics || topics.isEmpty()) ? topicSet : topics;

    // 按照 Broker 进行消息队列信息分组
    Map<String/* brokerAddr */, List<MessageQueue>> brokerSubscribeInfoTable = new HashMap<>(16);

    for (String topic : topicSet) {
        try {
            // 查找 Topic 绑定的所有消费者信息
            TopicRouteData topicRouteData = this.routeInfoManager.pickupTopicRouteData(topic);
            if (null == topicRouteData || null == topicRouteData.getBrokerDataList() || topicRouteData.getBrokerDataList().isEmpty()) {
                continue;
            }

            // 遍历所有的 Broker
            for (BrokerData brokerData : topicRouteData.getBrokerDataList()) {
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr != null) {
                    // 根据地址获得所有的消费者列表
                    List<String> groupList = brokerData.getBrokerAddrs().get(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup);
                    if (groupList == null || groupList.isEmpty()) {
                        groupList = brokerData.getBrokerAddrs().get(consumerGroup);
                    }
                    if (groupList != null && !groupList.isEmpty()) {
                        // 遍历所有的消费者,计算消费队列路由信息
                        for (String group : groupList) {
                            ConsumerData consumerData = this.routeInfoManager.getConsumerData(consumerGroup, group);
                            if (consumerData == null) {
                                log.error(
                                    "[NOTIFYME] consumer group: {} consumer: {} not online",
                                    consumerGroup,
                                    group);
                                continue;
                            }

                            // 计算消费者订阅的队列,返回队列列表
                            List<MessageQueue> mqSet = this.allocateMessageQueueStrategy.allocate(
                                consumerGroup,
                                addr,
                                consumerData.getSubscription(),
                                this.topicQueueTable.get(topic));
                            if (!mqSet.isEmpty()) {
                                List<MessageQueue> prev = brokerSubscribeInfoTable.putIfAbsent(addr, mqSet);
                                if (prev != null) {
                                    prev.addAll(mqSet);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Throwable ignored) {
        }
    }

    List<MessageQueue> result = new ArrayList<>();
    for (List<MessageQueue> mqs : brokerSubscribeInfoTable.values()) {
        result.addAll(mqs);
    }
    // 按照消息队列顺序排序
    Collections.sort(result);
    return result;
}

以上就是关于“RocketMQ NameServer 核心源码解析”的完整攻略,同时我还提供了两个示例以便更好地帮助你理解。如果还有其它问题,欢迎继续提问哦。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ NameServer 核心源码解析 - Python技术站

(0)
上一篇 2023年6月16日
下一篇 2023年6月16日

相关文章

  • Spring mvc 分步式session的实例详解

    Spring MVC 分步式Session的实例详解 在Spring MVC中,Session是一种用于在服务器端存储用户数据的机制。本文将详细介绍Spring MVC中分步式Session的实现方法,并提供两个示例来说明如何实现这一过程。 分步式Session的实现方法 在Spring MVC中,分步式Session是一种将Session数据分散存储在多个…

    Java 2023年5月17日
    00
  • Kafka常用命令之kafka-console-consumer.sh解读

    Kafka是一个分布式消息系统,常用于构建实时流数据管道和数据处理应用程序。kafka-console-consumer.sh是Kafka的一个命令行消费者,可以用来消费Kafka中的消息。本文将详细讲解kafka-console-consumer.sh的使用方法和常用参数。 kafka-console-consumer.sh命令的基础用法 命令格式 bin…

    Java 2023年5月20日
    00
  • java连接MySQL数据库实现代码

    下面就来详细讲解如何使用Java连接MySQL数据库实现相关代码: 准备工作 首先需要下载安装MySQL数据库,安装完成后需要设置用户名和密码。 接着下载并安装Java SDK,在使用Java访问MySQL之前需要下载并安装MySQL JDBC驱动。 新建一个Java项目。 导入JDBC驱动 将下载好的MySQL JDBC驱动包(mysql-connecto…

    Java 2023年5月19日
    00
  • SpringBoot +DynamicDataSource切换多数据源的全过程

    下面我就来详细讲解SpringBoot + DynamicDataSource切换多数据源的全过程。 1. 概述 在实际项目中,经常会遇到需要切换多数据源的情况,SpringBoot + DynamicDataSource可以很好地解决这个问题。本文将介绍如何使用SpringBoot + DynamicDataSource实现多数据源的切换过程。 2. 示例…

    Java 2023年6月3日
    00
  • 解决dubbo错误ip及ip乱入问题的方法

    解决dubbo错误ip及ip乱入问题的方法 在使用dubbo进行微服务开发时,可能会出现一些ip相关的问题,如服务提供者使用了错误的ip地址进行暴露,或者消费者调用时使用了错误的ip地址等等。这些问题会导致服务无法正常运行。本攻略将介绍如何解决dubbo错误ip及ip乱入问题。 Dubbo服务提供者使用了错误的ip地址进行暴露 在dubbo的服务提供者端,可…

    Java 2023年6月2日
    00
  • Java字符串替换函数replace()用法解析

    Java字符串替换函数replace()用法解析 在Java中,我们常常需要对字符串进行替换操作。其中,最常用的就是 replace() 函数。本文将为你详细讲解 replace() 函数的使用方法和注意事项。 replace() 函数基本用法 replace() 函数的基本用法是:将原字符串中的某个字符或字符串替换成新的字符或字符串。 public Str…

    Java 2023年5月26日
    00
  • Spring Data JPA例子代码[基于Spring Boot、Mysql]

    下面是“Spring Data JPA例子代码[基于Spring Boot、Mysql]”的完整攻略。 简介 Spring Data JPA是基于JPA规范的一种框架,结合Spring Data,可以方便地访问和操作关系型数据库。本文基于Spring Boot和Mysql数据库,演示了Spring Data JPA的使用方法。 前置准备 在开始之前,您需要准…

    Java 2023年6月2日
    00
  • Java数据溢出代码详解

    Java数据溢出代码详解 什么是数据溢出? 在计算机程序中,数据溢出指的是计算结果超出了数据类型所能表示范围的情况。在Java程序中,数据溢出会导致程序运行出错或计算结果不准确。 数据溢出的原因 Java中的数据类型有固定的范围,例如byte类型的范围是-128到127,short类型的范围是-32768到32767,当我们使用一个超出范围的值进行计算时,结…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部