下面是关于SpringBoot整合RabbitMQ实现六种工作模式的示例的完整攻略。
1. 简介
RabbitMQ是一种常用的消息队列中间件,而SpringBoot是一种流行的Java开发框架。本文将介绍如何使用SpringBoot整合RabbitMQ,并实现六种工作模式。
2. 环境准备
在开始之前,需要确保已经安装了以下软件:
- JDK 1.8或更高版本
- Maven 3.0或更高版本
- RabbitMQ 3.0或更高版本
3. 创建SpringBoot项目
首先,需要创建一个SpringBoot项目。可以使用Spring Initializr或手动创建一个Maven项目。在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这个依赖包含了SpringBoot对RabbitMQ的支持。
4. 配置RabbitMQ连接信息
在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
这些配置项指定了RabbitMQ的连接信息。
5. 实现六种工作模式
5.1 简单模式
简单模式是最基本的工作模式,也是最简单的工作模式。在简单模式中,一个生产者向一个队列发送消息,一个消费者从该队列中接收消息。
5.1.1 生产者
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SimpleProducer implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public static void main(String[] args) {
SpringApplication.run(SimpleProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
String message = "Hello, RabbitMQ!";
rabbitTemplate.convertAndSend(queue.getName(), message);
System.out.println("Sent message: " + message);
}
}
5.1.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleConsumer {
@RabbitListener(queues = "${simple.queue.name}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
5.2 工作模式
工作模式是一种简单的负载均衡模式。在工作模式中,多个消费者从同一个队列中接收消息,每个消息只能被一个消费者处理。
5.2.1 生产者
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WorkProducer implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public static void main(String[] args) {
SpringApplication.run(WorkProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
rabbitTemplate.convertAndSend(queue.getName(), message);
System.out.println("Sent message: " + message);
}
}
}
5.2.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkConsumer {
@RabbitListener(queues = "${work.queue.name}")
public void receiveMessage(String message) throws InterruptedException {
System.out.println("Received message: " + message);
Thread.sleep(1000);
}
}
5.3 发布/订阅模式
发布/订阅模式是一种广播模式。在发布/订阅模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。
5.3.1 生产者
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PublishProducer implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private FanoutExchange exchange;
public static void main(String[] args) {
SpringApplication.run(PublishProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
String message = "Hello, RabbitMQ!";
rabbitTemplate.convertAndSend(exchange.getName(), "", message);
System.out.println("Sent message: " + message);
}
}
5.3.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PublishConsumer1 {
@RabbitListener(queues = "${publish.queue.name}")
public void receiveMessage(String message) {
System.out.println("Consumer1 received message: " + message);
}
}
@Component
public class PublishConsumer2 {
@RabbitListener(queues = "${publish.queue.name}")
public void receiveMessage(String message) {
System.out.println("Consumer2 received message: " + message);
}
}
5.4 路由模式
路由模式是一种根据路由键选择消费者的模式。在路由模式中,一个生产者向一个交换机发送消息,并指定一个路由键,多个消费者从该交换机中接收消息,但只有路由键与消费者绑定的路由键相同的消息才会被该消费者接收。
5.4.1 生产者
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RouteProducer implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange exchange;
public static void main(String[] args) {
SpringApplication.run(RouteProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
String message1 = "Hello, RabbitMQ!";
String message2 = "Goodbye, RabbitMQ!";
rabbitTemplate.convertAndSend(exchange.getName(), "info", message1);
rabbitTemplate.convertAndSend(exchange.getName(), "error", message2);
System.out.println("Sent messages: " + message1 + ", " + message2);
}
}
5.4.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RouteConsumer1 {
@RabbitListener(queues = "${route.queue.name1}")
public void receiveMessage(String message) {
System.out.println("Consumer1 received message: " + message);
}
}
@Component
public class RouteConsumer2 {
@RabbitListener(queues = "${route.queue.name2}")
public void receiveMessage(String message) {
System.out.println("Consumer2 received message: " + message);
}
}
5.5 主题模式
主题模式是一种根据通配符选择消费者的模式。在主题模式中,一个生产者向一个交换机发送消息,并指定一个主题,多个消费者从该交换机中接收消息,但只有主题与消费者绑定的主题相匹配的消息才会被该消费者接收。
5.5.1 生产者
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TopicProducer implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange exchange;
public static void main(String[] args) {
SpringApplication.run(TopicProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
String message1 = "Hello, RabbitMQ!";
String message2 = "Goodbye, RabbitMQ!";
rabbitTemplate.convertAndSend(exchange.getName(), "info.message", message1);
rabbitTemplate.convertAndSend(exchange.getName(), "error.message", message2);
System.out.println("Sent messages: " + message1 + ", " + message2);
}
}
5.5.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer1 {
@RabbitListener(queues = "${topic.queue.name1}")
public void receiveMessage(String message) {
System.out.println("Consumer1 received message: " + message);
}
}
@Component
public class TopicConsumer2 {
@RabbitListener(queues = "${topic.queue.name2}")
public void receiveMessage(String message) {
System.out.println("Consumer2 received message: " + message);
}
}
5.6 延迟队列模式
延迟队列模式是一种延迟执行任务的模式。在延迟队列模式中,一个生产者向一个延迟队列发送消息,一个消费者从该延迟队列中接收消息,并在指定的时间后将消息发送到目标队列中。
5.6.1 生产者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DelayedProducer implements CommandLineRunner {
@Autowired
private AmqpTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(DelayedProducer.class, args);
}
@Override
public void run(String... args) throws Exception {
String message = "Hello, RabbitMQ!";
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routing.key", message, message1 -> {
message1.getMessageProperties().setDelay(5000);
return message1;
});
System.out.println("Sent message: " + message);
}
}
5.6.2 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DelayedConsumer {
@RabbitListener(queues = "${delayed.queue.name}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
6. 总结
本文介绍了如何使用SpringBoot整合RabbitMQ,并实现了六种工作模式。这些工作模式包括简单模式、工作模式、发布/订阅模式、路由模式、主题模式和延迟队列模式。通过这些示例,可以更好地理解RabbitMQ的工作原理和使用方法。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot整合RabbitMQ实现六种工作模式的示例 - Python技术站