RabbitMQ 的七种队列模式和应用场景
RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在 RabbitMQ 中,队列是消息的载体,生产者将消息发送到队列中,消费者从队列中获取并进行处理。RabbitMQ 的队列模式决定了消息在队列中的存储方式和消费方式,不同的队列模式适用于不同的应用场景。本文将详细讲解 RabbitMQ 的七种队列模式和应用场景。
1. 简单队列模式
简单队列模式是 RabbitMQ 中最简单的队列模式,也是最常用的队列模式。在简单队列模式中,生产者将消息发送到队列中,消费者从队列中获取并进行处理。如果有多个消费者同时监听同一个队列,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。
应用场景:适用于单个生产者和单个消费者的场景,例如任务的异步处理、日志的记录等。
示例代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
表示创建一个连接到 RabbitMQ 服务器的连接,channel.queue_declare(queue='hello')
表示声明一个名为 hello
的队列,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
表示从队列 hello
中接收消息,并调用 callback
处理消息。
2. 工作队列模式
工作队列模式是 RabbitMQ 中最常用的队列模式之一。在工作队列模式中,生产者将消息发送到队列中,多个消费者从队列中获取并进行处理。在多个消费者的情况下,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。
应用场景:适用于多个消费者处理同一类型任务的场景,例如任务的异步处理、邮件的发送等。
示例代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.queue_declare(queue='task_queue', durable=True)
表示声明一个名为 task_queue
的队列,并将队列设置为持久化队列,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,time.sleep(body.count(b'.'))
表示模拟任务处理的时间,ch.basic_ack(delivery_tag=method.delivery_tag)
表示消息处理完成后发送确认消息。
3. 发布/订阅模式
发布/订阅模式是 RabbitMQ 中最常用的队列模式之一。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。
应用场景:适用于消息的广播场景,例如日志的记录、新闻的发布等。
示例代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout')
表示声明一个名为 logs
的交换机,result = channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='logs', queue=queue_name)
表示将队列绑定到交换机 logs
上,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
表示从队列 queue_name
中接收消息,并调用 callback
处理消息。
4. 路由模式
路由模式是 RabbitMQ 中常用的队列模式之一。在路由模式中,生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与绑定的队列中指定路由键的消息。
应用场景:适用于消息的选择性接收场景,例如不同级别的日志记录、不同类型的任务处理等。
示例代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = ['info', 'warning', 'error']
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
表示声明一个名为 direct_logs
的交换机,result = channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
表示将队列绑定到交换机 direct_logs
上,并指定路由键 severity
,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
表示从队列 queue_name
中接收消息,并调用 callback
处理消息。
5. 主题模式
主题模式是 RabbitMQ 中常用的队列模式之一。在主题模式中,生产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。
应用场景:适用于消息的多重选择性接收场景,例如不同类型的任务处理、不同地区的新闻发布等。
示例代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
表示声明一个名为 topic_logs
的交换机,result = channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
表示将队列绑定到交换机 topic_logs
上,并指定路由键 binding_key
,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
表示从队列 queue_name
中接收消息,并调用 callback
处理消息。
6. 延迟队列模式
延迟队列模式是 RabbitMQ 中常用的队列模式之一。在延迟队列模式中,生产者将消息发送到队列中,并指定消息的过期时间,RabbitMQ 会在消息过期后将消息发送到指定的队列中。
应用场景:适用于消息的延迟处理场景,例如订单的超时取消、短信的定时发送等。
示例代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={'x-delayed-type': 'direct'})
channel.queue_declare(queue='delayed_queue', durable=True, arguments={'x-dead-letter-exchange': 'normal_exchange'})
channel.queue_bind(queue='delayed_queue', exchange='delayed_exchange', routing_key='delayed_routing_key')
channel.queue_declare(queue='normal_queue', durable=True)
channel.queue_bind(queue='normal_queue', exchange='normal_exchange', routing_key='normal_routing_key')
message = 'Hello World!'
headers = {'x-delay': 5000}
channel.basic_publish(exchange='delayed_exchange', routing_key='delayed_routing_key', body=message, properties=pika.BasicProperties(headers=headers))
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={'x-delayed-type': 'direct'})
表示声明一个名为 delayed_exchange
的交换机,并将交换机设置为延迟交换机,channel.queue_declare(queue='delayed_queue', durable=True, arguments={'x-dead-letter-exchange': 'normal_exchange'})
表示声明一个名为 delayed_queue
的队列,并将队列设置为持久化队列和延迟队列,channel.queue_bind(queue='delayed_queue', exchange='delayed_exchange', routing_key='delayed_routing_key')
表示将队列绑定到交换机 delayed_exchange
上,并指定路由键 delayed_routing_key
,channel.queue_declare(queue='normal_queue', durable=True)
表示声明一个名为 normal_queue
的队列,并将队列设置为持久化队列,channel.queue_bind(queue='normal_queue', exchange='normal_exchange', routing_key='normal_routing_key')
表示将队列绑定到交换机 normal_exchange
上,并指定路由键 normal_routing_key
,channel.basic_publish(exchange='delayed_exchange', routing_key='delayed_routing_key', body=message, properties=pika.BasicProperties(headers=headers))
表示将消息发送到交换机 delayed_exchange
中,并指定消息的过期时间为 5000 毫秒。
7. RPC 队列模式
RPC 队列模式是 RabbitMQ 中常用的队列模式之一。在 RPC 队列模式中,客户端将请求消息发送到队列中,服务端从队列中获取请求消息并进行处理,然后将处理结果发送回客户端。
应用场景:适用于需要远程调用的场景,例如分布式系统的调用、远程过程调用等。
示例代码:
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
connection.close()
在上述代码中,result = self.channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、自动的队列,self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
表示从队列 self.callback_queue
中接收消息,并调用 self.on_response
处理消息,self.channel.basic_publish(...)
表示将消息发送到队列 rpc_queue
中,并指定回调队列和关联 ID。
总结
本文详细讲解了 RabbitMQ 的七种队列模式和应用场景,包括简单队列模式、工作队列模式、发布/订阅模式、路由模式、主题模式、延迟队列模式和 RPC 队列模式,并提供了示例代码。在使用 RabbitMQ 时,需要根据实际需求选择合适的队列模式,并注意消息的可靠性和正确性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ 的七种队列模式和应用场景 - Python技术站