当我们需要在Django项目中使用定时任务时,通常会选择Celery作为任务队列。Celery支持使用crontab表达式或固定时间间隔来设定定时任务,可实现灵活的定时任务管理。在某些情况下,我们需要支持动态设置定时任务,即在运行时可以动态添加、修改和删除定时任务。本文将详细讲解如何使用Celery实现动态设置定时任务。
环境准备
在使用Celery的过程中,需要安装以下软件包:
- Redis或者RabbitMQ等消息代理
- Celery以及其依赖库
若使用Redis作为任务队列,则需要安装redis-server软件包,并启动redis服务。若使用RabbitMQ作为任务队列,则需要安装rabbitmq-server软件包并启动rabbitmq服务。以上软件包均可以使用操作系统自身软件包管理器下载安装。
在安装完上述依赖库和软件包后,我们需要在Django项目中安装Celery,使用pip安装即可:
pip install celery
为了能够在Django项目中运行Celery,需要项目中新建一个celery.py文件,定义应用程序和Celery的实例。并且在Django项目的settings.py中添加必要的配置,如下所示:
# celery.py
import os
from celery import Celery
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 创建celery实例
app = Celery('myproject')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE='Asia/Shanghai'
# 任务模块的路径
CELERY_IMPORTS = [
'myapp.tasks',
]
在代码中,我们首先从系统环境变量中获取Django项目的配置文件,然后使用config_from_object方法和autodiscover_tasks方法初始化Celery实例。其中,app参数设置的是应用程序的名称。配置文件中的CELERY_BROKER_URL参数指定了任务队列使用的消息代理。
动态设置定时任务
Celery中默认支持使用crontab表达式或固定时间间隔来定义定时任务。例如,我们使用以下代码可以定义一个每10秒执行一次的任务:
from celery.task.schedules import crontab
from celery.decorators import periodic_task
@periodic_task(run_every=crontab(minute='*', second=10))
def my_periodic_task():
print('执行定时任务')
以上代码定义了一个名为my_periodic_task的定时任务,该任务使用crontab表达式定时运行,每个10秒执行一次。此时,无法对任务进行修改。
要支持动态设置定时任务,我们需要手动创建任务对象并添加到Celery实例中。创建任务对象的代码如下所示:
from celery import shared_task
from celery.schedules import crontab
from datetime import datetime, timedelta
from celery.task import Task
from celery.utils.log import get_task_logger
logger = get_task_logger("my_task")
class DynamicScheduleTask(Task):
def __init__(self, task_obj):
self.task_obj = task_obj
self.run_every = crontab(
minute=task_obj.minute,
hour=task_obj.hour,
day_of_month=task_obj.day_of_month,
month_of_year=task_obj.month_of_year,
day_of_week=task_obj.day_of_week
)
def run(self, *args, **kwargs):
logger.info(f"{datetime.now()} - Running: {self.task_obj.task_name}")
self.task_obj.task_func(*self.task_obj.args, **self.task_obj.kwargs)
def get_task_name(self):
return self.task_obj.task_name
def schedule_task(task_obj):
task = DynamicScheduleTask(task_obj)
app.conf.beat_schedule[task_obj.task_name] = {
'task': task.get_task_name(),
'schedule': task.run_every
}
以上代码中,我们通过继承Task类创建一个DynamicScheduleTask类。该类接受一个task_obj参数,该参数是一个自定义类,用于存储任务名称、任务函数、定时规则和任务参数等信息。在DynamicScheduleTask类中,我们动态创建run_every属性,使用crontab对象定义定时规则。在run方法中,任务函数通过task_func属性动态调用。最终,我们创建schedule_task函数,将DynamicScheduleTask对象添加到Celery实例的任务调度中。
在使用以上代码之前,需要定义task_obj类,来存放我们的任务信息,如下所示:
class TaskObj(object):
def __init__(self, task_name, task_func, **kwargs):
self.task_name = task_name
self.task_func = task_func
self.args = kwargs.get('args', [])
self.kwargs = kwargs.get('kwargs', {})
self.minute = kwargs.get('minute', '*')
self.hour = kwargs.get('hour', '*')
self.day_of_month = kwargs.get('day_of_month', '*')
self.month_of_year = kwargs.get('month_of_year', '*')
self.day_of_week = kwargs.get('day_of_week', '*')
在以上代码中,我们创建了一个TaskObj类,使用该类存储任务名称、任务函数、定时规则和任务参数等信息。
接下来,我们可以使用schedule_task函数动态设置定时任务:
# tasks.py
from myapp.celery import app
from myapp.models import MyTask, TaskResult
from datetime import datetime
import time
@app.task
def my_task():
print('执行任务')
def example_dynamic_schedule_task():
my_task_obj = TaskObj(
'my_task',
my_task,
args=[],
kwargs={}
)
schedule_task(my_task_obj)
def example_modify_dynamic_schedule_task():
# 修改任务的执行时间,每30秒执行一次
task = app.conf.beat_schedule['my_task']
task['schedule'].minute = '*/30'
在以上代码中,我们定义了一个my_task任务函数,在执行该任务函数时,会打印一条测试信息。我们首先调用example_dynamic_schedule_task函数,动态创建一个名为my_task的定时任务,每分钟执行一次。接着,我们调用example_modify_dynamic_schedule_task函数,修改my_task定时任务的执行时间为每30秒执行一次。
总结
通过Celery,我们可以支持灵活的定时任务管理。本文讲解了如何使用Celery实现动态设置定时任务,适用于需要在运行时动态添加、修改和删除定时任务的场景,提高了系统的灵活性和可扩展性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:celery实现动态设置定时任务 - Python技术站