关于Django使用 django-celery-beat动态添加定时任务的方法

关于Django使用django-celery-beat动态添加定时任务的方法

Django是一个开放源代码的高层次Python Web框架。开发人员可以利用Django的许多条款和模块来开发完整的Web应用程序。而celery是Python语言使用的一个异步任务队列,它轻量级、高效,可靠,非常适用于处理高并发的异步任务。而django-celery-beat则是一个Celery Beat调度器的Django扩展,提供了在Django admin界面中管理定时任务的功能。在本攻略中,我们将讲述如何使用django-celery-beat动态添加定时任务的方法。

环境要求

在开始之前,您需要准备如下环境:
1. Python 2.7或3.4以上版本
2. Django 1.11或更高版本
3. celery 4.2.0或更高版本
4. django-celery-beat 1.4.0或更高版本

步骤一:添加django-celery-beat

在终端中运行以下命令安装django-celery-beat:

pip install django-celery-beat

然后,在您的项目的settings.py文件中添加如下配置:

INSTALLED_APPS = (
    ...
    'django_celery_beat',
    ...
)

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

步骤二:创建定时任务模型

首先,在您的项目中创建一个tasks.py文件。在tasks.py文件中,定义以下两个定时任务:

from celery import shared_task
@shared_task
def task_one():
    print('Task One Done.')
    return 'Task One Done.'

@shared_task
def task_two():
    print('Task Two Done.')
    return 'Task Two Done.'

接着,在您的项目models.py文件中定义如下模型:

from django.db import models
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

class DynamicTask(models.Model):
    name = models.CharField(max_length=50)
    active = models.BooleanField(default=True)
    run_at = models.DateTimeField()
    interval_schedule = models.ForeignKey(IntervalSchedule, on_delete=models.CASCADE, blank=True, null=True)
    crontab_schedule = models.ForeignKey(CrontabSchedule, on_delete=models.CASCADE, blank=True, null=True)
    task = models.CharField(max_length=50)

在该模型中,定义了DynamicTask对象,它包含一些字段:
1. name: 任务的名称
2. active: 是否启用该任务的标志(默认为True)
3. run_at: 任务的下一次执行时间
4. interval_schedule: 任务的间隔调度(如果是周期性任务)
5. crontab_schedule: 任务的Crontab调度(如果是定时任务)
6. task: 执行任务的函数名

步骤三:添加定时任务视图和表单

接着,在您的项目views.py文件中添加以下视图和表单:

from django.shortcuts import render, redirect
from .forms import DynamicTaskForm
from .models import DynamicTask, IntervalSchedule, CrontabSchedule
from celery import current_app
from datetime import datetime

def home(request):
    tasks = DynamicTask.objects.all()
    return render(request, 'home.html', {'tasks': tasks})

def add_task(request):
    if request.method == 'POST':
        form = DynamicTaskForm(request.POST)
        if form.is_valid():
            task = form.save(commit=False)
            task.run_at = datetime.now()  # 设置任务的起始执行时间为当前时间
            task.save()
            current_app.conf.beat_schedule[str(
                task.id)] = {'task': task.task, 'schedule': task.schedule}
            return redirect('home')
    else:
        form = DynamicTaskForm()
    return render(request, 'add_task.html', {'form': form})

from django import forms
from .models import DynamicTask, IntervalSchedule, CrontabSchedule

class DynamicTaskForm(forms.ModelForm):
    interval = forms.CharField(
        widget=forms.Select(choices=(
            ('days', 'Days'),
            ('hours', 'Hours'),
            ('minutes', 'Minutes')
        ))
    )

    week_days = forms.MultipleChoiceField(
        required=False,
        widget=forms.CheckboxSelectMultiple,
        choices=(
            (0, 'Monday'),
            (1, 'Tuesday'),
            (2, 'Wednesday'),
            (3, 'Thursday'),
            (4, 'Friday'),
            (5, 'Saturday'),
            (6, 'Sunday')
        )
    )

    class Meta:
        model = DynamicTask
        fields = ['name', 'active', 'task', 'interval']

步骤四:添加模板

最后,在您的项目templates文件夹中创建以下HTML模板:
home.html:

{% extends 'base.html' %}

{% block content %}
    <h1>List of Scheduled Tasks</h1>
    <hr>

    <table class="table table-striped">
        <thead>
        <tr>
            <th>Name</th>
            <th>Task</th>
            <th>Active</th>
            <th>Next Run</th>
            <th>Actions</th>
        </tr>
        </thead>

        <tbody>
        {% for task in tasks %}
            <tr>
                <td>{{ task.name }}</td>
                <td>{{ task.task }}</td>
                <td>{{ task.active }}</td>
                <td>{{ task.run_at }}</td>
                <td>
                    <a href="/run_task/{{ task.id }}/" class="btn btn-success">Run Now</a>
                    <a href="/delete_task/{{ task.id }}/" class="btn btn-danger">Delete</a>
                </td>
            </tr>
        {% endfor %}
        </tbody>
    </table>


    <a href="{% url 'add_task' %}" class="btn btn-primary">Add Task</a>

{% endblock %}

add_task.html:

{% extends 'base.html' %}

{% block content %}
    <h1>Add Task</h1>
    <hr>

    <form method="post" class="form">
        {% csrf_token %}
        {{ form.as_p }}

        <button type="submit" class="btn btn-success">Save</button>
        <a href="{% url 'home' %}" class="btn btn-secondary">Cancel</a>
    </form>

{% endblock %}

测试

启动Celery Beat调度器,然后访问首页即可看到已经定义的所有定时任务。单击“Add Task”按钮,我们就可以添加一个新的定时任务。

例如,我们添加如下任务:
1. Task Name: 每分钟执行一次的任务
Task: tasks.task_one
Schedule: Interval(Minutes) = 1

  1. Task Name: 每周一下午4:30执行一次的任务
    Task: tasks.task_two
    Schedule: Crontab - Minute = 30,Hour = 4,Day of week = Monday

执行这两个任务有一下两种方法:

  1. 在模板中通过“Run Now”按钮手动启动任务并查看控制台输出
  2. 在Celery Beat中调度任务

综上,本攻略中详细讲述了如何使用django-celery-beat动态添加定时任务的方法,通过以上的步骤及示例,读者可以灵活地使用django-celery-beat来在Django应用程序中定时执行任务。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于Django使用 django-celery-beat动态添加定时任务的方法 - Python技术站

(0)
上一篇 2023年5月25日
下一篇 2023年5月25日

相关文章

  • 使用tensorflow显示pb模型的所有网络结点方式

    显示pb模型的所有网络节点可以通过TensorFlow提供的工具tf.GraphDef().返回一个TensorFlow计算图的protocol buffer定义。可以通过以下步骤在Python API中使用tf.GraphDef(): 1.导入TensorFlow模块 import tensorflow as tf 2.定义待加载的pb模型文件路径。其中w…

    人工智能概论 2023年5月24日
    00
  • 使用Python编写vim插件的简单示例

    下面是使用Python编写vim插件的简单示例攻略。 1. 编写vim插件的起步 1.1 创建vim插件目录 首先,我们需要在vim的插件目录下创建一个新的文件夹,通常这个目录是~/.vim/plugins/。在这个目录下,我们新建一个文件夹,用来存放我们要编写的插件。比如,我们可以在~/.vim/plugins/目录下新建一个名为demo_plugin的文…

    人工智能概论 2023年5月25日
    00
  • Django项目uwsgi+Nginx保姆级部署教程实现

    Django项目的uwsgi+Nginx部署是一种高效而稳定的部署方式。本教程将为您提供一步步的操作说明,以实现Django项目的uwsgi+Nginx保姆级部署。 一、安装uwsgi 使用pip安装uwsgi: pip install uwsgi 使用pip安装uwsgi后,需要在Django项目的根目录下创建uwsgi配置文件,以便启动uwsgi服务。 …

    人工智能概览 2023年5月25日
    00
  • Python中if语句的使用方法及实例代码

    针对“Python中if语句的使用方法及实例代码”的完整攻略,我将按照以下几个方面进行讲解: if语句的概述:if语句是Python中最基本的流程控制语句,用于根据条件的真假执行不同的代码段。 if语句的语法:Python中if语句的语法格式如下: if 条件语句: 执行语句1 else: 执行语句2 其中,条件语句可以使用关系运算符、逻辑运算符或位运算符等…

    人工智能概论 2023年5月24日
    00
  • 利用Python通过获取剪切板数据实现百度划词搜索功能

    实现Python通过获取剪切板数据实现百度划词搜索功能,一般分为以下几个步骤: 1.安装必要的库:要实现这项任务,需要安装pyperclip和requests库。它们可以通过pip进行安装,命令如下: $ pip install pyperclip requests 2.剪切板数据获取:通过调用pyperclip库中的方法get()可以获取系统剪切板上的数据…

    人工智能概览 2023年5月25日
    00
  • vue-cli3 项目从搭建优化到docker部署的方法

    下面是详细讲解“vue-cli3 项目从搭建优化到docker部署的方法”的完整攻略。 一、搭建vue-cli3项目 搭建vue-cli3项目可以通过以下步骤: 1.安装vue-cli 在终端执行以下命令: npm install -g @vue/cli 2.创建项目 在终端执行以下命令: vue create <project-name> 3.…

    人工智能概览 2023年5月25日
    00
  • 小米miui14最新官方消息 于12月1日更新 第一批升级机型名单曝光

    小米MIUI14最新官方消息 小米官方最新消息称,MIUI14将于2021年12月1日开始陆续推送,升级覆盖范围包括MIUI全球版、中国大陆版和印度版。本次升级对于小米手机用户而言,是一次重大的升级,拥有更好的用户体验和更加完美的系统优化。 第一批升级机型名单曝光 小米官方透露了第一批升级机型名单,包括小米11、小米11 Pro、小米11 Ultra、小米1…

    人工智能概览 2023年5月25日
    00
  • centos下安装redis服务详细节介绍

    CentOS下安装Redis服务详细攻略 1. 安装Redis依赖 sudo yum update sudo yum install epel-release sudo yum install gcc sudo yum install tcl 2. 下载和解压Redis 可以从Redis官网下载最新的版本:https://redis.io/download …

    人工智能概览 2023年5月25日
    00
合作推广
合作推广
分享本页
返回顶部