RabbitMQ五种模式详解(含实现代码)
RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在 RabbitMQ 中,有五种常用的消息模式,分别是简单模式、工作队列模式、发布/订阅模式、路由模式和主题模式。本文将详细讲解这五种模式的实现方法,并提供相应的示例代码。
简单模式
简单模式是 RabbitMQ 中最简单的一种模式,也是最常用的一种模式。在简单模式中,生产者将消息发送到队列中,消费者从队列中获取消息并进行处理。
使用以下代码实现简单模式:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
在上述代码中,channel.queue_declare(queue='hello')
表示声明一个名为 hello
的队列,channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
表示将消息 Hello World!
发送到队列 hello
中。
使用以下代码消费简单模式:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
表示从队列 hello
中获取消息,并将消息传递给回调函数 callback
进行处理。
工作队列模式
工作队列模式是 RabbitMQ 中最常用的一种模式之一。在工作队列模式中,生产者将消息发送到队列中,多个消费者从队列中获取消息并进行处理。在多个消费者的情况下,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。
使用以下代码实现工作队列模式:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.queue_declare(queue='task_queue', durable=True)
表示声明一个名为 task_queue
的队列,并将队列设置为持久化,channel.basic_qos(prefetch_count=1)
表示在多个消费者的情况下,每个消费者最多只能处理一个消息,ch.basic_ack(delivery_tag=method.delivery_tag)
表示在消息处理完成后,向 RabbitMQ 发送确认消息。
使用以下代码发送消息到工作队列:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,delivery_mode=2
表示将消息设置为持久化。
发布/订阅模式
发布/订阅模式是 RabbitMQ 中最常用的一种模式之一。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。
使用以下代码实现发布/订阅模式:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout')
表示声明一个名为 logs
的交换机,并将交换机的类型设置为 fanout
,result = channel.queue_declare(queue='', exclusive=True)
表示声明一个随机的、独占的、非持久化的队列,channel.queue_bind(exchange='logs', queue=queue_name)
表示将队列绑定到交换机上。
使用以下代码发送消息到发布/订阅模式:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
在上述代码中,exchange='logs'
表示将消息发送到名为 logs
的交换机中。
路由模式
路由模式是 RabbitMQ 中常用的一种模式之一。在路由模式中,生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与其绑定的队列中的指定路由键的消息。
使用以下代码实现路由模式:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = ['info', 'warning', 'error']
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
表示声明一个名为 direct_logs
的交换机,并将交换机的类型设置为 direct
,channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
表示将队列绑定到交换机上,并指定路由键。
使用以下代码发送消息到路由模式:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
在上述代码中,routing_key=severity
表示将消息发送到指定路由键的队列中。
主题模式
主题模式是 RabbitMQ 中最灵活的一种模式之一。在主题模式中,生产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。
使用以下代码实现主题模式:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上述代码中,channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
表示声明一个名为 topic_logs
的交换机,并将交换机的类型设置为 topic
,channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
表示将队列绑定到交换机上,并指定主题。
使用以下代码发送消息到主题模式:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
在上述代码中,routing_key=routing_key
表示将消息发送到符合条件的队列中。
总结
本文详细讲解了 RabbitMQ 中五种常用的消息模式的实现方法,并提供了相应的示例代码。在使用 RabbitMQ 时,需要根据实际需求选择合适的模式,并注意消息的可靠性和正确性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:rabbitmq五种模式详解(含实现代码) - Python技术站