以下是“RocketMQ生产消息与消费消息超详细讲解”的完整攻略,包含两个示例说明。
简介
RocketMQ是阿里巴巴开源的分布式消息中间件,具有高吞吐量、高可用性、可伸缩性等特点。本教程将介绍如何使用RocketMQ生产消息和消费消息,并提供两个示例说明。
示例1:生产和消费简单消息
以下是一个生产和消费简单消息的示例:
1. 添加依赖
在Maven项目中,添加以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
2. 生产消息
创建一个生产者Producer
,并发送消息:
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("topic", "Hello, RocketMQ!".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(message);
System.out.println("Send result: " + result);
producer.shutdown();
}
}
在这个示例中,我们创建了一个生产者Producer
,并使用DefaultMQProducer
类发送消息。我们设置了生产者组名group
和NameServer地址localhost:9876
,并发送了一条消息到主题topic
。
3. 消费消息
创建一个消费者Consumer
,并消费消息:
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("Receive message: " + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer started.");
}
}
在这个示例中,我们创建了一个消费者Consumer
,并使用DefaultMQPushConsumer
类消费消息。我们设置了消费者组名group
和NameServer地址localhost:9876
,并订阅了主题topic
。我们使用registerMessageListener
方法注册了一个消息监听器,并在监听器中输出接收到的消息。
4. 运行程序
运行生产者和消费者程序,并查看控制台输出。
现在,您应该能够看到生产者输出Send result: SendResult [sendStatus=SEND_OK, msgId=...]
,消费者输出Receive message: Hello, RocketMQ!
。
示例2:生产和消费顺序消息
以下是一个生产和消费顺序消息的示例:
1. 添加依赖
在Maven项目中,添加以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
2. 生产顺序消息
创建一个生产者Producer
,并发送顺序消息:
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic", ("Hello, RocketMQ! " + i).getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(message, (list, msg, arg) -> {
int index = (int) arg;
return list.get(index % list.size());
}, i);
System.out.println("Send result: " + result);
}
producer.shutdown();
}
}
在这个示例中,我们创建了一个生产者Producer
,并使用DefaultMQProducer
类发送顺序消息。我们设置了生产者组名group
和NameServer地址localhost:9876
,并发送了10条消息到主题topic
。我们使用send
方法发送消息,并使用MessageQueueSelector
接口实现顺序发送。我们在send
方法的第三个参数中传递了一个参数i
,用于计算消息应该发送到哪个队列。
3. 消费顺序消息
创建一个消费者Consumer
,并消费顺序消息:
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
System.out.println("Receive message: " + new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
System.out.println("Consumer started.");
}
}
在这个示例中,我们创建了一个消费者Consumer
,并使用DefaultMQPushConsumer
类消费顺序消息。我们设置了消费者组名group
和NameServer地址localhost:9876
,并订阅了主题topic
。我们使用registerMessageListener
方法注册了一个消息监听器,并在监听器中输出接收到的消息。
4. 运行程序
运行生产者和消费者程序,并查看控制台输出。
现在,您应该能够看到生产者输出10条消息的发送结果,消费者输出10条消息的接收结果,并且消息的顺序与发送顺序相同。
总结
RocketMQ是阿里巴巴开源的分布式消息中间件,具有高吞吐量、高可用性、可伸缩性等特点。在本教程中,我们介绍了如何使用RocketMQ生产消息和消费消息,并提供了两个示例说明。我们还介绍了如何生产和消费顺序消息,并提供了一个示例说明。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ生产消息与消费消息超详细讲解 - Python技术站