下面是关于“flask 使用 flask_apscheduler 做定时循环任务的实现”的完整攻略,包含两条示例说明:
1. 安装 flask_apscheduler
在终端中输入以下命令安装 flask_apscheduler:
pip install flask_apscheduler
2. 创建 Flask 应用
在 Python 代码中引入 Flask 和 flask_apscheduler 模块,编写 Flask 应用:
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello, World!'
if __name__ == '__main__':
app.run()
其中,@app.route('/') 装饰器定义了根路由的处理函数 hello_world。
3. 创建任务函数
在刚才创建的 Flask 应用中,可以定义定时执行的任务函数。任务函数可以执行任何操作,比如发送邮件、刷新数据库等。
import datetime
def my_job():
print('任务执行时间:', datetime.datetime.now())
scheduler = APScheduler()
scheduler.add_job(func=my_job, trigger='interval', seconds=5)
scheduler.start()
任务函数 my_job() 是一个简单的示例,每隔五秒打印出“任务执行时间”和当前时间。
add_job() 方法用于向调度器中添加任务,它接受以下参数:
- func:要执行的任务函数;
- trigger:任务触发方式,可以是 SimpleTrigger(一次触发)、IntervalTrigger(基于时间间隔触发)、CronTrigger(基于 CRON 表达式触发)等;
- seconds:触发间隔时间,单位秒,可以是整数或浮点数。
4. 启动应用和任务调度器
在 Flask 应用中启动任务调度器:
if __name__ == '__main__':
scheduler.start()
app.run(debug=True)
调用 scheduler.start() 方法和 app.run() 方法启动任务调度器和 Flask 应用。注意,一定要在 if name == 'main' 中执行这两个方法才能保证启动成功。
示例1:使用 Flask 和 flask_apscheduler 实现定时发送邮件
下面是使用 Flask 和 flask_apscheduler 实现定时发送邮件的完整代码:
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
def send_email():
# 发件人邮箱
sender = '发件人邮箱'
# 收件人邮箱,可以是一个列表
receivers = ['收件人邮箱1', '收件人邮箱2']
# 邮件主题
subject = '邮件主题'
# 邮件正文
mail_body = '邮件正文'
# SMTP 服务器地址和端口
smtp_server = 'smtp.163.com'
smtp_port = 25
# 邮箱登录用户名和密码
username = '邮箱登录用户名'
password = '邮箱登录密码'
# 创建一个 MIMEText 对象,发送纯文本邮件
msg = MIMEText(mail_body, 'plain', 'utf-8')
msg['From'] = Header(sender, 'utf-8')
msg['To'] = Header(','.join(receivers), 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
# 创建一个 SMTP 对象,连接 SMTP 服务器
smtp_obj = smtplib.SMTP(smtp_server, smtp_port)
# 开启调试模式,可以查看 SMTP 服务器的交互信息
smtp_obj.set_debuglevel(1)
# 登录邮箱,发送邮件
smtp_obj.login(username, password)
smtp_obj.sendmail(sender, receivers, msg.as_string())
smtp_obj.quit()
print('邮件发送成功:', datetime.datetime.now())
scheduler = APScheduler()
scheduler.add_job(func=send_email, trigger='interval', minutes=1, id='send_email')
scheduler.start()
if __name__ == '__main__':
scheduler.start()
app.run(debug=True)
定时发送邮件的任务函数 send_email() 中主要的操作包括设置发件人、收件人、SMTP 服务器等信息,创建 MIMEText 对象,连接 SMTP 服务器并登录邮箱发送邮件。
在任务调度器中通过 scheduler.add_job() 方法来循环执行 send_email() 方法,每隔 1 分钟执行一次。
示例2:使用 Flask 和 flask_apscheduler 实现定时更新数据库
下面是使用 Flask 和 flask_apscheduler 实现定时更新数据库的完整代码:
import sqlite3
import datetime
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
app.config['DATABASE'] = 'test.db'
def init_db():
with app.app_context():
db = get_db()
db.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, start_time TEXT NOT NULL)')
db.commit()
def get_db():
if 'db' not in g:
g.db = sqlite3.connect(app.config['DATABASE'], detect_types=sqlite3.PARSE_DECLTYPES)
g.db.row_factory = sqlite3.Row
return g.db
def write_db(content):
with app.app_context():
db = get_db()
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
db.execute('INSERT INTO tasks (content, start_time) VALUES (?, ?)', (content, now))
db.commit()
def update_db():
content = '定时更新任务:' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
write_db(content)
print('任务执行时间:', datetime.datetime.now())
scheduler = APScheduler()
scheduler.add_job(func=update_db, trigger='interval', seconds=10, id='update_db')
scheduler.start()
if __name__ == '__main__':
with app.app_context():
init_db()
scheduler.start()
app.run(debug=True)
定时更新数据库的任务函数 update_db() 中主要的操作包括获取当前时间并写入数据库中,使用 scheduler.add_job() 方法来循环执行 update_db() 方法,每隔 10 秒执行一次。
Flask 应用中还定义了两个数据库操作函数:init_db() 和 write_db()。init_db() 用于初始化数据库,write_db() 用于将数据写入到数据库中。
注意,由于 Flask 应用会启动多个线程,因此需要使用 app_context() 函数创建应用的上下文,并在函数中将数据库连接对象赋值给 g.db,确保每个线程使用的都是同一个数据库连接。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:flask 使用 flask_apscheduler 做定时循环任务的实现 - Python技术站