flask 使用 flask_apscheduler 做定时循环任务的实现

下面是关于“flask 使用 flask_apscheduler 做定时循环任务的实现”的完整攻略,包含两条示例说明:

1. 安装 flask_apscheduler

在终端中输入以下命令安装 flask_apscheduler:

pip install flask_apscheduler

2. 创建 Flask 应用

在 Python 代码中引入 Flask 和 flask_apscheduler 模块,编写 Flask 应用:

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

if __name__ == '__main__':
    app.run()

其中,@app.route('/') 装饰器定义了根路由的处理函数 hello_world。

3. 创建任务函数

在刚才创建的 Flask 应用中,可以定义定时执行的任务函数。任务函数可以执行任何操作,比如发送邮件、刷新数据库等。

import datetime

def my_job():
    print('任务执行时间:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=my_job, trigger='interval', seconds=5)
scheduler.start()

任务函数 my_job() 是一个简单的示例,每隔五秒打印出“任务执行时间”和当前时间。

add_job() 方法用于向调度器中添加任务,它接受以下参数:

  • func:要执行的任务函数;
  • trigger:任务触发方式,可以是 SimpleTrigger(一次触发)、IntervalTrigger(基于时间间隔触发)、CronTrigger(基于 CRON 表达式触发)等;
  • seconds:触发间隔时间,单位秒,可以是整数或浮点数。

4. 启动应用和任务调度器

在 Flask 应用中启动任务调度器:

if __name__ == '__main__':
    scheduler.start()
    app.run(debug=True)

调用 scheduler.start() 方法和 app.run() 方法启动任务调度器和 Flask 应用。注意,一定要在 if name == 'main' 中执行这两个方法才能保证启动成功。

示例1:使用 Flask 和 flask_apscheduler 实现定时发送邮件

下面是使用 Flask 和 flask_apscheduler 实现定时发送邮件的完整代码:

import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

def send_email():
    # 发件人邮箱
    sender = '发件人邮箱'
    # 收件人邮箱,可以是一个列表
    receivers = ['收件人邮箱1', '收件人邮箱2']
    # 邮件主题
    subject = '邮件主题'

    # 邮件正文
    mail_body = '邮件正文'

    # SMTP 服务器地址和端口
    smtp_server = 'smtp.163.com'
    smtp_port = 25

    # 邮箱登录用户名和密码
    username = '邮箱登录用户名'
    password = '邮箱登录密码'

    # 创建一个 MIMEText 对象,发送纯文本邮件
    msg = MIMEText(mail_body, 'plain', 'utf-8')
    msg['From'] = Header(sender, 'utf-8')
    msg['To'] = Header(','.join(receivers), 'utf-8')
    msg['Subject'] = Header(subject, 'utf-8')

    # 创建一个 SMTP 对象,连接 SMTP 服务器
    smtp_obj = smtplib.SMTP(smtp_server, smtp_port)

    # 开启调试模式,可以查看 SMTP 服务器的交互信息
    smtp_obj.set_debuglevel(1)

    # 登录邮箱,发送邮件
    smtp_obj.login(username, password)
    smtp_obj.sendmail(sender, receivers, msg.as_string())
    smtp_obj.quit()

    print('邮件发送成功:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=send_email, trigger='interval', minutes=1, id='send_email')
scheduler.start()

if __name__ == '__main__':
    scheduler.start()
    app.run(debug=True)

定时发送邮件的任务函数 send_email() 中主要的操作包括设置发件人、收件人、SMTP 服务器等信息,创建 MIMEText 对象,连接 SMTP 服务器并登录邮箱发送邮件。

在任务调度器中通过 scheduler.add_job() 方法来循环执行 send_email() 方法,每隔 1 分钟执行一次。

示例2:使用 Flask 和 flask_apscheduler 实现定时更新数据库

下面是使用 Flask 和 flask_apscheduler 实现定时更新数据库的完整代码:

import sqlite3
import datetime
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config['DATABASE'] = 'test.db'

def init_db():
    with app.app_context():
        db = get_db()
        db.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, start_time TEXT NOT NULL)')
        db.commit()

def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(app.config['DATABASE'], detect_types=sqlite3.PARSE_DECLTYPES)
        g.db.row_factory = sqlite3.Row
    return g.db

def write_db(content):
    with app.app_context():
        db = get_db()
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        db.execute('INSERT INTO tasks (content, start_time) VALUES (?, ?)', (content, now))
        db.commit()

def update_db():
    content = '定时更新任务:' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    write_db(content)
    print('任务执行时间:', datetime.datetime.now())

scheduler = APScheduler()
scheduler.add_job(func=update_db, trigger='interval', seconds=10, id='update_db')
scheduler.start()

if __name__ == '__main__':
    with app.app_context():
        init_db()
    scheduler.start()
    app.run(debug=True)

定时更新数据库的任务函数 update_db() 中主要的操作包括获取当前时间并写入数据库中,使用 scheduler.add_job() 方法来循环执行 update_db() 方法,每隔 10 秒执行一次。

Flask 应用中还定义了两个数据库操作函数:init_db() 和 write_db()。init_db() 用于初始化数据库,write_db() 用于将数据写入到数据库中。

注意,由于 Flask 应用会启动多个线程,因此需要使用 app_context() 函数创建应用的上下文,并在函数中将数据库连接对象赋值给 g.db,确保每个线程使用的都是同一个数据库连接。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:flask 使用 flask_apscheduler 做定时循环任务的实现 - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • 带你用Python实现Saga 分布式事务的方法

    首先我们先来介绍什么是Saga分布式事务。 Saga分布式事务简介 Saga是目前一种常用的分布式事务解决方案,它弥补了传统两阶段提交协议2PC存在的一些问题,比如性能瓶颈、可扩展性差等问题。 Saga解决方案的核心思想就是将一个大的分布式事务进一步拆分成多个子事务,并将这些子事务串联成一条事务流程,即Saga流程,以完成整个分布式事务。每个子事务完成时都会…

    Flask 2023年5月16日
    00
  • CentOS7部署Flask(Apache、mod_wsgi、Python36、venv)

    下面是详细讲解 “CentOS7部署Flask(Apache、mod_wsgi、Python36、venv)” 的完整攻略。 环境准备 CentOS7 服务器系统; 安装 Apache Web 服务器; 安装 Python3.6 版本; 安装 mod_wsgi Apache 模块; 在系统上创建一个 Python3 的虚拟环境; Flask 应用程序开发 在…

    Flask 2023年5月15日
    00
  • python全栈要学什么 python全栈学习路线

    Python全栈是指掌握从前端到后端开发中所有技术的开发者,下面是Python全栈学习路线的完整攻略及示例说明。 前端开发 基础HTML、CSS和JavaScript概念 HTML:超文本标记语言(HyperText Markup Language)是一种用来描述网页的语言。 CSS:层叠样式表(Cascading Style Sheets)用于控制网页的布…

    Flask 2023年5月15日
    00
  • flask-socketio实现WebSocket的方法

    下面是详细讲解“flask-socketio实现WebSocket的方法”的完整攻略,包含两条示例说明。 简介 WebSocket是基于HTTP协议的TCP连接,它能够在客户端和服务端之间实现真正的实时双向通信。而flask-socketio是Flask框架下一个用于实现WebSocket的库,它能够帮助我们方便、快捷地实现WebSocket通信。 步骤 第…

    Flask 2023年5月15日
    00
  • 如何使用Flask-Migrate拓展数据库表结构

    使用Flask-Migrate拓展数据库表结构的步骤如下: 安装Flask-Migrate 在终端或命令行输入以下命令:pip install Flask-Migrate 配置Flask-Migrate 在Flask应用程序中,导入Flask-Migrate扩展并初始化它。使用以下代码创建一个migrate对象: “`python from flask_m…

    Flask 2023年5月16日
    00
  • Python Type Hints 学习之从入门到实践

    下面是详细讲解“Python Type Hints 学习之从入门到实践”的完整攻略: Python Type Hints 学习之从入门到实践 什么是 Python Type Hints Python 从 3.5 版本开始引入了 Type Hints 的概念,它是一种用于标注函数、变量、类等对象类型的注释。Python 并不会在运行时对其进行强制校验,但是可以…

    Flask 2023年5月16日
    00
  • Flask-Sqlalchemy的基本使用详解

    下面是关于”Flask-Sqlalchemy的基本使用详解”的完整攻略,包括两个示例说明。 什么是Flask-Sqlalchemy Flask-Sqlalchemy是Flask框架中的一个扩展,其提供了对SQLAlchemy ORM的集成支持。其主要提供了以下功能: 方便地在Flask应用程序中使用数据库。 管理数据库模型,自动生成SQL语句。 安装Flas…

    Flask 2023年5月15日
    00
  • Flask框架路由和视图用法实例分析

    Flask框架路由和视图用法实例分析 Flask是一种使用Python编写的Web开发框架。Flask框架能够帮助我们快速构建Web应用程序。在Flask框架中,我们需要关注的一些关键概念包括路由(routing)、视图(views)、模板(templates)和表单(forms)。在本文中,我将详细介绍Flask框架中的路由和视图的用法,并提供两个完整的代…

    Flask 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部