Python使用Pika库调用RabbitMQ交换机模式详解
在本文中,我们将介绍如何使用Python的Pika库调用RabbitMQ交换机模式,并提供两个示例说明。
环境准备
在开始之前,需要确保已安装了以下环境:
- Python 3.x
- Pika库
- RabbitMQ
示例一:使用直接交换机发送和接收消息
在本例中,我们将使用直接交换机发送和接收消息。具体步骤如下:
- 导入Pika库。
- 建立连接。
- 创建一个发送者。
- 创建一个接收者。
- 启动应用程序并测试。
1. 导入Pika库
import pika
2. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
在上述代码中,我们使用pika.BlockingConnection
建立与RabbitMQ的连接,并使用channel
创建一个通道。
3. 创建一个发送者
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='direct_exchange', routing_key='direct_key', body=message)
print("Sent message: %s" % message)
在上述代码中,我们使用exchange_declare
声明一个直接交换机,并使用basic_publish
发送消息到交换机中。
4. 创建一个接收者
def callback(ch, method, properties, body):
print("Received message: %s" % body)
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='direct_exchange', queue=queue_name, routing_key='direct_key')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,我们使用queue_declare
声明一个队列,并使用queue_bind
将队列绑定到直接交换机上。然后使用basic_consume
监听队列中的消息,并使用callback
函数处理接收到的消息。
5. 启动应用程序并测试
启动应用程序并发送消息到交换机中,然后接收消息并输出。
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='direct_exchange', routing_key='direct_key', body=message)
print("Sent message: %s" % message)
def callback(ch, method, properties, body):
print("Received message: %s" % body)
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='direct_exchange', queue=queue_name, routing_key='direct_key')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,我们使用BlockingConnection
建立与RabbitMQ的连接,并使用channel
创建一个通道。然后发送消息到交换机中,并监听队列中的消息。
示例二:使用主题交换机发送和接收消息
在本例中,我们将使用主题交换机发送和接收消息。具体步骤如下:
- 导入Pika库。
- 建立连接。
- 创建一个发送者。
- 创建一个接收者。
- 启动应用程序并测试。
1. 导入Pika库
import pika
2. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
在上述代码中,我们使用pika.BlockingConnection
建立与RabbitMQ的连接,并使用channel
创建一个通道。
3. 创建一个发送者
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='topic_exchange', routing_key='topic.key', body=message)
print("Sent message: %s" % message)
在上述代码中,我们使用exchange_declare
声明一个主题交换机,并使用basic_publish
发送消息到交换机中。
4. 创建一个接收者
def callback(ch, method, properties, body):
print("Received message: %s" % body)
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key='topic.#')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,我们使用queue_declare
声明一个队列,并使用queue_bind
将队列绑定到主题交换机上。然后使用basic_consume
监听队列中的消息,并使用callback
函数处理接收到的消息。
5. 启动应用程序并测试
启动应用程序并发送消息到交换机中,然后接收消息并输出。
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='topic_exchange', routing_key='topic.key', body=message)
print("Sent message: %s" % message)
def callback(ch, method, properties, body):
print("Received message: %s" % body)
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key='topic.#')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,我们使用BlockingConnection
建立与RabbitMQ的连接,并使用channel
创建一个通道。然后发送消息到交换机中,并监听队列中的消息。
总结
本文介绍了如何使用Python的Pika库调用RabbitMQ交换机模式,并提供了两个示例说明。通过使用Pika库和RabbitMQ,可以更方便地发送和接收消息。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python使用pika库调用rabbitmq交换机模式详解 - Python技术站