RabbitMQ 的七种队列模式和应用场景

RabbitMQ 的七种队列模式和应用场景

RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在 RabbitMQ 中,队列是消息的载体,生产者将消息发送到队列中,消费者从队列中获取并进行处理。RabbitMQ 的队列模式决定了消息在队列中的存储方式和消费方式,不同的队列模式适用于不同的应用场景。本文将详细讲解 RabbitMQ 的七种队列模式和应用场景。

1. 简单队列模式

简单队列模式是 RabbitMQ 中最简单的队列模式,也是最常用的队列模式。在简单队列模式中,生产者将消息发送到队列中,消费者从队列中获取并进行处理。如果有多个消费者同时监听同一个队列,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。

应用场景:适用于单个生产者和单个消费者的场景,例如任务的异步处理、日志的记录等。

示例代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 表示创建一个连接到 RabbitMQ 服务器的连接,channel.queue_declare(queue='hello') 表示声明一个名为 hello 的队列,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) 表示从队列 hello 中接收消息,并调用 callback 处理消息。

2. 工作队列模式

工作队列模式是 RabbitMQ 中最常用的队列模式之一。在工作队列模式中,生产者将消息发送到队列中,多个消费者从队列中获取并进行处理。在多个消费者的情况下,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。

应用场景:适用于多个消费者处理同一类型任务的场景,例如任务的异步处理、邮件的发送等。

示例代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.queue_declare(queue='task_queue', durable=True) 表示声明一个名为 task_queue 的队列,并将队列设置为持久化队列,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,time.sleep(body.count(b'.')) 表示模拟任务处理的时间,ch.basic_ack(delivery_tag=method.delivery_tag) 表示消息处理完成后发送确认消息。

3. 发布/订阅模式

发布/订阅模式是 RabbitMQ 中最常用的队列模式之一。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。

应用场景:适用于消息的广播场景,例如日志的记录、新闻的发布等。

示例代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout') 表示声明一个名为 logs 的交换机,result = channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='logs', queue=queue_name) 表示将队列绑定到交换机 logs 上,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) 表示从队列 queue_name 中接收消息,并调用 callback 处理消息。

4. 路由模式

路由模式是 RabbitMQ 中常用的队列模式之一。在路由模式中,生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与绑定的队列中指定路由键的消息。

应用场景:适用于消息的选择性接收场景,例如不同级别的日志记录、不同类型的任务处理等。

示例代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = ['info', 'warning', 'error']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='direct_logs', exchange_type='direct') 表示声明一个名为 direct_logs 的交换机,result = channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) 表示将队列绑定到交换机 direct_logs 上,并指定路由键 severitydef callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) 表示从队列 queue_name 中接收消息,并调用 callback 处理消息。

5. 主题模式

主题模式是 RabbitMQ 中常用的队列模式之一。在主题模式中,生产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。

应用场景:适用于消息的多重选择性接收场景,例如不同类型的任务处理、不同地区的新闻发布等。

示例代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='topic_logs', exchange_type='topic') 表示声明一个名为 topic_logs 的交换机,result = channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) 表示将队列绑定到交换机 topic_logs 上,并指定路由键 binding_keydef callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) 表示从队列 queue_name 中接收消息,并调用 callback 处理消息。

6. 延迟队列模式

延迟队列模式是 RabbitMQ 中常用的队列模式之一。在延迟队列模式中,生产者将消息发送到队列中,并指定消息的过期时间,RabbitMQ 会在消息过期后将消息发送到指定的队列中。

应用场景:适用于消息的延迟处理场景,例如订单的超时取消、短信的定时发送等。

示例代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={'x-delayed-type': 'direct'})

channel.queue_declare(queue='delayed_queue', durable=True, arguments={'x-dead-letter-exchange': 'normal_exchange'})
channel.queue_bind(queue='delayed_queue', exchange='delayed_exchange', routing_key='delayed_routing_key')

channel.queue_declare(queue='normal_queue', durable=True)
channel.queue_bind(queue='normal_queue', exchange='normal_exchange', routing_key='normal_routing_key')

message = 'Hello World!'
headers = {'x-delay': 5000}
channel.basic_publish(exchange='delayed_exchange', routing_key='delayed_routing_key', body=message, properties=pika.BasicProperties(headers=headers))

print(" [x] Sent %r" % message)
connection.close()

在上述代码中,channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={'x-delayed-type': 'direct'}) 表示声明一个名为 delayed_exchange 的交换机,并将交换机设置为延迟交换机,channel.queue_declare(queue='delayed_queue', durable=True, arguments={'x-dead-letter-exchange': 'normal_exchange'}) 表示声明一个名为 delayed_queue 的队列,并将队列设置为持久化队列和延迟队列,channel.queue_bind(queue='delayed_queue', exchange='delayed_exchange', routing_key='delayed_routing_key') 表示将队列绑定到交换机 delayed_exchange 上,并指定路由键 delayed_routing_keychannel.queue_declare(queue='normal_queue', durable=True) 表示声明一个名为 normal_queue 的队列,并将队列设置为持久化队列,channel.queue_bind(queue='normal_queue', exchange='normal_exchange', routing_key='normal_routing_key') 表示将队列绑定到交换机 normal_exchange 上,并指定路由键 normal_routing_keychannel.basic_publish(exchange='delayed_exchange', routing_key='delayed_routing_key', body=message, properties=pika.BasicProperties(headers=headers)) 表示将消息发送到交换机 delayed_exchange 中,并指定消息的过期时间为 5000 毫秒。

7. RPC 队列模式

RPC 队列模式是 RabbitMQ 中常用的队列模式之一。在 RPC 队列模式中,客户端将请求消息发送到队列中,服务端从队列中获取请求消息并进行处理,然后将处理结果发送回客户端。

应用场景:适用于需要远程调用的场景,例如分布式系统的调用、远程过程调用等。

示例代码:

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

connection.close()

在上述代码中,result = self.channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、自动的队列,self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) 表示从队列 self.callback_queue 中接收消息,并调用 self.on_response 处理消息,self.channel.basic_publish(...) 表示将消息发送到队列 rpc_queue 中,并指定回调队列和关联 ID。

总结

本文详细讲解了 RabbitMQ 的七种队列模式和应用场景,包括简单队列模式、工作队列模式、发布/订阅模式、路由模式、主题模式、延迟队列模式和 RPC 队列模式,并提供了示例代码。在使用 RabbitMQ 时,需要根据实际需求选择合适的队列模式,并注意消息的可靠性和正确性。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ 的七种队列模式和应用场景 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • 盘点MQ中的异常测试

    以下是“盘点MQ中的异常测试”的完整攻略,包含两个示例。 简介 在使用消息队列(Message Queue,MQ)时,我们需要考虑各种异常情况,例如消息发送失败、消息丢失、消息重复等。本攻略将详细介绍如何在MQ中进行异常测试,并提供两个示例,演示如何处理MQ中的异常情况。 基础知识 在进行MQ异常测试之前,我们需要了解以下基础知识: 消息队列:消息队列是一种…

    RabbitMQ 2023年5月15日
    00
  • 如何用.NETCore操作RabbitMQ

    如何用.NET Core操作RabbitMQ RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。在本文中,我们将介绍如何使用.NET Core操作RabbitMQ,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: .NET Core SDK 2.0或更高版本 RabbitMQ 步骤一:安装Rab…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ之什么是持久化?

    在RabbitMQ中,持久化是指将消息或队列存储在磁盘上,以确保即使RabbitMQ服务器崩溃,消息和队列也不会丢失。持久化可以应用于Exchange、Queue和消息。 以下是RabbitMQ中持久化的两个示例: 持久化队列 可以使用RabbitMQ的管理界面或命令行工具来创建持久化队列。以下是使用命令行工具创建持久化队列的示例: # 创建一个名为pers…

    云计算 2023年5月5日
    00
  • RabbitMQ如何创建Exchange?

    在RabbitMQ中,Exchange是消息路由器,它接收来自生产者的消息并将其路由到一个或多个队列中。Exchange根据路由键将消息路由到队列中。以下是RabbitMQ中创建Exchange的详细说明: Exchange类型 RabbitMQ支持四种类型的Exchange:direct、fanout、topic和headers。 direct:将消息路由…

    云计算 2023年5月5日
    00
  • RabbitMQ有哪些最佳实践?

    RabbitMQ是一个可靠的消息代理,它可以帮助我们构建分布式系统。以下是RabbitMQ的最佳实践: 使用持久化队列 持久化队列可以确保在RabbitMQ服务器崩溃或重启时,队列中的消息不会丢失。为了使用持久化队列,我们需要在创建队列时将其标记为持久化。示例代码如下: import pika connection = pika.BlockingConnec…

    云计算 2023年5月5日
    00
  • Golang中优秀的消息队列NSQ基础安装及使用详解

    以下是“Golang中优秀的消息队列NSQ基础安装及使用详解”的完整攻略,包含两个示例说明。 简介 NSQ是一款基于Go语言开发的分布式消息队列系统,具有高性能、高可用性、易于扩展等特点。在本攻略中,我们将介绍如何在Golang中安装和使用NSQ。 安装NSQ 1. 下载NSQ 首先,我们需要从NSQ的官方网站(https://nsq.io/)下载NSQ的二…

    RabbitMQ 2023年5月15日
    00
  • Springboot项目全局异常统一处理案例代码

    以下是“Spring Boot项目全局异常统一处理案例代码”的完整攻略,包含两个示例。 简介 在Spring Boot应用程序中,可以使用@ControllerAdvice和@ExceptionHandler注释来实现全局异常处理。这些注释允许开发人员定义一个或多个异常处理程序,以便在应用程序中捕获和处理异常。本攻略将介绍如何使用@ControllerAdv…

    RabbitMQ 2023年5月15日
    00
  • springboot2.0+elasticsearch5.5+rabbitmq搭建搜索服务的坑

    以下是“springboot2.0+elasticsearch5.5+rabbitmq搭建搜索服务的坑”的完整攻略,包含两个示例。 简介 Elasticsearch是一个流行的搜索引擎,可以用于实现全文搜索和分析。RabbitMQ是一种流行的消息队列中间件,可以用于实现异步消息处理和调度。本攻略介绍如何使用Spring Boot 2.0、Elasticsea…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部