Python Celery动态添加定时任务生产实践指南
什么是Celery
Celery 是一个基于 Python 实现的分布式任务队列,用于处理大量的异步任务。Celery 可以让你的应用程序分布式地运行,而不必担心每个任务在哪台机器上运行。Celery 提供了简单易用的 API,可以让我们将代码实现成一个异步任务,并且能够在多个 worker 中执行,支持同时处理数以百万计的任务。
安装Celery
安装Celery可以使用pip直接安装,具体步骤如下:
pip install celery
编写Celery任务
编写一个Celery任务非常简单,只需要以下步骤:
- 定义一个函数,将它装饰为
@app.task
。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
- 启动
Celery worker
celery -A tasks worker --loglevel=info
Celery动态添加定时任务
我们可以使用 celery beat
来动态添加定时任务。celery beat
是 Celery 提供的一个定时任务调度程序,它可以帮助我们在指定的时间运行任务。
以下是一个简单示例,定时每分钟打印一段文字。
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def print_hello():
print("Hello, world!")
app.conf.beat_schedule = {
'print-hello-every-minute': {
'task': 'tasks.print_hello',
'schedule': crontab(),
},
}
上面的代码中,我们定义了一个 print_hello
的任务,并将它安排到每分钟运行一次。这个任务会输出 "Hello, world!"。
接下来,我们需要启动 Celery worker 和 celery beat:
# 启动 celery worker
celery -A tasks worker --loglevel=info
# 启动 celery beat
celery -A tasks beat --loglevel=info
然后就可以每分钟看到一次 "Hello, world!" 了。
Celery动态添加定时任务生产实践
在生产环境中,我们需要动态地添加、删除、修改定时任务。以下是一个示例,演示如何使用 Django 中的模型动态添加定时任务。
-
创建一个 Django app,命名为
mytasks
。 -
在
mytasks/models.py
中定义一个Task
模型:
from django.db import models
class Task(models.Model):
name = models.CharField(max_length=255)
task = models.CharField(max_length=255)
schedule = models.CharField(max_length=255)
- 在
mytasks/tasks.py
文件中编写任务代码:
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('mytasks', broker='pyamqp://guest@localhost//')
@app.task
def run_task(task_id):
try:
task = Task.objects.get(id=task_id)
logger.info('[%s] start' % task.name)
# 执行任务代码
logger.info('[%s] finish' % task.name)
except Task.DoesNotExist:
logger.error('Task does not exist')
这个任务会从数据库中查找之前创建的 Task
实例,执行任务代码,然后记录日志。
- 在
mytasks/__init__.py
中添加以下代码:
from mytasks.tasks import app as celery_app
__all__ = ['celery_app']
这么做是为了让 django 通过这个 app 自动发现我们的任务代码。
- 修改
settings.py
,添加以下配置:
INSTALLED_APPS = [
# 其他app
'mytasks',
]
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SCHEDULE = {
'run-task-once-a-minute': {
'task': 'mytasks.tasks.run_task',
'schedule': crontab(minute='*/1'),
'args': (1, ),
},
}
这里,我们指定了 CELERY_BEAT_SCHEDULER
,告诉 Celery 要使用 Django 的数据库来管理定时任务。
使用 CELERY_BEAT_SCHEDULE
,我们可以定义每个 Task
的定时配置,任务执行时需要的参数,等等。
- 在 Django shell 中创建一个任务:
from mytasks.models import Task
from celery.schedules import crontab
task = Task.objects.create(name='Task 1', task='mytasks.tasks.run_task', schedule=crontab(minute='*/2'))
创建一个名为 "Task 1" 的任务,定时每两分钟执行。
- 启动 Celery worker 和 celery beat:
# 启动 celery worker
celery -A mysite worker --loglevel=info
# 启动 celery beat
celery -A mysite beat --loglevel=info
- 执行
run_task
,我们将在启动一分钟后看到日志。
到这里,我们就演示了如何使用 Django 中的模型动态添加定时任务,以及如何使用Celery调度器来执行这些任务。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python Celery动态添加定时任务生产实践指南 - Python技术站