Python定时任务框架APScheduler安装使用详解
一、概述
APScheduler是Python的一个开源的任务调度框架,可以用来执行定时任务、循环任务、一次性任务等。
APScheduler支持多种存储模式,并且提供了灵活的RESTful API和WebSocket接口,可以实现与其他服务进行交互。同时,APScheduler是跨平台和可扩展的,可用于各种应用场景。
二、安装
使用pip安装APScheduler,如下所示:
pip install apscheduler
三、使用方法
1.创建定时任务:
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("I'm working...")
scheduler = BlockingScheduler()
# 每隔5秒钟执行一次任务
scheduler.add_job(job, trigger='interval', seconds=5)
scheduler.start()
这个例子中,我们定义了一个job函数,在函数内部打印"I'm working..."。然后使用BlockingScheduler来创建一个调度器对象,然后使用add_job方法来添加任务。我们设置了任务的触发方式为每隔5秒钟,然后调用start方法启动任务调度器。
2.一次性任务:
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print("I'm working...")
scheduler = BackgroundScheduler()
# 在2018-03-30 15:45:00执行一次任务
scheduler.add_job(job, 'date', run_date='2018-03-30 15:45:00')
scheduler.start()
这个例子中,我们同样定义了一个job函数,在函数内部打印"I'm working..."。然后使用BackgroundScheduler来创建一个调度器对象,然后使用add_job方法来添加任务。我们设置了任务的触发方式为一次性,在2018-03-30 15:45:00执行一次,然后调用start方法启动任务调度器。
四、总结
APScheduler是一个非常优秀的Python任务调度框架,可以帮助我们快速实现任务调度,提高开发效率。如果有需要实现定时任务等的场景,APScheduler是一个绝佳的选择。
五、示例
示例1:每天定时发送一封邮件
import smtplib
import datetime
import time
from email.mime.text import MIMEText
from email.header import Header
from apscheduler.schedulers.blocking import BlockingScheduler
# 发件人和收件人信息
sender = 'sender@example.com'
receivers = ['recipient1@example.com', 'recipient2@example.com']
# 邮件信息
message = MIMEText('这是一封测试邮件,请勿回复', 'plain', 'utf-8')
message['From'] = Header('测试邮件', 'utf-8')
message['To'] = Header('收件人', 'utf-8')
subject = '测试邮件'
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件
def send_mail():
try:
smtpObj = smtplib.SMTP_SSL('smtp.example.com')
smtpObj.login(sender, 'password')
smtpObj.sendmail(sender, receivers, message.as_string())
print('邮件发送成功')
except smtplib.SMTPException as e:
print('邮件发送失败:', e)
# 创建任务调度对象
scheduler = BlockingScheduler()
# 定时任务
scheduler.add_job(send_mail, 'cron', hour=9)
scheduler.start()
这个例子中,我们使用了APScheduler和smtplib来实现每天定时发送一封邮件的功能。我们创建了一个send_mail函数来实现发送邮件的功能,然后使用BlockingScheduler创建任务调度对象,设置任务的触发方式为每天9点钟通过cron表达式来实现。
示例2:循环任务
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print("I'm working...")
scheduler = BackgroundScheduler()
# 每隔5秒钟执行一次任务
scheduler.add_job(job, trigger='interval', seconds=5)
scheduler.start()
try:
while True:
time.sleep(2)
except(KeyboardInterrupt, SystemExit):
scheduler.shutdown()
这个例子中,我们同样定义了一个job函数,在函数内部打印"I'm working..."。使用BackgroundScheduler创建调度器对象,设置任务的触发方式为每隔5秒钟。最后创建一个while循环,调用time.sleep方法来保证程序不退出。当我们手动中断程序时,调用scheduler.shutdown方法来关闭调度器对象,此时程序退出。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python定时任务框架APScheduler安装使用详解 - Python技术站