RabbitMQ 最常用的三大模式实例解析
RabbitMQ 是一个开源的消息队列系统,支持多种消息递协议。在 RabbitMQ 中,有三种最常用的消息模式,分别是发布/订阅模式、工作队列模式和路由模式。本文将详细讲解这三种模式的实现方法,并提供两个示例说明。
发布/订阅模式
发布/订阅模式是一种常见的消息模型,也称为广播模式。在发布/订阅模式中,生产者将消息发送到一个交换机上,交换机将消息广播给所有与之绑定的队列。每个队列都有自己的消费者,消费者从队列中接收消息并进行处理。
示例一:使用 Java 实现发布/订阅模式
在本例中,我们将使用 Java 实现发布/订阅模式。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个队列并将其绑定到一个交换机上。
- 发送消息到交换机中。
public class RabbitMQPubSubExample {
private static final String EXCHANGE_NAME = "myExchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main
方法中,我们创建了一个交换机并将其类型设置为 fanout
,这意味着交换机将消息广播给所有与之绑定的队列。在 for
循环中,我们发送了 10 条消息到交换机中。
示例二:使用 Java 实现发布/订阅模式与多个队列
在本例中,我们将使用 Java 实现发布/订阅模式与多个队列。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建两个队列并将其绑定到一个交换机上。
- 发送消息到交换机中。
public class RabbitMQPubSubMultipleQueuesExample {
private static final String EXCHANGE_NAME = "myExchange";
private static final String QUEUE_NAME_1 = "myQueue1";
private static final String QUEUE_NAME_2 = "myQueue2";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main
方法中,我们创建了一个交换机并将其类型设置为 fanout
,这意味着交换机将消息广播给所有与之绑定的队列。我们还创建了两个队列并将其绑定到交换机上。在 for
循环中,我们发送了 10 条消息到交换机中。
工作队列模式
工作队列模式是一种常见的消息模型,也称为任务队列。在工作队列模式中,多个消费者共同消费同一个队列中的消息。当消息被发送到队列中时,它将被分配给其中一个消费者进行处理。每个消息只能被一个消费者处理,但是一个消费者可以处理多个消息。
示例一:使用 Java 实现工作队列模式
在本例中,我们将使用 Java 实现工作队列模式。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个 RabbitMQ 的消费者并确认消息已被接收。
- 创建一个队列并将其绑定到一个交换机上。
- 发送消息到队列中。
public class RabbitMQWorkerQueueExample {
private static final String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
private static void doWork(String message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main
方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery
方法中,我们处理消息并确认消息已被费。在 main
方法中,我们发送了 10 条消息到队列中。
示例二:使用 Java 实现工作队列模式与消息应答
在本例中,我们将使用 Java 实现工作队列模式与消息应答。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个 RabbitMQ 的消费者并确认消息已被接收。
- 创建一个队列并将其绑定到一个交换机上。
- 发送消息到队列中。
- 确认消息已被消费。
public class RabbitMQWorkerQueueAckExample {
private static final String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
private static void doWork(String message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main
方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery
方法中,我们处理消息并确认消息已被消费。在 main
方法中我们发送了 10 条消息到队列中,并在 doWork
方法中模拟了一个耗时任务。如果任务处理失败,我们将使用 channel.basicNack
方法将消息重新放回队列中。
路由模式
路由模式是一种常见的消息模型,也称为直连模式。在路由模式中,生产者将消息发送到一个交换机上,并指定一个路由键。交换机将消息路由到与之绑定的队列中,路由键与队列的绑定关系是通过路由键和队列名的匹配来实现的。
示例一:使用 Java 实现路由模式
在本例中,我们将使用 Java 实现路由模式。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个 RabbitMQ 的消费者并确认消息已被接收。
- 创建一个队列并将其绑定到一个交换机上,并指定一个路由键。
- 发送消息到交换机中,并指定一个路由键。
public class RabbitMQRoutingExample {
private static final String EXCHANGE_NAME = "myExchange";
private static final String QUEUE_NAME = "myQueue";
private static final String ROUTING_KEY = "myRoutingKey";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main
方法中,我们创建了一个交换机并将其类型设置为 direct
,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了一个队列并将其绑定到交换机上,并指定了一个路由键。在 for
循环中,我们发送了 10 条消息到交换机中,并指定了一个路由键。
示例二:使用 Java 实现路由模式与多个队列
在本例中,我们将使用 Java 实现路由模式与多个队列。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建两个队列并将其绑定到一个交换机上,并指定不同的路由键。
- 发送消息到交换机中,并指定不同的路由键。
public class RabbitMQRoutingMultipleQueuesExample {
private static final String EXCHANGE_NAME = "myExchange";
private static final String QUEUE_NAME_1 = "myQueue1";
private static final String QUEUE_NAME_2 = "myQueue2";
private static final String ROUTING_KEY_1 = "myRoutingKey1";
private static final String ROUTING_KEY_2 = "myRoutingKey2";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, ROUTING_KEY_1);
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, ROUTING_KEY_2);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
if (i % 2 == 0) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_1, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
} else {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_2, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
}
System.out.println("Sent message: " + message);
}
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main
方法中,我们创建了一个交换机并将其类型设置为 direct
,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了两个队列并将其绑定到交换机上,并指定了不同的路由键。在 for
循环中,我们发送了 10 条消息到交换机中,并根据消息的奇偶性指定不同的路由键。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ 最常用的三大模式实例解析 - Python技术站