Python中线程的MQ消息队列实现以及消息队列的优点解析

Python中线程的MQ消息队列实现以及消息队列的优点解析

什么是消息队列

消息队列是一种高效的消息传递机制,它可以将非实时的异步事件发送到MQ中再由消费者消费,避免了生产者和消费者之间的直接通信,提高了系统的可扩展性和可靠性。

Python中线程的MQ消息队列实现

在Python中,我们可以使用queue模块的Queue类来实现线程的MQ消息队列。在使用时,需要注意以下几点:

  1. 需要使用线程安全的队列,建议使用queue.Queue类。

  2. 生产者和消费者需要使用不同的线程。

  3. 在发送消息时,需要使用队列的put()方法;在接收消息时,需要使用队列的get()方法。

  4. 生产者和消费者均需要处理队列为空的异常情况。

下面是一个简单的示例:

import queue
import threading

class Producer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            self.queue.put(i)
            print(f"Produced: {i}")

class Consumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            try:
                message = self.queue.get(timeout=1)
                print(f"Consumed: {message}")
            except queue.Empty:
                print("Queue is empty")

q = queue.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer.start()
consumer.start()
producer.join() # 等待生产者线程结束
consumer.join() # 等待消费者线程结束

运行结果:

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Consumed: 4
Produced: 5
Consumed: 5
Produced: 6
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 9
Consumed: 9

消息队列的优点

使用消息队列的好处在于:

  1. 解耦:使用消息队列可以将生产者和消费者之间的耦合度降低,从而提高系统的可扩展性和可维护性。

  2. 异步处理:使用消息队列可以实现异步处理,加快系统的响应速度。

  3. 消息持久化:在某些情况下,我们需要保证消息的可靠性,即消息不会因为系统故障而丢失。使用消息队列可以将消息持久化到磁盘中,从而保证消息的可靠性。

示例1:使用消息队列处理异步任务

假设我们有一个需求:定期从网站上爬取文章并将结果写入数据库中。为了提高系统的响应速度,我们不想让爬取和写入操作在同一个线程中执行,而是希望可以使用消息队列来实现异步处理。具体实现如下:

import queue
import threading
import time
import requests
import bs4
import pymongo

class Crawler(threading.Thread):
    def __init__(self, url_queue, article_queue):
        super().__init__()
        self.url_queue = url_queue
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                url = self.url_queue.get(timeout=1)
                print(f"Crawling {url}...")
                article = self.crawl(url)
                self.article_queue.put(article)
            except queue.Empty:
                print("URL queue is empty")

    def crawl(self, url):
        response = requests.get(url)
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        # 假设我们的文章在页面中有类名为“article”的标签
        article = soup.find("div", attrs={"class": "article"}).text
        return article

class Writer(threading.Thread):
    def __init__(self, article_queue):
        super().__init__()
        self.article_queue = article_queue

    def run(self):
        client = pymongo.MongoClient()
        db = client["articles"]
        collection = db["collection"]
        while True:
            try:
                article = self.article_queue.get(timeout=1)
                print("Writing article...")
                collection.insert_one({"article": article})
            except queue.Empty:
                print("Article queue is empty")

url_queue = queue.Queue()
article_queue = queue.Queue()

for url in ["https://example.com/articles/1", "https://example.com/articles/2", "https://example.com/articles/3"]:
    url_queue.put(url)

crawler = Crawler(url_queue, article_queue)
writer = Writer(article_queue)

crawler.start()
writer.start()

crawler.join()
writer.join()

运行结果:

Crawling https://example.com/articles/1...
Crawling https://example.com/articles/2...
Crawling https://example.com/articles/3...
Writing article...
Writing article...
Writing article...

示例2:使用消息队列实现任务调度

假设我们有一个需求:定期从网站上爬取最新的新闻并发送邮件通知。为了方便管理,我们希望使用消息队列来实现任务调度。具体实现如下:

import queue
import threading
import time
import requests
import bs4
import smtplib
from email.mime.text import MIMEText

class Task:
    def __init__(self, url, email):
        self.url = url
        self.email = email

class Crawler(threading.Thread):
    def __init__(self, task_queue, article_queue):
        super().__init__()
        self.task_queue = task_queue
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                print(f"Crawling {task.url}...")
                article = self.crawl(task.url)
                self.article_queue.put((article, task.email))
            except queue.Empty:
                print("Task queue is empty")

    def crawl(self, url):
        response = requests.get(url)
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        # 假设我们的新闻在页面中有类名为“news”的标签
        news = soup.find("div", attrs={"class": "news"}).text
        return news

class MailSender(threading.Thread):
    def __init__(self, article_queue):
        super().__init__()
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                article, email = self.article_queue.get(timeout=1)
                print(f"Sending email to {email}...")
                self.send_mail(article, email)
            except queue.Empty:
                print("Article queue is empty")

    def send_mail(self, article, email):
        msg = MIMEText(article)
        msg["From"] = "crawler@example.com"
        msg["To"] = email
        msg["Subject"] = "Latest news"
        smtp = smtplib.SMTP("smtp.example.com")
        smtp.sendmail("crawler@example.com", [email], msg.as_string())
        smtp.quit()

task_queue = queue.Queue()
article_queue = queue.Queue()

for task in [Task("https://example.com/news/1", "user1@example.com"), 
             Task("https://example.com/news/2", "user2@example.com")]:
    task_queue.put(task)

crawler = Crawler(task_queue, article_queue)
mail_sender = MailSender(article_queue)

crawler.start()
mail_sender.start()

crawler.join()
mail_sender.join()

运行结果:

Crawling https://example.com/news/1...
Crawling https://example.com/news/2...
Sending email to user1@example.com...
Sending email to user2@example.com...

以上示例仅为演示,实际应用需要根据需求进行修改。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python中线程的MQ消息队列实现以及消息队列的优点解析 - Python技术站

(0)
上一篇 2023年5月25日
下一篇 2023年5月25日

相关文章

  • OpenCV imread读取图片失败的问题及解决

    针对”OpenCV imread读取图片失败的问题及解决”,我提供以下完整攻略: 问题描述 在使用OpenCV库进行图像处理的时候,有时会出现imread读取图片失败的问题。OpenCV中imread函数是用于读取图片的函数,但是由于各种原因,imread有可能读取失败。本攻略将解决该问题,并提供两条示例说明。 解决方案 检查路径是否正确 imread函数的…

    人工智能概论 2023年5月24日
    00
  • keras topN显示,自编写代码案例

    首先我们先来理解一下问题,keras是深度学习框架,而Top N显示是常见的分类问题的评估指标,指在前N个预测结果中正确的比例。因此在使用keras进行模型训练时,考虑到最终的分类评估,需要能够对模型进行Top N显示的计算。本文将为大家介绍关于keras Top N显示的相关内容,包括计算方法和代码示例。 计算Top N显示的方法 在分类任务中,Top N…

    人工智能概论 2023年5月25日
    00
  • MongoDB学习笔记之GridFS使用介绍

    MongoDB学习笔记之GridFS使用介绍 什么是GridFS GridFS 是 MongoDB 提供的一种协议,用于存储可扩展的大型二进制数据文件,例如图像、音频和视频文件。MongoDB 的文件系统使用两个集合来存储二进制文件,使之可以分批读取或者分片存储。 如何使用GridFS 创建GridFS对象 创建GridFSBucket对象时,必须指定数据库…

    人工智能概论 2023年5月25日
    00
  • Python从入门到精通之环境搭建教程图解

    Python从入门到精通之环境搭建教程图解 确认Python版本 在安装Python之前,需要确认你计算机上的Python版本。可以在命令行中输入以下命令: python –version 如果已经安装Python,控制台会返回Python的版本号。如果还没有安装Python,可以通过以下步骤进行安装。 下载Python安装包 可以在Python官网(ht…

    人工智能概览 2023年5月25日
    00
  • 详解Java日志正确使用姿势

    当我们在开发Java应用时,记录日志是非常重要的。它可以帮助开发人员和运维人员发现问题、排除故障,同时也使得我们对应用程序的运行情况有一个清晰的了解。然而,正确的使用Java日志需要一定的技术知识和实践经验。本篇攻略旨在介绍如何正确地使用日志,以及如何防止日志泄露和日志劫持等常见的安全问题。 一、选择合适的日志框架 Java提供了自己的日志框架,即Java …

    人工智能概览 2023年5月25日
    00
  • django 实现手动存储文件到model的FileField

    当我们在使用Django开发Web应用时,常常需要让用户上传文件,比如头像、照片等,我们可以通过使用Django的FileField字段将这些文件存储到数据库中。但是,有时候我们可能需要手动将文件保存到FileField字段所关联的文件中。本文将详细讲解如何在Django中手动保存文件到FileField字段所关联的文件中。 1. 准备工作: 首先,我们需要…

    人工智能概论 2023年5月25日
    00
  • 设备APP开发环境配置细节介绍

    下面是设备APP开发环境配置细节介绍的完整攻略。 设备APP开发环境配置细节介绍 1. 安装开发工具 首先需要确保本地已安装开发工具,建议选择Android Studio、Xcode等官方推荐的开发工具,它们对设备APP开发提供了全方位的支持。 2. 配置开发环境 Android 针对Android开发,可以按照以下步骤来配置开发环境: 安装Java环境和A…

    人工智能概览 2023年5月25日
    00
  • Django如何继承AbstractUser扩展字段

    我可以为你详细讲解如何在Django中继承AbstractUser模型扩展字段的攻略。下面是详细步骤: 1.创建一个新的User模型 首先,在你的Django项目中,需要先创建一个新的User模型。可以在models.py文件中定义这个新模型。通过继承AbstractUser类创建一个新的User类。这个新类将继承AbstractUser的所有功能和属性,同…

    人工智能概论 2023年5月24日
    00
合作推广
合作推广
分享本页
返回顶部