RabbitMQ 最常用的三大模式实例解析

RabbitMQ 最常用的三大模式实例解析

RabbitMQ 是一个开源的消息队列系统,支持多种消息递协议。在 RabbitMQ 中,有三种最常用的消息模式,分别是发布/订阅模式、工作队列模式和路由模式。本文将详细讲解这三种模式的实现方法,并提供两个示例说明。

发布/订阅模式

发布/订阅模式是一种常见的消息模型,也称为广播模式。在发布/订阅模式中,生产者将消息发送到一个交换机上,交换机将消息广播给所有与之绑定的队列。每个队列都有自己的消费者,消费者从队列中接收消息并进行处理。

示例一:使用 Java 实现发布/订阅模式

在本例中,我们将使用 Java 实现发布/订阅模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个队列并将其绑定到一个交换机上。
  3. 发送消息到交换机中。
public class RabbitMQPubSubExample {

    private static final String EXCHANGE_NAME = "myExchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 fanout,这意味着交换机将消息广播给所有与之绑定的队列。在 for 循环中,我们发送了 10 条消息到交换机中。

示例二:使用 Java 实现发布/订阅模式与多个队列

在本例中,我们将使用 Java 实现发布/订阅模式与多个队列。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建两个队列并将其绑定到一个交换机上。
  3. 发送消息到交换机中。
public class RabbitMQPubSubMultipleQueuesExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME_1 = "myQueue1";
    private static final String QUEUE_NAME_2 = "myQueue2";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 fanout,这意味着交换机将消息广播给所有与之绑定的队列。我们还创建了两个队列并将其绑定到交换机上。在 for 循环中,我们发送了 10 条消息到交换机中。

工作队列模式

工作队列模式是一种常见的消息模型,也称为任务队列。在工作队列模式中,多个消费者共同消费同一个队列中的消息。当消息被发送到队列中时,它将被分配给其中一个消费者进行处理。每个消息只能被一个消费者处理,但是一个消费者可以处理多个消息。

示例一:使用 Java 实现工作队列模式

在本例中,我们将使用 Java 实现工作队列模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上。
  4. 发送消息到队列中。
public class RabbitMQWorkerQueueExample {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                    try {
                        doWork(message);
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main 方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery 方法中,我们处理消息并确认消息已被费。在 main 方法中,我们发送了 10 条消息到队列中。

示例二:使用 Java 实现工作队列模式与消息应答

在本例中,我们将使用 Java 实现工作队列模式与消息应答。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上。
  4. 发送消息到队列中。
  5. 确认消息已被消费。
public class RabbitMQWorkerQueueAckExample {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                    try {
                        doWork(message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息,然后创建了一个 RabbitMQ 的消费者并确认消息已被接收。在 main 方法中,我们创建了一个队列并将其绑定到一个交换机上。在 handleDelivery 方法中,我们处理消息并确认消息已被消费。在 main 方法中我们发送了 10 条消息到队列中,并在 doWork 方法中模拟了一个耗时任务。如果任务处理失败,我们将使用 channel.basicNack 方法将消息重新放回队列中。

路由模式

路由模式是一种常见的消息模型,也称为直连模式。在路由模式中,生产者将消息发送到一个交换机上,并指定一个路由键。交换机将消息路由到与之绑定的队列中,路由键与队列的绑定关系是通过路由键和队列名的匹配来实现的。

示例一:使用 Java 实现路由模式

在本例中,我们将使用 Java 实现路由模式。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建一个 RabbitMQ 的消费者并确认消息已被接收。
  3. 创建一个队列并将其绑定到一个交换机上,并指定一个路由键。
  4. 发送消息到交换机中,并指定一个路由键。
public class RabbitMQRoutingExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME = "myQueue";
    private static final String ROUTING_KEY = "myRoutingKey";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 direct,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了一个队列并将其绑定到交换机上,并指定了一个路由键。在 for 循环中,我们发送了 10 条消息到交换机中,并指定了一个路由键。

示例二:使用 Java 实现路由模式与多个队列

在本例中,我们将使用 Java 实现路由模式与多个队列。具体步骤如下:

  1. 创建一个 RabbitMQ 的生产者并将消息设置为持久化消息。
  2. 创建两个队列并将其绑定到一个交换机上,并指定不同的路由键。
  3. 发送消息到交换机中,并指定不同的路由键。
public class RabbitMQRoutingMultipleQueuesExample {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String QUEUE_NAME_1 = "myQueue1";
    private static final String QUEUE_NAME_2 = "myQueue2";
    private static final String ROUTING_KEY_1 = "myRoutingKey1";
    private static final String ROUTING_KEY_2 = "myRoutingKey2";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, ROUTING_KEY_1);
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, ROUTING_KEY_2);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                if (i % 2 == 0) {
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_1, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                } else {
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_2, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                }
                System.out.println("Sent message: " + message);
            }
        }
    }
}

在上述代码中,我们创建了一个 RabbitMQ 的生产者并将消息设置为持久化消息。在 main 方法中,我们创建了一个交换机并将其类型设置为 direct,这意味着交换机将消息路由到与之绑定的队列中。我们还创建了两个队列并将其绑定到交换机上,并指定了不同的路由键。在 for 循环中,我们发送了 10 条消息到交换机中,并根据消息的奇偶性指定不同的路由键。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ 最常用的三大模式实例解析 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • RabbitMQ如何创建Exchange?

    在RabbitMQ中,Exchange是消息路由器,它接收来自生产者的消息并将其路由到一个或多个队列中。Exchange根据路由键将消息路由到队列中。以下是RabbitMQ中创建Exchange的详细说明: Exchange类型 RabbitMQ支持四种类型的Exchange:direct、fanout、topic和headers。 direct:将消息路由…

    云计算 2023年5月5日
    00
  • Python实现RabbitMQ6种消息模型的示例代码

    RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)并支持多种消息模型。在本文中,我们将详细讲解如何使用Python实现RabbitMQ的6种消息模型。我们将提供两个示例,分别是发布/订阅模型和RPC模型。 RabbitMQ基本概念 在使用RabbitMQ前,需要了解一些基本概念: 生产者(Producer):发送消息的应用程序。 …

    RabbitMQ 2023年5月15日
    00
  • 关于利用RabbitMQ实现延迟任务的方法详解

    关于利用RabbitMQ实现延迟任务的方法详解 RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。在本文中,我们将介绍如何使用RabbitMQ实现延迟任务,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: RabbitMQ Python 3.x pika库 示例一:使用RabbitMQ实现延迟任…

    RabbitMQ 2023年5月15日
    00
  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ消息队列的完整步骤 RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。在本文中,我们将介绍如何使用SpringBoot整合RabbitMQ消息队列,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: JDK 1.8或更高版本 Maven RabbitMQ 步…

    RabbitMQ 2023年5月15日
    00
  • SpringBoot使用RabbitMQ延时队列(小白必备)

    SpringBoot使用RabbitMQ延时队列(小白必备) 在本文中,我们将详细讲解如何在SpringBoot中使用RabbitMQ延时队列。我们将提供两个示例说明,以帮助您更好地理解如何使用延时队列。 准备工作 在开始之前,需要确保已安装了以下环境: Java RabbitMQ SpringBoot 示例一:使用插件实现延时队列 在本例中,我们将使用Ra…

    RabbitMQ 2023年5月15日
    00
  • python实现跨进程(跨py文件)通信示例

    以下是“Python实现跨进程(跨py文件)通信示例”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍如何使用Python实现跨进程(跨py文件)通信。通过攻略的学习,您将了解如何使用socket和multiprocessing等模块实现跨进程通信。 示例一:使用socket实现跨进程通信 以下是使用socket实现跨进程通信的示例: # serve…

    RabbitMQ 2023年5月15日
    00
  • docker-compose安装RabbitMQ及插件操作步骤

    Docker Compose安装RabbitMQ及插件操作步骤 RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。在本文中,我们将介绍如何使用Docker Compose安装RabbitMQ及插件操作步骤,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: Docker Docker Compos…

    RabbitMQ 2023年5月15日
    00
  • springBoot整合rabbitMQ的方法详解

    Spring Boot整合RabbitMQ实例详解(Fanout模式) 在本文中,我们将详细讲解如何使用Spring Boot整合RabbitMQ,并使用Fanout模式进行消息传递。本文将提供两个示例说明。 环境准备 在开始本文之前,需要确保已经安装了以下软件: JDK 1.8或更高版本 RabbitMQ服务器 创建Spring Boot项目 首先,我们需…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部