下面是关于Python实现RabbitMQ的消息队列的完整攻略,具体内容如下:
RabbitMQ简介
RabbitMQ是一个开源的消息代理和队列系统,它使用Erlang编写,是一个高度可靠、可扩展的平台,适用于许多不同的企业和应用程序。使用RabbitMQ可以帮助应用程序的各个部分之间进行分布式计算,同时保证数据的可靠性和一致性。
RabbitMQ的安装
首先,需要安装RabbitMQ。
Windows
对于Windows系统,请访问RabbitMQ官网下载相应的安装包。
Linux
对于Debian或Ubuntu系统,请在终端中运行以下命令:
sudo apt-get install rabbitmq-server
对于CentOS或Fedora系统,请在终端中运行以下命令:
sudo yum install rabbitmq-server
安装完毕后,可以在终端中运行以下命令启动RabbitMQ:
sudo systemctl start rabbitmq-server
RabbitMQ的Python客户端——Pika
Python的RabbitMQ客户端是Pika,它是一个纯Python实现的RabbitMQ客户端库,提供了与RabbitMQ服务器交互的功能。
可以使用pip安装Pika:
pip install pika
示例一
首先,我们需要导入Pika库,并创建一个RabbitMQ连接:
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
接着,我们需要在RabbitMQ中创建一个消息队列。如果消息队列不存在,将自动创建。
channel = connection.channel()
channel.queue_declare(queue='hello')
然后,我们需要向消息队列中发送一条消息。下面的示例展示了如何把一个简单的字符串格式的消息发送到hello
队列中。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
最后,当我们完成了消息队列的使用后,我们需要释放连接并关闭连接。
connection.close()
完整代码如下:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
示例二
我们可以在不同的Python进程之间建立一个消息队列来进行通信。下面的代码展示了在Python中使用RabbitMQ的方法。
首先,我们需要创建一个新的队列。
channel.queue_declare(queue='task_queue', durable=True)
然后,我们需要创建一个callback
函数,该函数接收消息,模拟一个耗时的任务,并在任务完成后删除该消息。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
接下来,我们需要告诉RabbitMQ,当有新的消息到达时,调用这个callback
函数。
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
然后我们开始接收消息,并等待新的消息到达。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
完整代码如下:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
这个示例建立了一个作为task_queue
的消息队列,并模拟了一个耗时的任务来处理消息。在这个示例中,我们使用了channel.basic_qos(prefetch_count=1)
设置了Consumer的预取计数,以保证只有一个消息被同时分配给每个Worker。同时我们设置了durable=True
来保证即使RabbitMQ出现异常情况导致消息队列崩溃,队列仍然可以持久化存储和恢复。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python实现RabbitMQ的消息队列的示例代码 - Python技术站