Java RabbitMQ的工作队列与消息应答详解
RabbitMQ 是一个开源的消息队列系统,支持多种消息递协议。在 RabbitMQ 中,工作队列是一种常见的消息模型,用于处理大量的耗时任务。本文将详细讲解 Java RabbitMQ 的工作队列与消息应答的完整攻略,并提供两个示例说明。
工作队列
工作队列是一种常见的消息模型,也称为任务队列。在工作队列中,多个消费者共同消费同一个队列中的消息。当消息被发送到队列中时,它将被分配给其中一个消费者进行处理。每个消息只能被一个消费者处理,但是一个消费者可以处理多个消息。
示例一:使用 Java 实现工作队列
在本例中,我们将使用 Java 实现工作队列。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个 RabbitMQ 的消费者并确认消息已被接收。
- 创建一个队列并将其绑定到一个交换机上。
- 发送消息到队列中。
public class RabbitMQWorkerQueueExample {
private static final String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
private static void doWork(String message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main
方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery
方法中,我们处理消息并确认消息已被消费。在 main
方法中,我们发送了 10 条消息到队列中。
示例二:使用 Java 实现工作队列与消息应答
在本例中,我们将使用 Java 实现工作队列与消息应答。具体步骤如下:
- 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
- 创建一个 RabbitMQ 的消费者并确认消息已被接收。
- 创建一个队列并将其绑定到一个交换机上。
- 发送消息到队列中。
- 确认消息已被消费。
public class RabbitMQWorkerQueueAckExample {
private static final String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
doWork(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
}
}
}
private static void doWork(String message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main
方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery
方法中,我们处理消息并确认消息已被消费。在 main
方法中,我们发送了 10 条消息到队列中,并在 doWork
方法中模拟了一个耗时任务。如果任务处理失败,我们将使用 channel.basicNack
方法将消息重新放回队列中。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java RabbitMQ的工作队列与消息应答详解 - Python技术站