那么我来为你详细讲解“RocketMQ NameServer 核心源码解析”的完整攻略。
1. 概述
在 RocketMQ 中,NameServer 是一个极为重要的组件,它充当了消息路由和负载均衡的角色,主要负责以下三个功能:
1. 维护 Broker 的路由信息
2. 维护 Consumer 的消费信息
3. 维护 Topic 的信息
在这里,我们将介绍 NameServer 的核心源码,并深入分析它的组成结构和关键实现方式。
2. NameServer 核心源码解析
2.1 NameServer 启动过程
在 RocketMQ 中,NameServer 的启动入口类为org.apache.rocketmq.namesrv.NamesrvStartup
,其它依赖的类和包括的配置文件都会在该类中运行。在启动过程中,主要涉及到以下几步:
1. 加载 NameServer 配置信息
2. 启动 Netty Server(用于接收 Broker 和 Consumer 的注册请求)
3. 注册 Broker、心跳检测和负载均衡
4. 加载 Topic,处理 MQTT 消息等
2.2 NameServer 的关键类
2.2.1 NameServerController
NameServerController
是 NameServer 的核心组件,它主要负责 NameServer 的启停、Broker 和 Consumer 的注册,路由信息的维护等。
2.2.2 RouteInfoManager
NameServer 维护的核心数据结构。RouteInfoManager 中存储了 Broker、Topic、Queue 等路由信息,每个 Broker 对应的发送消息队列、消费消息队列等信息也都在这里。
2.2.3 BrokerHousekeepingService
用于存储和维护 Broker 向 NameServer 注册的路由信息,通过扫描超时未更新路由信息的 Broker,进行清理和剔除。
2.3 NameServer 的关键流程
2.3.1 NameServer 注册流程
当 Broker 或 Consumer 将自己注册到 NameServer 上时,NameServer 会进行相应的处理,将相关路由信息存储到 RouteInfoManager 中。当 Broker 或 Consumer 下线时,NameServer 也会实时更新相关路由信息。
2.3.2 负载均衡
当 Broker 启动时,会向 NameServer 定时发送心跳包,以便 NameServer 及时更新 Broker 负载均衡信息。NameServer 会维护当前所有 Broker 中存储的队列信息,以便在消息发送的时候进行优化路由计算。
2.4 示例说明
2.4.1 注册 Broker
当 Broker 启动时,会通过 Netty 向 NameServer 发送注册请求。源码如下:
public static void registerBrokerAll(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId,
final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
try {
// 获取 NameServer 控制器
NamesrvController controller = NamesrvController.getNamesrvController();
// 获取路由信息管理器
RouteInfoManager routeInfoManager = controller.getRouteInfoManager();
// 处理 Broker 注册逻辑
BrokerData brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
ConcurrentHashMap<String, List<String>> brokerAddrsMap = routeInfoManager.getBrokerAddrTable();
// 如果 Broker 不存在,则添加
List<String> brokerNames = brokerAddrsMap.get(brokerAddr);
if (null == brokerNames) {
brokerNames = new LinkedList<>();
brokerAddrsMap.put(brokerAddr, brokerNames);
}
if (!brokerNames.contains(brokerName)) {
brokerNames.add(brokerName);
}
// 更新 BrokerInfo
BrokerInfo brokerInfo = new BrokerInfo(
brokerId,
System.currentTimeMillis(),
channel,
filterServerList,
topicConfigWrapper);
brokerInfo = routeInfoManager.registerBroker(brokerAddr, brokerName, brokerInfo);
// 发送注册响应
channel.writeAndFlush(buildRegisterBrokerResponse(brokerInfo));
} catch (Exception e) {
log.error("registerBrokerAll Exception", e);
}
}
2.4.2 负载均衡
当 Broker 启动时,会定时向 NameServer 发送心跳包,以便 NameServer 维护其当前的负载均衡数据。在心跳到达 NameServer 后,NameServer 会通过计算算法分配发送消息和消费消息时的路由信息。具体处理过程如下:
public List<MessageQueue> fetchSubscribeMessageQueues(String consumerGroup, Set<String> topics) {
// 获取所有的 Topic
Set<String> topicSet = this.routeInfoManager.getTopicList();
topicSet = (null == topics || topics.isEmpty()) ? topicSet : topics;
// 按照 Broker 进行消息队列信息分组
Map<String/* brokerAddr */, List<MessageQueue>> brokerSubscribeInfoTable = new HashMap<>(16);
for (String topic : topicSet) {
try {
// 查找 Topic 绑定的所有消费者信息
TopicRouteData topicRouteData = this.routeInfoManager.pickupTopicRouteData(topic);
if (null == topicRouteData || null == topicRouteData.getBrokerDataList() || topicRouteData.getBrokerDataList().isEmpty()) {
continue;
}
// 遍历所有的 Broker
for (BrokerData brokerData : topicRouteData.getBrokerDataList()) {
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (addr != null) {
// 根据地址获得所有的消费者列表
List<String> groupList = brokerData.getBrokerAddrs().get(MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup);
if (groupList == null || groupList.isEmpty()) {
groupList = brokerData.getBrokerAddrs().get(consumerGroup);
}
if (groupList != null && !groupList.isEmpty()) {
// 遍历所有的消费者,计算消费队列路由信息
for (String group : groupList) {
ConsumerData consumerData = this.routeInfoManager.getConsumerData(consumerGroup, group);
if (consumerData == null) {
log.error(
"[NOTIFYME] consumer group: {} consumer: {} not online",
consumerGroup,
group);
continue;
}
// 计算消费者订阅的队列,返回队列列表
List<MessageQueue> mqSet = this.allocateMessageQueueStrategy.allocate(
consumerGroup,
addr,
consumerData.getSubscription(),
this.topicQueueTable.get(topic));
if (!mqSet.isEmpty()) {
List<MessageQueue> prev = brokerSubscribeInfoTable.putIfAbsent(addr, mqSet);
if (prev != null) {
prev.addAll(mqSet);
}
}
}
}
}
}
} catch (Throwable ignored) {
}
}
List<MessageQueue> result = new ArrayList<>();
for (List<MessageQueue> mqs : brokerSubscribeInfoTable.values()) {
result.addAll(mqs);
}
// 按照消息队列顺序排序
Collections.sort(result);
return result;
}
以上就是关于“RocketMQ NameServer 核心源码解析”的完整攻略,同时我还提供了两个示例以便更好地帮助你理解。如果还有其它问题,欢迎继续提问哦。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ NameServer 核心源码解析 - Python技术站