Python第三方模块apscheduler安装和基本使用
当我们需要在Python中实现定时执行任务的功能时,可以使用第三方模块 apscheduler
。 apscheduler
是一个轻量级的 Python 定时任务框架,可以支持间隔触发、周期触发和定时执行等功能。下面是 apscheduler
的安装和基本使用方法。
安装
使用 pip
包管理器安装 apscheduler
可以很容易地进行安装:
pip install apscheduler
基本使用
示例1:使用 BlockingScheduler
类执行一次性任务
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
# 定义任务
def job():
print(f"当前时间为:{datetime.datetime.now()}")
# 创建 `BlockingScheduler` 对象
scheduler = BlockingScheduler()
# 添加任务
scheduler.add_job(job, 'date', run_date=datetime.datetime.now()+datetime.timedelta(seconds=5))
# 启动任务调度器
scheduler.start()
在上面的示例中,我们首先导入 BlockingScheduler
类并引入 datetime
模块来获取当前时间。然后定义了一个 job()
函数用于输出当前的时间。
接下来,创建了一个 BlockingScheduler
对象,并调用了其 add_job()
方法来将任务添加到调度列表中。'date'
参数表示这是一次性任务。 另外, run_date
参数指定了任务的启动时间,这里我们设置了 5 秒后启动。
最后,启动任务调度器,scheduler.start()
方法将阻塞在这里。在任务执行完成时会自动退出。
示例2:使用 BackgroundScheduler
类周期性执行任务
from apscheduler.schedulers.background import BackgroundScheduler
import time
# 定义任务
def job():
print(f"当前时间为:{time.strftime('%Y-%m-%d %H:%M:%S')}")
# 创建 `BackgroundScheduler` 对象
scheduler = BackgroundScheduler()
# 添加任务,每 5 秒执行一次
scheduler.add_job(job, 'interval', seconds=5)
# 启动任务调度器
scheduler.start()
# 持续运行,保证任务可以周期性执行
while True:
time.sleep(60)
在这个示例中,我们定义了一个 job()
函数,以当前时间为参数输出信息。然后,创建了一个 BackgroundScheduler
对象,并使用其 add_job()
添加了一个周期性任务,该任务会每隔 5 秒执行一次。
在类似于 Flask 和 Django 的 Web 框架中常常使用 WSGI、Gunicorn 或 uWSGI 等服务器来运行应用程序。这些服务器可以帮助你持续运行应用程序并向客户端提供服务。在这里,我们使用一个持续运行的循环来保证 scheduler
可以周期性地执行任务,这是因为当前脚本是从终端运行的。在 Web 应用中,通常会使用类似 systemd
或 upstart
等进程控制系统运行脚本,保证系统可以持续运行。
以上就是 apscheduler
的基本用法,通过 apscheduler
可以轻松处理复杂的定时任务需求。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python第三方模块apscheduler安装和基本使用 - Python技术站