python操作RabbitMq的三种工作模式

Python操作RabbitMQ的三种工作模式

RabbitMQ是一个开源的消息队列系统,支持多种消息传递协议。Python中使用RabbitMQ进行队列通信的方法,包括RabbitMQ的安装、Python RabbitMQ客户端的安装、RabbitMQ的基础知识、消息列模式、消息的可靠性和正确性等内容,并提供三种工作模式的示例说明。

RabbitMQ的安装

在Windows系统中,可以通过以下步骤安装RabbitMQ:

  1. 下载RabbitMQ安装包,下载地址为:https://www.rabbitmq.com/download.html
  2. 安装Erlang,下载地址为:https://www.erlang.org/downloads
  3. 安装RabbitMQ,双击安装包并按照提示进行安装。

Python RabbitMQ客户端的安装

在Python中使用RabbitMQ需要安装pika库。可以通过以下步骤安装pika库:

  1. 在命令行中输入以下命令:pip install pika

RabbitMQ的基础知识

RabbitMQ的架构包括生产者、消费者、队列、交换机和绑定。生者将消息发送到交换机中,交换机根据绑定将消息路由到相应的队列中,消费者从队列中获取并进行处理。

消息队列模式

RabbitMQ中常用的消息队列模式包括简单模式、工作队列模式、发布/订阅模式、由模式和主题模式。

  • 简单模式:生产者将消息发送到列中,消费者从队列中获取消息并进行处理。
  • 工作队列模式:生产者将消息发送到队列中,多个消费者队列中获取消息并进行处理。在多个消费者的情况下,RabbitMQ会将消息平均分配给每个消费者,以实现负载均衡。
  • 发布/订阅模式:生产者将消息到交换机中,交换机将消息广播给所有绑定到该交换机的队列中。
  • 路由模式:生产者将消息发送到交换机中,并指定消息的路由键,消费者只会接收到与绑定的队列中的指定路由键的消息。
  • 主题模式:产者将消息发送到交换机中,并指定消息的主题,消费者可以使用通配符匹配主题,以接收到符合条件的消息。

消息的可靠性和正确性

在使用RabbitMQ时,需要注意消息的可靠性和正确性。为了保证消息的可靠性,可以使用持久化队列和持久化消息。为了保证消息的正确性,可以使用事务和确认机制。

三种工作模式的示例

简单模式

使用以下代码实现消息的发送和接收:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 表示创建一个连接到RabbitMQ服务器的连接,channel.queue_declare(queue='hello') 表示声明一个名为 hello 的队列,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) 表示从队列 hello 中接收消息,并调用 callback 处理消息。

使用以下代码实现消息的发送:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)

print(" [x] Sent %r" % message)
connection.close()

在上述代码中,channel.queue_declare(queue='hello') 表示声明一个名为 hello 的队列,channel.basic_publish(exchange='', routing_key='hello', body=message) 表示将消息发送到队列 hello 中。

工作队列模式

使用以下代码实现消息的发送和接收:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上述代码中,channel.queue_declare(queue='task_queue', durable=True) 表示声明一个名为 task_queue 的队列,并将队列设置为持久化队列,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,time.sleep(body.count(b'.')) 表示模拟任务处理的时间,ch.basic_ack(delivery_tag=method.delivery_tag) 表示消息处理完成后发送确认消息。

使用以下代码实现消息的发送:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

在上述代码中,channel.queue_declare(queue='task_queue', durable=True) 表示声明一个名为 task_queue 的队列,并将队列设置为持久化队列,channel.basic_publish(...) 表示将消息发送到队列 task_queue 中,并将消息设置为持久化消息。

发布/订阅模式

使用以下代码实现消息的发布和订阅:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout') 表示声明一个名为 logs 的交换机,result = channel.queue_declare(queue='', exclusive=True) 表示声明一个随机的、独占的、自动的队列,channel.queue_bind(exchange='logs', queue=queue_name) 表示将队列绑定到交换机 logs 上,def callback(ch, method, properties, body): {...} 表示接收到消息后的处理逻辑,channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) 表示从队列 queue_name 中接收消息,并调用 callback 处理消息。

使用以下代码实现消息的发布:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = 'Hello World!'
channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)
connection.close()

在上述代码中,channel.exchange_declare(exchange='logs', exchange_type='fanout') 表示声明一个名为 logs 的交换机,channel.basic_publish(exchange='logs', routing_key='', body=message) 表示将消息发送到交换机 logs 中。

总结

本文详细讲解了Python操作RabbitMQ的三种工作模式,包括简单模式、工作队列模式和发布/订阅模式,并提供了示例说明。在使用RabbitMQ时,需要根据实际需求选择合适的特性,并注意消息的可靠性和正确性。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python操作RabbitMq的三种工作模式 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • RabbitMQ如何进行流量控制?

    RabbitMQ如何进行流量控制? RabbitMQ是一个流行的消息代理,它支持流量控制来确保系统的可靠性和稳定性。流量控制是一种机制,用于限制消息的发送速率,以避免过载和系统崩溃。在RabbitMQ中,流量控制可以通过设置QoS(Quality of Service)参数和使用Publisher Confirms机制来实现。 以下是RabbitMQ如何进行…

    云计算 2023年5月5日
    00
  • SpringBoot中启动时如何忽略某项检测

    以下是“SpringBoot中启动时如何忽略某项检测”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍如何在SpringBoot中启动时忽略某项检测。通过攻略的学习,您将了解如何使用SpringBoot的配置文件和注解实现该功能。 示例一:使用配置文件忽略某项检测 以下是使用配置文件忽略某项检测的示例: 在application.properties…

    RabbitMQ 2023年5月15日
    00
  • 基于Java ActiveMQ的实例讲解

    以下是“基于Java ActiveMQ的实例讲解”的完整攻略,包含两个示例。 简介 ActiveMQ是一个流行的开源消息中间件,它实现了JMS(Java消息服务)规范,提供了可靠的消息传递和异步通信功能。ActiveMQ支持多种消息协议和传输协议,例如AMQP、STOMP、MQTT、TCP、UDP等,可以在不同的应用场景中使用。本攻略将详细介绍ActiveM…

    RabbitMQ 2023年5月15日
    00
  • Django+Celery实现定时任务的示例

    以下是“Django+Celery实现定时任务的示例”的完整攻略,包含两个示例。 简介 在本攻略中,我们将详细讲解如何使用Django和Celery实现定时任务。通过攻略的学习,您将了解Django和Celery的基本概念、如何配置Django和Celery、如何编写定时任务以及如何优化Django和Celery应用。 示例一:配置Django和Celery…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ消费端ACK NACK及重回队列机制详解

    RabbitMQ消费端ACK NACK及重回队列机制详解 在RabbitMQ中,消费端ACK和NACK是非常重要的概念。ACK表示消息已经被消费,NACK表示消息未被消费。本文将详细讲解RabbitMQ消费端ACK NACK及重回队列机制,并提供两个示例说明。 消费端ACK和NACK 在RabbitMQ中,消费端ACK和NACK是用来确认消息是否被消费的。当…

    RabbitMQ 2023年5月15日
    00
  • springboot整合RabbitMQ发送短信的实现

    以下是“SpringBoot整合RabbitMQ发送短信的实现”的完整攻略,包含两个示例。 简介 在SpringBoot应用程序中,可以使用RabbitMQ作为消息队列系统,实现短信发送功能。本攻略将详细介绍如何在SpringBoot中整合RabbitMQ发送短信,包括创建RabbitMQ配置类、创建消息发送者、创建消息接收者等。 步骤 以下是SpringB…

    RabbitMQ 2023年5月15日
    00
  • Abp集成HangFire开源.NET任务调度框架

    以下是“Abp集成HangFire开源.NET任务调度框架”的完整攻略,包含两个示例。 简介 HangFire是一个.NET任务调度框架,可以帮助开发人员轻松地实现后台任务的调度和执行。HangFire具有易用性、可靠性和可扩展性等特点,被广泛应用于.NET开发领域。本攻略将介绍如何在Abp框架中集成HangFire。 示例1:集成HangFire 以下是集…

    RabbitMQ 2023年5月15日
    00
  • Linux RabbitMQ 集群搭建流程图解

    Linux RabbitMQ 集群搭建流程图解 在本文中,我们将介绍如何在Linux上搭建RabbitMQ集群,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: Linux操作系统 Erlang RabbitMQ 步骤一:安装Erlang 在本步骤中,我们将安装Erlang。 sudo apt-get update sudo apt-g…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部