当你需要对RocketMQ这个消息中间件进行二次开发或者调试的时候,我们需要搭建RocketMQ源码环境,从而可以方便地进行代码调试以及定位问题。接下来,我会为你介绍如何在本地搭建RocketMQ源码的开发环境,并且通过两个示例来演示如何进行调试。
环境准备
在开始搭建RocketMQ源码环境之前,需要您本地已经准备好以下环境:
- JDK1.8及以上
- Git
- Maven
- IDE(建议使用IntelliJ IDEA)
步骤1:克隆源代码
首先,我们需要从RocketMQ的Github仓库中将源代码克隆到本地。在终端中执行以下命令:
git clone https://github.com/apache/rocketmq.git
这个命令将会在当前目录下创建一个rocketmq的文件夹,并且下载RocketMQ的源代码到该文件夹中。
步骤2:编译源代码
进入到rocketmq文件夹中,执行以下命令编译源代码:
mvn -Prelease-all -DskipTests clean install -U
这个命令将会对RocketMQ源代码进行编译,并且打包生成相关的jar包。
示例1:如何调试producer发送消息的源码
接下来,我将通过一个示例来说明如何在本地搭建RocketMQ源码开发环境,并且进行代码调试。这个示例的场景是在producer端发送消息的时候调试源代码。
步骤1:启动RocketMQ服务
首先,我们需要启动RocketMQ服务。在终端中进入apache-rocketmq-all模块的bin目录,并且执行以下命令:
sh mqnamesrv
这个命令将会启动namesrv服务。
然后,我们需要在终端中进入apache-rocketmq-all模块的bin目录,并且执行以下命令:
sh mqbroker -n localhost:9876 autoCreateTopicEnable=true
这个命令将会启动broker服务。
步骤2:调试代码
接下来,在IDE中打开RocketMQ的源代码,将producer端的源码添加到我们自己的工程中。
在IDE中打开Producer代码,找到SendMessageService类中的send(…)方法。在代码的第39行添加断点。
public String send(Message msg,
SendMessageRequestHeader requestHeader,
long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback) throws InterruptedException, RemotingException {
...
String[] brokerAddrs = new String[4];
brokerAddrs[0] = "localhost:10911";
brokerAddrs[1] = "localhost:20911";
brokerAddrs[2] = "localhost:30911";
brokerAddrs[3] = "localhost:40911";
SendResult sendResult = null;
for (String addr : brokerAddrs) {
try {
sendResult = this.sendKernelImpl(msg, requestHeader, addr, timeoutMillis, communicationMode, sendCallback);
if (sendResult != null) {
break;
}
} catch (Exception e) {
// log ...
}
}
return sendResult == null ? null : sendResult.getMsgId();
}
然后,在我们自己的工程中调用sendMessage()方法,发送一条消息。当代码运行到我们在sendMessageService类中添加的断点时,IDE会自动进入调试模式,此时我们就可以在IDE中进行断点调试了。
示例2:如何调试Consumer消费消息的源码
接下来,我将通过另外一个示例来介绍如何在本地搭建RocketMQ源码环境,并且进行代码调试。这个示例的场景是在consumer端接收消息的时候调试源代码。
步骤1:启动RocketMQ服务
首先,我们需要启动RocketMQ的服务,与示例1一样,在终端中进入apache-rocketmq-all模块的bin目录,并且执行以下命令:
sh mqnamesrv
sh mqbroker -n localhost:9876 autoCreateTopicEnable=true
步骤2:编写consumer代码
然后,在自己工程中编写consumer代码,找到Consumer的代码,添加branche模块的源代码到自己的工程中。
在本地运行的ConfigureInitializer类中,添加如下代码:
MessageModel messageModel = MessageModel.CLUSTERING;
String consumerGroup = "please_rename_unique_group_name";
ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup + "_" + UUID.randomUUID().toString().substring(0, 5), false, j2EESyncPullBridge.getThreadPoolExecutor());
consumer.setMessageModel(messageModel);
consumer.setConsumeFromWhere(consumeFromWhere);
consumer.subscribe("rmq_j2ee_topic", "*");
consumer.setMaxReconsumeTimes(220);
consumer.setPullInterval(60 * 1000);
consumer.setPullBatchSize(32);
Gson gson = new Gson();
rocketMQMessageListener.setOnsListener(new BatchMessageListener() {
@Override
public Action consume(List<MessageExt> msgs, ConsumeOrderlyContext context) {
List<TemplateMessage> resultList = new ArrayList<>();
for (MessageExt message : msgs) {
try {
String messageBody = new String(message.getBody());
TemplateMessage templateMessage = gson.fromJson(messageBody, TemplateMessage.class);
resultList.add(templateMessage);
} catch (Exception e) {
logger.warn("j2ee callback从mq中拉取消息,解析失败。messageId:" + message.getMsgId());
return Action.ReconsumeLater;
}
}
return Action.CommitMessage;
}
});
consumer.registerMessageListener(rocketMQMessageListener);
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
步骤3:调试代码
在IDE中打开Consumer的源代码,找到ConsumeMessageService类中的consumeMessageDirectly(…)方法。在代码的第62行添加断点。
public ConsumeMessageDirectlyResult consumeMessageDirectly(//
final String consumerGroup,//
final String clientId,//
final String topic,//
final String brokerName,//
final byte[] msgId, //
final long timeoutMillis,//
final boolean isUnitMode) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
...
ConsumeMessageDirectlyResult result = consumeMessageDirectlyResult != null ? JsonUtil.fromJson(consumeMessageDirectlyResult, ConsumeMessageDirectlyResult.class) : null;
return result;
}
然后,在我们自己的工程中启动consumer,在IDE中执行我们编写的consumer代码,等待consumer接收一条消息。当代码运行到我们在consumeMessageService类中添加的断点时,IDE会自动进入调试模式,此时我们就可以在IDE中进行断点调试了。
总结
通过以上的示例,我们可以看到如何在本地搭建RocketMQ源码环境,并且进行调试。在实际生产环境中,我们往往通过这种方式来解决问题,定位代码bug,并且进行二次开发。但是在实际的开发过程中,我们也需要注意到调试代码对性能的影响,合理地选择调试策略,不要在生产环境中长期调试代码。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ源码本地搭建调试方法 - Python技术站