flask 使用 flask_apscheduler 做定时循环任务的实现

下面是关于“flask 使用 flask_apscheduler 做定时循环任务的实现”的完整攻略,包含两条示例说明:

1. 安装 flask_apscheduler

在终端中输入以下命令安装 flask_apscheduler:

pip install flask_apscheduler

2. 创建 Flask 应用

在 Python 代码中引入 Flask 和 flask_apscheduler 模块,编写 Flask 应用:

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

if __name__ == '__main__':
    app.run()

其中,@app.route('/') 装饰器定义了根路由的处理函数 hello_world。

3. 创建任务函数

在刚才创建的 Flask 应用中,可以定义定时执行的任务函数。任务函数可以执行任何操作,比如发送邮件、刷新数据库等。

import datetime

def my_job():
    print('任务执行时间:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=my_job, trigger='interval', seconds=5)
scheduler.start()

任务函数 my_job() 是一个简单的示例,每隔五秒打印出“任务执行时间”和当前时间。

add_job() 方法用于向调度器中添加任务,它接受以下参数:

  • func:要执行的任务函数;
  • trigger:任务触发方式,可以是 SimpleTrigger(一次触发)、IntervalTrigger(基于时间间隔触发)、CronTrigger(基于 CRON 表达式触发)等;
  • seconds:触发间隔时间,单位秒,可以是整数或浮点数。

4. 启动应用和任务调度器

在 Flask 应用中启动任务调度器:

if __name__ == '__main__':
    scheduler.start()
    app.run(debug=True)

调用 scheduler.start() 方法和 app.run() 方法启动任务调度器和 Flask 应用。注意,一定要在 if name == 'main' 中执行这两个方法才能保证启动成功。

示例1:使用 Flask 和 flask_apscheduler 实现定时发送邮件

下面是使用 Flask 和 flask_apscheduler 实现定时发送邮件的完整代码:

import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

def send_email():
    # 发件人邮箱
    sender = '发件人邮箱'
    # 收件人邮箱,可以是一个列表
    receivers = ['收件人邮箱1', '收件人邮箱2']
    # 邮件主题
    subject = '邮件主题'

    # 邮件正文
    mail_body = '邮件正文'

    # SMTP 服务器地址和端口
    smtp_server = 'smtp.163.com'
    smtp_port = 25

    # 邮箱登录用户名和密码
    username = '邮箱登录用户名'
    password = '邮箱登录密码'

    # 创建一个 MIMEText 对象,发送纯文本邮件
    msg = MIMEText(mail_body, 'plain', 'utf-8')
    msg['From'] = Header(sender, 'utf-8')
    msg['To'] = Header(','.join(receivers), 'utf-8')
    msg['Subject'] = Header(subject, 'utf-8')

    # 创建一个 SMTP 对象,连接 SMTP 服务器
    smtp_obj = smtplib.SMTP(smtp_server, smtp_port)

    # 开启调试模式,可以查看 SMTP 服务器的交互信息
    smtp_obj.set_debuglevel(1)

    # 登录邮箱,发送邮件
    smtp_obj.login(username, password)
    smtp_obj.sendmail(sender, receivers, msg.as_string())
    smtp_obj.quit()

    print('邮件发送成功:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=send_email, trigger='interval', minutes=1, id='send_email')
scheduler.start()

if __name__ == '__main__':
    scheduler.start()
    app.run(debug=True)

定时发送邮件的任务函数 send_email() 中主要的操作包括设置发件人、收件人、SMTP 服务器等信息,创建 MIMEText 对象,连接 SMTP 服务器并登录邮箱发送邮件。

在任务调度器中通过 scheduler.add_job() 方法来循环执行 send_email() 方法,每隔 1 分钟执行一次。

示例2:使用 Flask 和 flask_apscheduler 实现定时更新数据库

下面是使用 Flask 和 flask_apscheduler 实现定时更新数据库的完整代码:

import sqlite3
import datetime
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config['DATABASE'] = 'test.db'

def init_db():
    with app.app_context():
        db = get_db()
        db.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, start_time TEXT NOT NULL)')
        db.commit()

def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(app.config['DATABASE'], detect_types=sqlite3.PARSE_DECLTYPES)
        g.db.row_factory = sqlite3.Row
    return g.db

def write_db(content):
    with app.app_context():
        db = get_db()
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        db.execute('INSERT INTO tasks (content, start_time) VALUES (?, ?)', (content, now))
        db.commit()

def update_db():
    content = '定时更新任务:' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    write_db(content)
    print('任务执行时间:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=update_db, trigger='interval', seconds=10, id='update_db')
scheduler.start()

if __name__ == '__main__':
    with app.app_context():
        init_db()
    scheduler.start()
    app.run(debug=True)

定时更新数据库的任务函数 update_db() 中主要的操作包括获取当前时间并写入数据库中,使用 scheduler.add_job() 方法来循环执行 update_db() 方法,每隔 10 秒执行一次。

Flask 应用中还定义了两个数据库操作函数:init_db() 和 write_db()。init_db() 用于初始化数据库,write_db() 用于将数据写入到数据库中。

注意,由于 Flask 应用会启动多个线程,因此需要使用 app_context() 函数创建应用的上下文,并在函数中将数据库连接对象赋值给 g.db,确保每个线程使用的都是同一个数据库连接。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:flask 使用 flask_apscheduler 做定时循环任务的实现 - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • flask中使用蓝图将路由分开写在不同文件实例解析

    在Flask中使用蓝图将路由分开写在不同文件的过程如下: 创建蓝图对象 在Flask应用程序实例化后,我们首先需要创建一个蓝图对象,来管理我们将要拆分的路由和视图函数。我们可以在自己的代码文件中导入蓝图并创建实例: from flask import Blueprint bp = Blueprint(‘example’, __name__) 此时,bp就是我…

    Flask 2023年5月16日
    00
  • python 解决flask uwsgi 获取不到全局变量的问题

    一、问题描述 在Flask应用中,有时我们需要定义一些全局变量,比如数据库连接、缓存对象等等。而当使用uWSGI运行Flask应用时,有时会出现获取不到全局变量的情况。 这是因为uWSGI采用了多进程的方式运行应用,不同进程间的内存空间是独立的,因此在一个进程中定义的全局变量在另一个进程中是无法访问的。这种情况下,我们需要采用一些特殊的方式来解决该问题。 二…

    Flask 2023年5月16日
    00
  • Flask web上传获取图像Image读取并使用方式

    下面我将详细讲解 Flask web上传获取图像Image读取并使用方式的完整攻略,包含两条示例说明。 Flask Web上传获取图像并读取 在 Flask Web 应用程序中,最简单的上传图像的方法就是使用 Python 的 werkzeug 库中的 FileStorage 对象。可以在 HTML 表单中添加 file 类型的 input,然后在 Flas…

    Flask 2023年5月16日
    00
  • Flask-Mail用法实例分析

    下面我来为您讲解Flask-Mail用法实例分析。本篇攻略分为两个部分,分别是Flask-Mail的基本用法和常见功能示例。接下来我将逐一介绍。 一、Flask-Mail的基本用法 Flask-Mail是一个用于在Flask应用程序中发送电子邮件的扩展。它提供了发送邮件所需的所有功能,并且易于使用。下面介绍Flask-Mail最常用的三个功能。 1.配置邮件…

    Flask 2023年5月15日
    00
  • js实现录音上传功能

    下面我会为你详细讲解如何使用JS实现录音上传功能。 背景介绍 录音上传功能是一种常见的Web应用程序功能,它可以使用户在Web端录制音频并将其上传到服务器上。这种功能可以用于许多应用,比如在线音乐教育、在线语音识别、在线语音聊天等等。 实现录音上传功能需要使用Web开发中的一种技术,Web Audio API。Web Audio API提供了一个丰富、强大的…

    Flask 2023年5月16日
    00
  • 使用Django和Flask获取访问来源referrer

    获取访问来源referrer是一个很有用的功能,它可以让我们查看访问者是从哪个页面跳转而来。在Django和Flask中,获取referrer的方法也是不同的,下面我会分别提供两个完整的攻略来实现这个功能。 Django中获取referrer的方法 Django中获取referrer的方法比较简单,我们直接在视图函数中获取request.META属性中的HT…

    Flask 2023年5月16日
    00
  • Python安装Flask环境及简单应用示例

    下面是关于“Python安装Flask环境及简单应用示例”的完整攻略。 安装 Python 首先需要安装Python。推荐下载Python 3.6或3.7的稳定版本。 官网下载地址:https://www.python.org/downloads 在安装时,请注意勾选“Add Python to PATH”(将Python添加到环境变量中)选项。 安装和配置…

    Flask 2023年5月15日
    00
  • JQuery异步post上传表单数据标准化模板

    JQuery异步post上传表单数据标准化模板是一种常用的前端技术。本攻略将详细讲解此过程,并提供两条示例说明。具体步骤如下: 一、设置请求 url 和 data 请求 url 可以指向一个后台处理请求的页面。 data 是现有表单的序列化数据和其他要提交的数据的对象。对象的主要属性应与表单中的输入字段的“name”属性匹配。 二、设置异步ajax请求 设置…

    Flask 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部