rabbitmq五种模式详解(含实现代码)

RabbitMQ五种模式详解(含实现代码)

RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在 RabbitMQ 中,有五种常用的消息模式,分别是简单模式、工作队列模式、发布/订阅模式、路由模式和主题模式。本文将详细讲解这五种模式的实现方法,并提供相应的示例代码。

简单模式

简单模式是 RabbitMQ 中最简单的一种模式,也是最常用的一种模式。在简单模式中,生产者将消息发送到队列中,消费者从队列中获取消息并进行处理。

使用以下代码实现简单模式:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

connection.close()

在上述代码中,channel.queue_declare(queue='hello') 表示声明一个名为 hello 的队列,channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') 表示将消息 Hello World! 发送到队列 hello 中。

使用以下代码消费简单模式:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) 表示从队列 hello 中获取消息,并将消息传递给回调函数 callback 进行处理。

工作队列模式

工作队列模式是 RabbitMQ 中最常用的一种模式之一。在工作队列模式中,生产者将消息发送到队列中,多个消费者从队列中获取消息并进行处理。在多个消费者的情况下,RabbitMQ 会将消息平均分配给每个消费者,以实现负载均衡。

使用以下代码实现工作队列模式:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.queue_declare(queue='task_queue', durable=True) 表示声明一个名为 task_queue 的队列,并将队列设置为持久化,channel.basic_qos(prefetch_count=1) 表示在多个消费者的情况下,每个消费者最多只能处理一个消息,ch.basic_ack(delivery_tag=method.delivery_tag) 表示在消息处理完成后,向 RabbitMQ 发送确认消息。

使用以下代码发送消息到工作队列:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)

connection.close()

在上述代码中,delivery_mode=2 表示将消息设置为持久化。

发布/订阅模式

发布/订阅模式是 RabbitMQ 中最常用的一种模式之一。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。

使用以下代码实现发布/订阅模式:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout') 表示声明一个名为 logs 的交换机,并将交换机的类型设置为 fanoutresult = channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、非持久化的队列,channel.queue_bind(exchange='logs', queue=queue_name) 表示将队列绑定到交换机上。

使用以下代码发送消息到发布/订阅模式:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)

connection.close()

在上述代码中,exchange='logs' 表示将消息发送到名为 logs 的交换机中。

路由模式

路由模式是 RabbitMQ 中常用的一种模式之一。在路由模式中,生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与其绑定的队列中的指定路由键的消息。

使用以下代码实现路由模式:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = ['info', 'warning', 'error']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='direct_logs', exchange_type='direct') 表示声明一个名为 direct_logs 的交换机,并将交换机的类型设置为 directchannel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) 表示将队列绑定到交换机上,并指定路由键。

使用以下代码发送消息到路由模式:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))

connection.close()

在上述代码中,routing_key=severity 表示将消息发送到指定路由键的队列中。

主题模式

主题模式是 RabbitMQ 中最灵活的一种模式之一。在主题模式中,生产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。

使用以下代码实现主题模式:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='topic_logs', exchange_type='topic') 表示声明一个名为 topic_logs 的交换机,并将交换机的类型设置为 topicchannel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) 表示将队列绑定到交换机上,并指定主题。

使用以下代码发送消息到主题模式:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

在上述代码中,routing_key=routing_key 表示将消息发送到符合条件的队列中。

总结

本文详细讲解了 RabbitMQ 中五种常用的消息模式的实现方法,并提供了相应的示例代码。在使用 RabbitMQ 时,需要根据实际需求选择合适的模式,并注意消息的可靠性和正确性。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:rabbitmq五种模式详解(含实现代码) - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • MQ的消息模型及在工作上应用场景

    以下是“MQ的消息模型及在工作上应用场景”的完整攻略,包含两个示例。 简介 MQ(Message Queue)是一种消息队列,它可以在分布式系统中传递消息。MQ可以解耦系统之间的依赖关系,提高系统的可靠性和可扩展性。本攻略将详细介绍MQ的消息模型及在工作上的应用场景,并提供两个示例,演示如何使用MQ实现消息传递。 消息模型 MQ的消息模型通常包括以下几个概念…

    RabbitMQ 2023年5月15日
    00
  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    手把手带你掌握SpringBoot RabbitMQ延迟队列 RabbitMQ 是一个开源的消息队列系统,支持多种消息递协议。在 RabbitMQ 中,延迟队列可以用于实现消息的延迟处理。本文将详细讲解如何使用 SpringBoot 和 RabbitMQ 实现延迟队列,并提供两个示例说明。 环境准备 在开始之前,需要确保已经安装了以下环境: JDK 1.8 …

    RabbitMQ 2023年5月15日
    00
  • Python环境下安装使用异步任务队列包Celery的基础教程

    以下是“Python环境下安装使用异步任务队列包Celery的基础教程”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍如何在Python环境下安装使用异步任务队列包Celery。通过攻略的学习,您将了解如何使用Celery来处理异步任务,并了解Celery的基本原理和使用方法。 示例一:安装Celery 以下是安装Celery的示例: 安装Rabb…

    RabbitMQ 2023年5月15日
    00
  • docker安装RabbitMQ及安装延迟插件的详细过程

    以下是“Docker安装RabbitMQ及安装延迟插件的详细过程”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍如何使用Docker安装RabbitMQ,并安装延迟插件。RabbitMQ是一种常见的消息队列应用程序,通过本攻略的学习,您将掌握如何使用Docker安装RabbitMQ,并安装延迟插件。 示例一:使用Docker安装RabbitMQ 以…

    RabbitMQ 2023年5月15日
    00
  • Docker安装RabbitMQ AMQP协议及重要角色

    Docker安装RabbitMQ AMQP协议及重要角色 RabbitMQ是一个开源的消息队列系统,支持多种消息递协议。在使用RabbitMQ时,可以使用Docker来快速安装和部署RabbitMQ。本文将介绍如何使用Docker安装RabbitMQ,并介绍RabbitMQ中的重要角色。 Docker安装RabbitMQ 在使用Docker安装RabbitM…

    RabbitMQ 2023年5月15日
    00
  • 详解SpringBoot中使用RabbitMQ的RPC功能

    下面是详解SpringBoot中使用RabbitMQ的RPC功能的完整攻略,包含两条示例说明。 简介 RPC(Remote Procedure Call)是一种远程调用协议,它允许一个程序调用另一个程序中的函数或方法,而不需要了解底层的网络细节。在分布式系统中,RPC是一种常见的通信方式,它可以让不同的服务之间进行通信和协作。 RabbitMQ是一个开源的消…

    RabbitMQ 2023年5月16日
    00
  • 前端与RabbitMQ实时消息推送未读消息小红点实现示例

    以下是前端与RabbitMQ实时消息推送未读消息小红点实现示例的完整攻略,包含两个示例说明。 示例1:使用WebSocket实现实时消息推送 步骤1:添加依赖 在pom.xml文件中添加以下依赖: <dependency> <groupId>org.springframework.boot</groupId> <ar…

    RabbitMQ 2023年5月15日
    00
  • maven项目test执行main找不到资源文件的问题及解决

    以下是“maven项目test执行main找不到资源文件的问题及解决”的完整攻略,包含两个示例。 简介 在Maven项目中,有时候我们会遇到test执行main找不到资源文件的问题。这个问题通常是由于资源文件没有正确地被加载所导致的。本攻略将详细介绍如何解决这个问题,包括使用相对路径和绝对路径两种方式。 使用解 使用相对路径 可以使用相对路径来解决test执…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部