下面是“Linux安装RocketMQ实例步骤”的完整攻略。
准备工作
在进行安装前,请确保你已经完成如下步骤:
- 安装好Java环境(建议使用JDK 8及以上版本)。
- 确认安装好了RocketMQ服务端的压缩包(下载地址详见官网)。
- 确认你拥有安装并运行RocketMQ所需的系统权限。
安装步骤
- 下载RocketMQ服务端的压缩包,解压到指定目录下:
bash
tar zxvf rocketmq-all-4.9.1-bin-release.tar.gz -C /usr/local/
- 配置环境变量:
在~/.bashrc
或~/.bash_profile
末尾追加以下内容:
bash
export ROCKETMQ_HOME=/usr/local/rocketmq-all-4.9.1-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
让配置生效:
bash
source ~/.bashrc
- 启动Name Server和Broker:
bash
nohup sh $ROCKETMQ_HOME/bin/mqnamesrv &
nohup sh $ROCKETMQ_HOME/bin/mqbroker -n localhost:9876 &
- 验证是否启动成功:
bash
tail -f $ROCKETMQ_HOME/logs/rocketmqlogs/namesrv.log
tail -f $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
上述命令行的输出日志中如果没有错误信息,则说明安装成功。
示例说明
示例一:创建Topic
创建一个示例的test_topic
Topic,分别在两个Producer和Consumer上发布和接收消息。
- 先启动Consumer:
bash
sh $ROCKETMQ_HOME/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer test_consumer_group test_topic localhost:9876
- 再启动两个Producer:
bash
sh $ROCKETMQ_HOME/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer test_topic localhost:9876
sh $ROCKETMQ_HOME/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer test_topic localhost:9876
- 通过Producer分别向Topic发送消息:
bash
hello rocketmq 1
hello rocketmq 2
- 在Consumer上看是否可以接收到消息:
CONSUME: ConsumeResult [MessageExt=messageExt {topic='test_topic', brokerName='**', queueId=1, storeSize=129, queueOffset=1, sysFlag=0, bornTimestamp=1642424835662, bornHost=/192.168.1.1:40482, storeTimestamp=1642424835728, storeHost=/192.168.1.1:10911, msgId='3662D71846360DD1AAC4926847E00000', commitLogOffset=889, bodyCRC=1583895369, reconsumeTimes=0, preparedTransactionOffset=0}, status=CONSUME_SUCCESS, isSuccess=true]
CONSUME: ConsumeResult [MessageExt=messageExt {topic='test_topic', brokerName='**', queueId=1, storeSize=129, queueOffset=2, sysFlag=0, bornTimestamp=1642424841707, bornHost=/192.168.1.1:40482, storeTimestamp=1642424841726, storeHost=/192.168.1.1:10911, msgId='3662D71B00000DD1AAC4926847FE0000', commitLogOffset=1705, bodyCRC=-1914527645, reconsumeTimes=0, preparedTransactionOffset=0}, status=CONSUME_SUCCESS, isSuccess=true]
示例二:消息发送状态查询
通过查询消息发送状态,可以判断消息是否发送成功,并找到发送失败的原因。
- 先启动Producer:
bash
sh $ROCKETMQ_HOME/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer test_topic localhost:9876
- 发送消息并查看发送状态:
bash
sh $ROCKETMQ_HOME/bin/rocketmq-console.sh queryMsgById -n localhost:9876 --msgId=C0A81A0100002A9F00000000003C4F2E
上述命令中的--msgId
参数值需要替换为实际发送的消息ID。
如果消息发送成功,则会返回如下信息:
{
"msgId": "C0A81A0100002A9F00000000003C4F2E",
"messageQueue": {
"brokerName": "**",
"topic": "test_topic",
"queueId": 0
},
"status": "SEND_OK",
"body": "***",
"QueueOffset": 6,
"queueId": 0,
"transactionId": null,
"bornTimestamp": 1642429384913,
"storeTimestamp": 1642429384945,
"bornHost": "192.168.1.1:38676",
"storeHost": "192.168.1.1:10911",
"commitLogOffset": 1573,
"bodyCRC": 229289180,
"reconsumeTimes": 0
}
如果消息发送失败,则会返回如下信息:
{
"msgId": "C0A81A0100002A9F00000000003C4F2E",
"messageQueue": {
"brokerName": "**",
"topic": "test_topic",
"queueId": 0
},
"status": "SEND_OK",
"body": "***",
"QueueOffset": 6,
"queueId": 0,
"transactionId": null,
"bornTimestamp": 1642429384913,
"storeTimestamp": 1642429384945,
"bornHost": "192.168.1.1:38676",
"storeHost": "192.168.1.1:10911",
"commitLogOffset": 1573,
"bodyCRC": 229289180,
"reconsumeTimes": 3
}
如果消息发送过程中遇到了某些问题,那么status
字段的值就会反映出对应的情况。例如,如果因为Broker无法写入消息而发送失败,则status
字段的值为FLUSH_DISK_TIMEOUT
。在查看消息发送状态时,你需要参考官方文档来对各种发送状态进行识别和判断。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:linux安装RocketMQ实例步骤 - Python技术站