下面是关于Python中Celery的使用的完整攻略。
1. 什么是Celery
Celery是一个基于分布式消息传递的任务队列,允许您异步地调用执行代码,作为生产者将任务委派给工作者(即消费者),以便长时间的运行任务可以在后台完成,同时允许使用者对前端进行操作。
2. 安装Celery
可以使用pip进行安装,命令如下:
pip install celery
3. Celery的基本使用
3.1 定义任务
首先,在Python中定义一个任务的方式非常简单,定义一个函数即可。例如:
from celery import Celery
app = Celery('demo', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
这里我们定义了一个名为add的Celery任务。任务的返回值是两个给定数字相加的结果。@app.task
用于将下一个函数声明为一个Celery任务。
3.2 启动Celery
celery -A proj worker -l info
在上面的命令中,“-A proj”是指项目的名称。"worker
"是Celery的工作进程,用于执行工作单元。-l info
是表示开启日志记录,只显示警告和信息。
3.3 调用任务
已定义任务后,就可以调用任务并异步地执行它。例如:
from demo import add
result = add.delay(4, 4)
print(result.get())
在这里,我们导入并调用了我们之前定义的add
函数,然后将其异步调用并传入两个参数4和4. 使用result.get()
来等待异步任务完成并获取结果。
4. 示例1:发送邮件任务
下面示例演示Celery在异步执行发送邮件任务中的应用,代码如下所示:
4.1 定义任务
# 在tasks.py或其他文件中定义
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
from django.core.mail import send_mail
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def send_email(subject, message, recipient_list):
send_mail(
subject,
message,
settings.DEFAULT_FROM_EMAIL,
recipient_list,
fail_silently=False,
)
定义了一个任务 send_email
,当任务被执行时,将传递三个参数:邮件主题 subject
,邮件内容 message
,和邮件的接收人 recipient_list
。
4.2 启动Celery
celery -A proj worker -l info
4.3 调用任务
可以在你的Django视图函数或其他位置中通过以下方式来使用这个任务:
from tasks import send_email
result = send_email.delay('Test Subject', 'Test Message', ['test@example.com'])
在这里,我们使用了之前定义的 send_email
任务,我们将要发送的邮件主题、内容和接收人作为参数传递给它,并调用 .delay()
方法执行异步任务。
示例2:定时任务
下面示例演示了如何使用Celery和Flask来执行定期任务。代码如下所示:
5.1 定义任务
# 在tasks.py或其他文件中定义
import datetime
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def task():
print('Executing task at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
在这里,我们定义了一个简单的任务 task
,在每次被调用时输出执行任务的时间戳
5.2 启动Celery
celery -A tasks beat
celery -A tasks worker
在这里,我们使用 beat
命令来启动 Celery 的定时任务调度器,然后使用 worker
命令启动 Celery 的工人进程,以便在任务队列中运行工作单元。
5.3 定义定时任务
# 在tasks.py或其他文件中定义
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, task.s(), name='add every 10')
@app.task
def task():
# ...
我们使用 app.on_after_configure.connect()
来进行定时任务的设置,调用 sender.add_periodic_task()
来定义一个循环间隔为10秒的任务。
5.4 运行任务
最后,我们只需要启动 Celery 的工作进程即可:
celery -A proj worker -l info
现在,我们每十秒都会看到打印出当前执行任务的时间戳。
到这里,一个简单的示例就完成了,它演示了Celery在使用Python编写的应用程序中的异步执行和定期任务的应用。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python中celery的使用 - Python技术站