Django 如何使用 Celery 完成异步任务或定时任务

yizhihongxing

以前版本的 Celery 需要一个单独的库(django-celery)才能与 Django 一起工作, 但从 Celery 3.1 开始,情况便不再如此,我们可以直接通过 Celery 库来完成在 Django 中的任务。

安装 Redis 服务端

以 Docker 安装为例,安装一个密码为 mypassword 的 Redis 服务端

docker run -itd --name redis -p 127.0.0.1:6379:6379 redis:alpine redis-server --requirepass mypassword

在 Python 中安装 Celery 和 Redis

pip install celery redis

在 Django 项目中添加 Celery 配置

在 Django 项目中创建一个 celery.py 文件,并配置 Celery 应用程序。这个文件应该与 settings.py 文件位于同一目录下:

import os

from celery import Celery

# 设置 Django 的默认环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 使用 Django 的 settings.py 文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 从所有已安装的应用中自动发现并加载任务模块
app.autodiscover_tasks()

然后在 settings.py 文件中添加配置:

# 使用 Redis 作为消息代理(broker)来传递任务消息,连接地址为 localhost:6379/0,并提供密码 mypassword 进行身份验证。
CELERY_BROKER_URL = 'redis://:mypassword@localhost:6379/0'

# 使用 Redis 作为结果存储后端,连接地址同上,使用相同的密码进行身份验证。
CELERY_RESULT_BACKEND = 'redis://:mypassword@localhost:6379/0'

# 指定发送到代理(broker)的任务消息序列化格式为 JSON 格式。
CELERY_TASK_SERIALIZER = 'json'

# 指定从结果后端获取的结果序列化格式为 JSON 格式。
CELERY_RESULT_SERIALIZER = 'json'

# 指定支持接收的内容类型为 JSON 格式。
CELERY_ACCEPT_CONTENT = ['json']

# 将时区设置为亚洲/上海时区。
CELERY_TIMEZONE = 'Asia/Shanghai'

# 启用 UTC 时间。
CELERY_ENABLE_UTC = True

在 Django 应用程序中创建一个 tasks.py 文件,并编写要运行的任务函数。例如,此处我们将编写一个名为 send_email() 的任务,来定期发送电子邮件:

from django.core.mail import send_mail
from celery import shared_task

@shared_task
def send_email():
    # 发送电子邮件的代码
    pass

如果想要实现异步任务的功能,在 Django 项目中的任何位置调用任务函数即可。例如,在 views.py 文件中,我们可以从视图函数中启动任务,如下所示:

from myapp.tasks import send_email

def my_view(request):
    send_email.delay()
    return HttpResponse('任务已经在后台执行。')

如果想要实现定时任务的功能,可以在 Celery 的配置文件中设置定时任务的调度方式。例如,要每小时运行一次 send_email() 任务,我们可以添加以下代码:

from celery.task.schedules import crontab

app.conf.beat_schedule = {
    'send-email-every-hour': {
        'task': 'myapp.tasks.send_email',
        'schedule': crontab(minute=0, hour='*/1'),
    },
}

定时任务的具体写法可以参考官方文档:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab

运行 Celery-worker 与 Celery-beat

Celery是一个分布式任务队列,由三个主要组件组成:Celery worker、Celery beat 和消息代理(例如 Redis 或 RabbitMQ)。这些组件一起协作,让开发者能够轻松地执行异步任务和定时任务。

Celery worker:负责接收任务请求并执行任务。当您在 Django 应用程序中调用 apply_async 方法时,任务将被发送到 Celery worker,然后由 worker 执行。

Celery beat:负责调度定时任务。它会根据定义的规则定期触发任务,并将其发送到 Celery worker 处理。

所以,对于需要运行定时任务的情况,我们需要同时启动 Celery worker 和 Celery beat 进程来确保所有任务都可以被正确地处理和执行。

如果只需要使用 Celery 来执行异步任务,那么只需启动 Celery worker 即可。但如果需要周期性地执行任务,那么需要启动 Celery beat 来帮助完成调度这些任务。

# 运行 worker 与 beat
celery -A proj worker --loglevel=info --detach --pidfile=worker.pid --logfile=./logs/worker.log
celery -A proj beat --loglevel=info --detach --pidfile=beat.pid --logfile=./logs/beat.log
  • -A proj:指定 Celery 应用程序所在的模块或包,这里假设其名为 proj。
  • worker 或 beat:启动的进程名称,分别对应 worker 和 beat 两种类型的 Celery 进程。
  • --loglevel=info:设置日志级别为 info,即只记录 info 级别及以上的日志信息。
  • --detach:以守护进程(daemonized)方式启动 Celery 进程,使其在后台运行。
  • --pidfile=worker.pid 或 --pidfile=beat.pid:将进程 ID(PID)写入指定的 PID 文件,方便后续管理和监控。
  • --logfile=./logs/worker.log 或 --logfile=./logs/beat.log:指定日志文件路径,所有日志信息都会输出到该文件中。

随后我们设定的定时任务便会按规则执行,可以通过指定的日志文件查看执行结果。当我们需要停止 Celery worker 与 Celery beat 时,可以执行以下操作:

kill -TERM $(cat worker.pid)
kill -TERM $(cat beat.pid)

参考

[1] [Using Celery with Django]: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
[2] [Periodic Tasks]: https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab

原文链接:https://www.cnblogs.com/desireroot7/p/17352598.html

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Django 如何使用 Celery 完成异步任务或定时任务 - Python技术站

(0)
上一篇 2023年4月25日
下一篇 2023年4月25日

相关文章

  • Python Web服务器Tornado使用小结

    Python Web服务器Tornado使用小结 Tornado是一个Python Web框架,它是一个轻量级的Web服务器,具有高性能和可扩展性。Tornado支持异步I/O操作,可以处理大量的并发,适用于高并发的Web应用程序。本文将详细讲解Tornado的使用方法和注意事项,并提供两个示例来Tornado的使用过程。 Tornado的安装 在使用Tor…

    python 2023年5月14日
    00
  • python实现自动化报表功能(Oracle/plsql/Excel/多线程)

    当然,我很乐意为您讲解Python实现自动化报表功能的完整实例教程。以下是教程的详细步骤: 1. 准备工作 在开始学习和实现自动化报表功能之前,有几个准备工作需要完成。首先,需要安装Oracle数据库和PL/SQLDeveloper。其次,还需要Python编程语言的基本知识,以及对Excel文件格式的了解和掌握。 2. 连接Oracle数据库 在PL/SQ…

    python 2023年5月13日
    00
  • Python使用defaultdict解决字典默认值

    当我们使用Python自带的字典对象时,如果使用中遇到一个还未在字典中被定义的键,那么Python会抛出一个KeyError的错误。为了避免这种情况,我们需要在使用前判断键是否存在,或者事先为键设置默认值。 Python标准库中有一个collections模块,其中的defaultdict类给我们提供了设置默认值的一种简单、优雅的方法。接下来,我们将进一步解…

    python 2023年5月13日
    00
  • python编写函数注意事项总结

    Python编写函数注意事项总结 函数的命名 函数名应该有意义,清晰易懂 应该符合PEP 8规范,即使用小写字母和下划线拼接,且具备描述性 例如: # bad def func(a, b): pass # good def calculate_sum(a, b): pass 函数的文档注释 应该使用文档注释对函数进行描述,使得用户可以快速理解函数的作用和使用…

    python 2023年5月14日
    00
  • python匹配两个短语之间的字符实例

    以下是详细讲解“Python匹配两个短语之间的字符实例”的完整攻略,包括正则表达式的介绍、Python中re模块的使用、示例说明和注意事项。 正则表达式的介绍 正则表达式是一种用于匹配字符串的工具,它可以用来检查一个字符串是否符合某种模式。正则表达式通常由一些特殊字符和普通字符组成,用于描述字符串的特征。 Python中re模块的使用 在Python中可以使…

    python 2023年5月14日
    00
  • python如何建立全零数组

    建立全零数组是指在Python中创建一个所有元素都为0的数组。Python中可以使用NumPy库中的zeros方法来创建全零数组。下面我将给出详细的步骤和示例说明: 步骤一:导入NumPy库 可以使用import语句导入NumPy库: import numpy as np 步骤二:使用zeros方法创建全零数组 zeros方法可以使用一个整数参数来指定数组的…

    python 2023年6月5日
    00
  • 在Python中使用NumPy对x和y的笛卡尔乘积的二维赫米特级数进行评估,并使用三维系数阵列

    为了评估二维赫米特级数的笛卡尔乘积,我们可以使用Python中最常用的数学库之一——NumPy。下面是详细的步骤: 步骤1:导入NumPy库 import numpy as np 步骤2:生成假设的x和y的数组 x = np.array([0, 1, 2])y = np.array([3, 4, 5]) 步骤3:使用NumPy的meshgrid函数生成笛卡尔…

    python-answer 2023年3月25日
    00
  • python读取中文路径时出错(2种解决方案)

    在Python编程中,有时候我们会遇到读取中文路径时出错的问题。这通常是由于编码问题引起的。本攻略将提供解决问题的两种方法,并提供两个示例。 解决方法 以下是解决读取中文路径时出错的两种方法: os.path.abspath方法 使用os.path.join方法 使用os.path.abspath方法 我们可以使用os.path.abspath方法来解决读取…

    python 2023年5月13日
    00
合作推广
合作推广
分享本页
返回顶部