Python操作RabbitMQ的三种工作模式
RabbitMQ是一个开源的消息队列系统,支持多种消息传递协议。Python中使用RabbitMQ进行队列通信的方法,包括RabbitMQ的安装、Python RabbitMQ客户端的安装、RabbitMQ的基础知识、消息列模式、消息的可靠性和正确性等内容,并提供三种工作模式的示例说明。
RabbitMQ的安装
在Windows系统中,可以通过以下步骤安装RabbitMQ:
- 下载RabbitMQ安装包,下载地址为:https://www.rabbitmq.com/download.html
- 安装Erlang,下载地址为:https://www.erlang.org/downloads
- 安装RabbitMQ,双击安装包并按照提示进行安装。
Python RabbitMQ客户端的安装
在Python中使用RabbitMQ需要安装pika库。可以通过以下步骤安装pika库:
- 在命令行中输入以下命令:pip install pika
RabbitMQ的基础知识
RabbitMQ的架构包括生产者、消费者、队列、交换机和绑定。生者将消息发送到交换机中,交换机根据绑定将消息路由到相应的队列中,消费者从队列中获取并进行处理。
消息队列模式
RabbitMQ中常用的消息队列模式包括简单模式、工作队列模式、发布/订阅模式、由模式和主题模式。
- 简单模式:生产者将消息发送到列中,消费者从队列中获取消息并进行处理。
- 工作队列模式:生产者将消息发送到队列中,多个消费者队列中获取消息并进行处理。在多个消费者的情况下,RabbitMQ会将消息平均分配给每个消费者,以实现负载均衡。
- 发布/订阅模式:生产者将消息到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。
- 路由模式:生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与绑定的队列中的指定路由键的消息。
- 主题模式:产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。
消息的可靠性和正确性
在使用RabbitMQ时,需要注意消息的可靠性和正确性。为了保证消息的可靠性,可以使用持久化队列和持久化消息。为了保证消息的正确性,可以使用事务和确认机制。
三种工作模式的示例
简单模式
使用以下代码实现消息的发送和接收:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
表示创建一个连接到RabbitMQ服务器的连接,channel.queue_declare(queue='hello')
表示声明一个名为 hello
的队列,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
表示从队列 hello
中接收消息,并调用 callback
处理消息。
使用以下代码实现消息的发送:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,channel.queue_declare(queue='hello')
表示声明一个名为 hello
的队列,channel.basic_publish(exchange='', routing_key='hello', body=message)
表示将消息发送到队列 hello
中。
工作队列模式
使用以下代码实现消息的发送和接收:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.queue_declare(queue='task_queue', durable=True)
表示声明一个名为 task_queue
的队列,并将队列设置为持久化队列,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,time.sleep(body.count(b'.'))
表示模拟任务处理的时间,ch.basic_ack(delivery_tag=method.delivery_tag)
表示消息处理完成后发送确认消息。
使用以下代码实现消息的发送:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,channel.queue_declare(queue='task_queue', durable=True)
表示声明一个名为 task_queue
的队列,并将队列设置为持久化队列,channel.basic_publish(...)
表示将消息发送到队列 task_queue
中,并将消息设置为持久化消息。
发布/订阅模式
使用以下代码实现消息的发布和订阅:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout')
表示声明一个名为 logs
的交换机,result = channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='logs', queue=queue_name)
表示将队列绑定到交换机 logs
上,def callback(ch, method, properties, body): {...}
表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
表示从队列 queue_name
中接收消息,并调用 callback
处理消息。
使用以下代码实现消息的发布:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'Hello World!'
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout')
表示声明一个名为 logs
的交换机,channel.basic_publish(exchange='logs', routing_key='', body=message)
表示将消息发送到交换机 logs
中。
总结
本文详细讲解了Python操作RabbitMQ的三种工作模式,包括简单模式、工作队列模式和发布/订阅模式,并提供了示例说明。在使用RabbitMQ时,需要根据实际需求选择合适的特性,并注意消息的可靠性和正确性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python操作RabbitMq的三种工作模式 - Python技术站