关于Django使用django-celery-beat动态添加定时任务的方法
Django是一个开放源代码的高层次Python Web框架。开发人员可以利用Django的许多条款和模块来开发完整的Web应用程序。而celery是Python语言使用的一个异步任务队列,它轻量级、高效,可靠,非常适用于处理高并发的异步任务。而django-celery-beat则是一个Celery Beat调度器的Django扩展,提供了在Django admin界面中管理定时任务的功能。在本攻略中,我们将讲述如何使用django-celery-beat动态添加定时任务的方法。
环境要求
在开始之前,您需要准备如下环境:
1. Python 2.7或3.4以上版本
2. Django 1.11或更高版本
3. celery 4.2.0或更高版本
4. django-celery-beat 1.4.0或更高版本
步骤一:添加django-celery-beat
在终端中运行以下命令安装django-celery-beat:
pip install django-celery-beat
然后,在您的项目的settings.py文件中添加如下配置:
INSTALLED_APPS = (
...
'django_celery_beat',
...
)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
步骤二:创建定时任务模型
首先,在您的项目中创建一个tasks.py文件。在tasks.py文件中,定义以下两个定时任务:
from celery import shared_task
@shared_task
def task_one():
print('Task One Done.')
return 'Task One Done.'
@shared_task
def task_two():
print('Task Two Done.')
return 'Task Two Done.'
接着,在您的项目models.py文件中定义如下模型:
from django.db import models
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
class DynamicTask(models.Model):
name = models.CharField(max_length=50)
active = models.BooleanField(default=True)
run_at = models.DateTimeField()
interval_schedule = models.ForeignKey(IntervalSchedule, on_delete=models.CASCADE, blank=True, null=True)
crontab_schedule = models.ForeignKey(CrontabSchedule, on_delete=models.CASCADE, blank=True, null=True)
task = models.CharField(max_length=50)
在该模型中,定义了DynamicTask对象,它包含一些字段:
1. name: 任务的名称
2. active: 是否启用该任务的标志(默认为True)
3. run_at: 任务的下一次执行时间
4. interval_schedule: 任务的间隔调度(如果是周期性任务)
5. crontab_schedule: 任务的Crontab调度(如果是定时任务)
6. task: 执行任务的函数名
步骤三:添加定时任务视图和表单
接着,在您的项目views.py文件中添加以下视图和表单:
from django.shortcuts import render, redirect
from .forms import DynamicTaskForm
from .models import DynamicTask, IntervalSchedule, CrontabSchedule
from celery import current_app
from datetime import datetime
def home(request):
tasks = DynamicTask.objects.all()
return render(request, 'home.html', {'tasks': tasks})
def add_task(request):
if request.method == 'POST':
form = DynamicTaskForm(request.POST)
if form.is_valid():
task = form.save(commit=False)
task.run_at = datetime.now() # 设置任务的起始执行时间为当前时间
task.save()
current_app.conf.beat_schedule[str(
task.id)] = {'task': task.task, 'schedule': task.schedule}
return redirect('home')
else:
form = DynamicTaskForm()
return render(request, 'add_task.html', {'form': form})
from django import forms
from .models import DynamicTask, IntervalSchedule, CrontabSchedule
class DynamicTaskForm(forms.ModelForm):
interval = forms.CharField(
widget=forms.Select(choices=(
('days', 'Days'),
('hours', 'Hours'),
('minutes', 'Minutes')
))
)
week_days = forms.MultipleChoiceField(
required=False,
widget=forms.CheckboxSelectMultiple,
choices=(
(0, 'Monday'),
(1, 'Tuesday'),
(2, 'Wednesday'),
(3, 'Thursday'),
(4, 'Friday'),
(5, 'Saturday'),
(6, 'Sunday')
)
)
class Meta:
model = DynamicTask
fields = ['name', 'active', 'task', 'interval']
步骤四:添加模板
最后,在您的项目templates文件夹中创建以下HTML模板:
home.html:
{% extends 'base.html' %}
{% block content %}
<h1>List of Scheduled Tasks</h1>
<hr>
<table class="table table-striped">
<thead>
<tr>
<th>Name</th>
<th>Task</th>
<th>Active</th>
<th>Next Run</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for task in tasks %}
<tr>
<td>{{ task.name }}</td>
<td>{{ task.task }}</td>
<td>{{ task.active }}</td>
<td>{{ task.run_at }}</td>
<td>
<a href="/run_task/{{ task.id }}/" class="btn btn-success">Run Now</a>
<a href="/delete_task/{{ task.id }}/" class="btn btn-danger">Delete</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<a href="{% url 'add_task' %}" class="btn btn-primary">Add Task</a>
{% endblock %}
add_task.html:
{% extends 'base.html' %}
{% block content %}
<h1>Add Task</h1>
<hr>
<form method="post" class="form">
{% csrf_token %}
{{ form.as_p }}
<button type="submit" class="btn btn-success">Save</button>
<a href="{% url 'home' %}" class="btn btn-secondary">Cancel</a>
</form>
{% endblock %}
测试
启动Celery Beat调度器,然后访问首页即可看到已经定义的所有定时任务。单击“Add Task”按钮,我们就可以添加一个新的定时任务。
例如,我们添加如下任务:
1. Task Name: 每分钟执行一次的任务
Task: tasks.task_one
Schedule: Interval(Minutes) = 1
- Task Name: 每周一下午4:30执行一次的任务
Task: tasks.task_two
Schedule: Crontab - Minute = 30,Hour = 4,Day of week = Monday
执行这两个任务有一下两种方法:
- 在模板中通过“Run Now”按钮手动启动任务并查看控制台输出
- 在Celery Beat中调度任务
综上,本攻略中详细讲述了如何使用django-celery-beat动态添加定时任务的方法,通过以上的步骤及示例,读者可以灵活地使用django-celery-beat来在Django应用程序中定时执行任务。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于Django使用 django-celery-beat动态添加定时任务的方法 - Python技术站