RabbitMQ 最常用的三大模式实例解析

RabbitMQ 最常用的三大模式实例解析

RabbitMQ 是一个开源的消息队列系统,支持多种消息递协议。在 RabbitMQ 中,有三种最常用的消息模式,分别是发布/订阅模式、工作队列模式和路由模式。本文将详细讲解这三种模式的实现方法,并提供两个示例说明。

发布/订阅模式

发布/订阅模式是一种常见的消息模型,也称为广播模式。在发布/订阅模式中,生产者将消息发送到一个交换机上,交换机将消息广播给所有与之绑定的队列。每个队列都有自己的消费者,消费者从队列中接收消息并进行处理。

示例一:使用 Java 实现发布/订阅模式

在本例中,我们将使用 Java 实现发布/订阅模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个队列并将其绑定到一个交换机上。
  3. 发送消息到交换机中。
public class RabbitMQPubSubExample {

    private static final String EXCHANGE_NAME = "myExchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 fanout,这意味着交换机将消息广播给所有与之绑定的队列。在 for 循环中,我们发送了 10 条消息到交换机中。

示例二:使用 Java 实现发布/订阅模式与多个队列

在本例中,我们将使用 Java 实现发布/订阅模式与多个队列。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建两个队列并将其绑定到一个交换机上。
  3. 发送消息到交换机中。
public class RabbitMQPubSubMultipleQueuesExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME_1 = "myQueue1";
    private static final String QUEUE_NAME_2 = "myQueue2";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 fanout,这意味着交换机将消息广播给所有与之绑定的队列。我们还创建了两个队列并将其绑定到交换机上。在 for 循环中,我们发送了 10 条消息到交换机中。

工作队列模式

工作队列模式是一种常见的消息模型,也称为任务队列。在工作队列模式中,多个消费者共同消费同一个队列中的消息。当消息被发送到队列中时,它将被分配给其中一个消费者进行处理。每个消息只能被一个消费者处理,但是一个消费者可以处理多个消息。

示例一:使用 Java 实现工作队列模式

在本例中,我们将使用 Java 实现工作队列模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上。
  4. 发送消息到队列中。
public class RabbitMQWorkerQueueExample {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                    try {
                        doWork(message);
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main 方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery 方法中,我们处理消息并确认消息已被费。在 main 方法中,我们发送了 10 条消息到队列中。

示例二:使用 Java 实现工作队列模式与消息应答

在本例中,我们将使用 Java 实现工作队列模式与消息应答。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上。
  4. 发送消息到队列中。
  5. 确认消息已被消费。
public class RabbitMQWorkerQueueAckExample {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                    try {
                        doWork(message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main 方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery 方法中,我们处理消息并确认消息已被消费。在 main 方法中我们发送了 10 条消息到队列中,并在 doWork 方法中模拟了一个耗时任务。如果任务处理失败,我们将使用 channel.basicNack 方法将消息重新放回队列中。

路由模式

路由模式是一种常见的消息模型,也称为直连模式。在路由模式中,生产者将消息发送到一个交换机上,并指定一个路由键。交换机将消息路由到与之绑定的队列中,路由键与队列的绑定关系是通过路由键和队列名的匹配来实现的。

示例一:使用 Java 实现路由模式

在本例中,我们将使用 Java 实现路由模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上,并指定一个路由键。
  4. 发送消息到交换机中,并指定一个路由键。
public class RabbitMQRoutingExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME = "myQueue";
    private static final String ROUTING_KEY = "myRoutingKey";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 direct,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了一个队列并将其绑定到交换机上,并指定了一个路由键。在 for 循环中,我们发送了 10 条消息到交换机中,并指定了一个路由键。

示例二:使用 Java 实现路由模式与多个队列

在本例中,我们将使用 Java 实现路由模式与多个队列。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建两个队列并将其绑定到一个交换机上,并指定不同的路由键。
  3. 发送消息到交换机中,并指定不同的路由键。
public class RabbitMQRoutingMultipleQueuesExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME_1 = "myQueue1";
    private static final String QUEUE_NAME_2 = "myQueue2";
    private static final String ROUTING_KEY_1 = "myRoutingKey1";
    private static final String ROUTING_KEY_2 = "myRoutingKey2";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, ROUTING_KEY_1);
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, ROUTING_KEY_2);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                if (i % 2 == 0) {
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_1, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                } else {
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_2, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                }
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 direct,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了两个队列并将其绑定到交换机上,并指定了不同的路由键。在 for 循环中,我们发送了 10 条消息到交换机中,并根据消息的奇偶性指定不同的路由键。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ 最常用的三大模式实例解析 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • Docker学习之搭建ActiveMQ消息服务的方法步骤

    以下是“Docker学习之搭建ActiveMQ消息服务的方法步骤”的完整攻略,包含两个示例说明。 简介 ActiveMQ是一个流行的开源消息中间件,可以用于构建高性能、可靠的分布式系统。本攻略将介绍如何使用Docker搭建ActiveMQ消息服务,并提供相应示例说明。 步骤1:安装Docker 在使用Docker搭建ActiveMQ消息服务之前,需要先安装D…

    RabbitMQ 2023年5月15日
    00
  • 单元测试代码覆盖率解析

    以下是“单元测试代码覆盖率解析”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍单元测试代码覆盖率的概念和解析方法。通过攻略的学习,您将了解如何计算代码覆盖率、如何分析代码覆盖率报告以及如何提高代码覆盖率。 示例一:计算代码覆盖率 以下是计算代码覆盖率的示例: 编写单元测试 在计算代码覆盖率之前,我们需要编写单元测试。以下是一个简单的单元测试示例: …

    RabbitMQ 2023年5月15日
    00
  • python代码 FTP备份交换机配置脚本实例解析

    以下是“python代码 FTP备份交换机配置脚本实例解析”的完整攻略,包含两个示例说明。 简介 在网络设备管理中,备份交换机配置是一项非常重要的任务。本教程将介绍如何使用Python编写一个FTP备份交换机配置脚本,并提供相应的示例说明。 步骤1:安装ftplib库 在Python中,可以使用ftplib库来连接FTP服务器。在终端中执行以下命令,安装ft…

    RabbitMQ 2023年5月15日
    00
  • Windows下RabbitMQ安装及配置详解

    Windows下RabbitMQ安装及配置详解 RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在 Windows 系统中,可以使用以下步骤安装和配置 RabbitMQ。 步骤一:下载安装 RabbitMQ 在 RabbitMQ 官网下载页面(https://www.rabbitmq.com/download.html)下载适合 Windo…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ之什么是WebSocket协议?

    WebSocket是一种在单个TCP连接上进行全双工通信的协议。它可以帮助我们在Web浏览器和服务器之间进行实时通信。RabbitMQ支持WebSocket协议,可以帮助我们在Web浏览器和RabbitMQ之间进行实时通信。以下是关于RabbitMQ的WebSocket协议的完整攻略: WebSocket协议的特点 WebSocket协议具有以下特点: 实时…

    云计算 2023年5月5日
    00
  • SpringBoot集成ActiveMQ的实战全过程

    以下是“SpringBoot集成ActiveMQ的实战全过程”的完整攻略,包含两个示例。 简介 ActiveMQ是Apache基金会的一个开源消息中间件,支持多种协议和编程语言。本攻略将详细介绍如何在SpringBoot中集成ActiveMQ,并提供两个示例,演示如何使用ActiveMQ进行消息发送和接收。 基础知识 在进行SpringBoot集成Activ…

    RabbitMQ 2023年5月15日
    00
  • Java搭建RabbitMq消息中间件过程详解

    以下是Java搭建RabbitMQ消息中间件过程详解的完整攻略,包含两个示例说明。 示例1:简单队列模式 步骤1:安装RabbitMQ 首先,您需要安装RabbitMQ。您可以从RabbitMQ官网下载适合您操作系统的安装包进行安装。 步骤2:添加依赖 在pom.xml文件中添加以下依赖: <dependency> <groupId>…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ如何保证消息的顺序性?

    RabbitMQ是一个可靠的消息代理,它提供了多种机制来保证消息的顺序性。以下是RabbitMQ保证消息顺序性的完整攻略: 消息顺序性机制 RabbitMQ提供多种机制来保证消息的顺序性,包括: 单一消费者模式 消息分组机制 这些机制可以帮助我们保证消息的顺序性,确保消息能够按照发送的顺序被正确地处理。 示例说明 以下是使用单一消费者模式和消息分组机制保证消…

    云计算 2023年5月5日
    00
合作推广
合作推广
分享本页
返回顶部