Python中线程的MQ消息队列实现以及消息队列的优点解析

Python中线程的MQ消息队列实现以及消息队列的优点解析

什么是消息队列

消息队列是一种高效的消息传递机制,它可以将非实时的异步事件发送到MQ中再由消费者消费,避免了生产者和消费者之间的直接通信,提高了系统的可扩展性和可靠性。

Python中线程的MQ消息队列实现

在Python中,我们可以使用queue模块的Queue类来实现线程的MQ消息队列。在使用时,需要注意以下几点:

  1. 需要使用线程安全的队列,建议使用queue.Queue类。

  2. 生产者和消费者需要使用不同的线程。

  3. 在发送消息时,需要使用队列的put()方法;在接收消息时,需要使用队列的get()方法。

  4. 生产者和消费者均需要处理队列为空的异常情况。

下面是一个简单的示例:

import queue
import threading

class Producer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            self.queue.put(i)
            print(f"Produced: {i}")

class Consumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            try:
                message = self.queue.get(timeout=1)
                print(f"Consumed: {message}")
            except queue.Empty:
                print("Queue is empty")

q = queue.Queue()
producer = Producer(q)
consumer = Consumer(q)
producer.start()
consumer.start()
producer.join() # 等待生产者线程结束
consumer.join() # 等待消费者线程结束

运行结果:

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Consumed: 4
Produced: 5
Consumed: 5
Produced: 6
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 9
Consumed: 9

消息队列的优点

使用消息队列的好处在于:

  1. 解耦:使用消息队列可以将生产者和消费者之间的耦合度降低,从而提高系统的可扩展性和可维护性。

  2. 异步处理:使用消息队列可以实现异步处理,加快系统的响应速度。

  3. 消息持久化:在某些情况下,我们需要保证消息的可靠性,即消息不会因为系统故障而丢失。使用消息队列可以将消息持久化到磁盘中,从而保证消息的可靠性。

示例1:使用消息队列处理异步任务

假设我们有一个需求:定期从网站上爬取文章并将结果写入数据库中。为了提高系统的响应速度,我们不想让爬取和写入操作在同一个线程中执行,而是希望可以使用消息队列来实现异步处理。具体实现如下:

import queue
import threading
import time
import requests
import bs4
import pymongo

class Crawler(threading.Thread):
    def __init__(self, url_queue, article_queue):
        super().__init__()
        self.url_queue = url_queue
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                url = self.url_queue.get(timeout=1)
                print(f"Crawling {url}...")
                article = self.crawl(url)
                self.article_queue.put(article)
            except queue.Empty:
                print("URL queue is empty")

    def crawl(self, url):
        response = requests.get(url)
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        # 假设我们的文章在页面中有类名为“article”的标签
        article = soup.find("div", attrs={"class": "article"}).text
        return article

class Writer(threading.Thread):
    def __init__(self, article_queue):
        super().__init__()
        self.article_queue = article_queue

    def run(self):
        client = pymongo.MongoClient()
        db = client["articles"]
        collection = db["collection"]
        while True:
            try:
                article = self.article_queue.get(timeout=1)
                print("Writing article...")
                collection.insert_one({"article": article})
            except queue.Empty:
                print("Article queue is empty")

url_queue = queue.Queue()
article_queue = queue.Queue()

for url in ["https://example.com/articles/1", "https://example.com/articles/2", "https://example.com/articles/3"]:
    url_queue.put(url)

crawler = Crawler(url_queue, article_queue)
writer = Writer(article_queue)

crawler.start()
writer.start()

crawler.join()
writer.join()

运行结果:

Crawling https://example.com/articles/1...
Crawling https://example.com/articles/2...
Crawling https://example.com/articles/3...
Writing article...
Writing article...
Writing article...

示例2:使用消息队列实现任务调度

假设我们有一个需求:定期从网站上爬取最新的新闻并发送邮件通知。为了方便管理,我们希望使用消息队列来实现任务调度。具体实现如下:

import queue
import threading
import time
import requests
import bs4
import smtplib
from email.mime.text import MIMEText

class Task:
    def __init__(self, url, email):
        self.url = url
        self.email = email

class Crawler(threading.Thread):
    def __init__(self, task_queue, article_queue):
        super().__init__()
        self.task_queue = task_queue
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                print(f"Crawling {task.url}...")
                article = self.crawl(task.url)
                self.article_queue.put((article, task.email))
            except queue.Empty:
                print("Task queue is empty")

    def crawl(self, url):
        response = requests.get(url)
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        # 假设我们的新闻在页面中有类名为“news”的标签
        news = soup.find("div", attrs={"class": "news"}).text
        return news

class MailSender(threading.Thread):
    def __init__(self, article_queue):
        super().__init__()
        self.article_queue = article_queue

    def run(self):
        while True:
            try:
                article, email = self.article_queue.get(timeout=1)
                print(f"Sending email to {email}...")
                self.send_mail(article, email)
            except queue.Empty:
                print("Article queue is empty")

    def send_mail(self, article, email):
        msg = MIMEText(article)
        msg["From"] = "crawler@example.com"
        msg["To"] = email
        msg["Subject"] = "Latest news"
        smtp = smtplib.SMTP("smtp.example.com")
        smtp.sendmail("crawler@example.com", [email], msg.as_string())
        smtp.quit()

task_queue = queue.Queue()
article_queue = queue.Queue()

for task in [Task("https://example.com/news/1", "user1@example.com"), 
             Task("https://example.com/news/2", "user2@example.com")]:
    task_queue.put(task)

crawler = Crawler(task_queue, article_queue)
mail_sender = MailSender(article_queue)

crawler.start()
mail_sender.start()

crawler.join()
mail_sender.join()

运行结果:

Crawling https://example.com/news/1...
Crawling https://example.com/news/2...
Sending email to user1@example.com...
Sending email to user2@example.com...

以上示例仅为演示,实际应用需要根据需求进行修改。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python中线程的MQ消息队列实现以及消息队列的优点解析 - Python技术站

(0)
上一篇 2023年5月25日
下一篇 2023年5月25日

相关文章

  • Django–权限Permissions的例子

    下面是关于Django中权限Permissions的例子的详细攻略。 1. 什么是Permissions Permissions是Django中的一种权限控制系统。通过这个系统,我们可以根据用户的身份或者角色,对不同的访问控制进行限制。例如,我们可以设置只有管理员才能删除数据,而普通用户只能查看数据等等。 2. Permissions的应用 2.1 在视图函…

    人工智能概览 2023年5月25日
    00
  • Ubuntu中搭建Nginx、PHP环境最简单的方法

    搭建Nginx和PHP环境需要以下步骤: 1. 安装Nginx 在Ubuntu系统中,可以通过以下命令安装Nginx: sudo apt update sudo apt install nginx 安装完成后,可以使用以下命令检查Nginx是否安装成功: nginx -v 这会输出Nginx的版本号,表示安装成功。 2. 安装PHP 在Ubuntu系统中,可…

    人工智能概论 2023年5月25日
    00
  • Python的Django框架中的Context使用

    下面是Python的Django框架中的Context使用的完整攻略: 什么是Context? Context是Django框架中一个非常重要的部分,它负责传递模板中需要的变量以及函数等信息。在Django框架中,Context通常是一个字典对象,其中键为变量名,值为对应变量的值。 如何定义Context? 在Django框架中,可以通过定义一个字典来创建C…

    人工智能概览 2023年5月25日
    00
  • 亲手教你Docker Compose安装DOClever的详细过程

    下面就详细讲解“亲手教你Docker Compose安装DOClever的详细过程”。 1. 准备工作 在进行Docker Compose安装DOClever之前,需要进行一些准备工作: 1.1 下载DOClever 首先,需要下载DOClever的项目文件或者从Github上clone下来DOClever的代码。下载地址为:https://github.c…

    人工智能概览 2023年5月25日
    00
  • ORM Django 终端打印 SQL 语句实现解析

    实现Django终端打印SQL语句可以帮助我们更深入地理解Django的ORM系统,了解执行SQL语句的过程以及如何优化SQL语句。下面是步骤: 步骤1:安装django-extensions 在使用之前,需要安装django-extensions库。使用pip安装即可: pip install django-extensions 步骤2:设置Django扩…

    人工智能概论 2023年5月25日
    00
  • zookeeper概述图文详解

    Zookeeper概述图文详解 什么是Zookeeper? Zookeeper是一种开放源代码的分布式协同服务,其主要功能是维护同时多达数百个进程间的协同动作。 Zookeeper提供以下功能: 配置管理:save/update 命名服务:节点注册与查找 分布式锁 故而通常Zookeeper被作为实现其它分布式服务的基础服务,例如Hadoop、HBase等等…

    人工智能概览 2023年5月25日
    00
  • Python 就业方面的选择与应用分析

    Python 就业方面的选择与应用分析 Python是一种高级、解释性、面向对象的编程语言,具有简单、易学、易读的特点。随着大数据、人工智能等技术的兴起,Python已经成为了一门非常热门的编程语言。在接下来的内容中,我们将从Python就业选择和应用两个方面做出详细分析。 Python 就业选择分析 在选择Python作为就业方向时,需要了解以下几个方面:…

    人工智能概览 2023年5月25日
    00
  • MapReduce中ArrayWritable 使用指南

    MapReduce中ArrayWritable 使用指南 在MapReduce中,ArrayWritable是一个很有用的类,它可以帮助我们更好地处理多个数据类型的输出。本文将介绍如何使用ArrayWritable类,包括如何定义ArrayWritable子类以及如何在MapReduce中使用它。 定义ArrayWritable子类 在使用ArrayWrit…

    人工智能概览 2023年5月25日
    00
合作推广
合作推广
分享本页
返回顶部