下面是详细讲解Python使用Celery实现异步任务执行的完整攻略。
Celery 简介
Celery 是一个 Python 分布式任务队列,在异步执行任务和调度任务方面表现得非常优秀。它通常被用来处理高负载负责耗时的任务,例如邮件发送、数据处理等。Celery 是一个开源的分布式任务队列,使用 Python 编写。它基于消息传递,并允许您通过任务队列和工作进程来异步执行代码。
Celery 的工作原理基于四个主要组件:任务、任务队列、消息代理(broker)和工作者进程。生产者发送任务到队列中,消费者从队列中获取任务并执行,并将结果返回给消费者。
Celery 的安装
在安装 Celery 之前,需要先安装 RabbitMQ 作为消息代理。安装完 RabbitMQ 之后,可以使用 pip 安装 Celery:
pip install celery
创建 Celery 应用
在创建 Celery 应用之前,需要配置 Celery 的参数。例如指定任务队列和工作进程数量,以及指定序列化方式、消息代理等。
from celery import Celery
app = Celery('my_task',
broker='amqp://localhost',
backend='rpc://',
include=['my_task.tasks'])
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
worker_prefetch_multiplier=1,
worker_concurrency=1,
task_default_queue='default_queue',
task_default_exchange='default_exchange',
task_default_routing_key='default_queue',
task_acks_late=True,
)
以上代码中创建了一个名字叫 my_task 的 Celery 应用,并指定了消息代理、任务序列化方式、时区等参数。
编写 Celery 任务
在创建 Celery 应用之后,可以编写任务。任何可调用的 Python 函数都可以成为 Celery 任务,只需要将其加上 @app.task
的装饰器即可:
from celery import Celery
app = Celery('my_task',
broker='amqp://localhost',
backend='rpc://',
include=['my_task.tasks'])
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
worker_prefetch_multiplier=1,
worker_concurrency=1,
task_default_queue='default_queue',
task_default_exchange='default_exchange',
task_default_routing_key='default_queue',
task_acks_late=True,
)
@app.task
def add(x, y):
return x + y
以上代码中定义了一个名为 add 的任务,它接受两个参数并返回它们的和。
调用 Celery 任务
在定义好任务之后,就可以使用 Celery 调用它了。可以使用 delay() 方法异步调用任务:
from my_task.tasks import add
result = add.delay(4, 4)
以上代码中异步调用了 add 任务,并将调用结果赋给了 result 变量。
示例说明
下面通过两个示例说明 Celery 的使用。
示例一
假设有一个需要对一批数据进行处理的任务,但数据量非常大,无法一次性处理完毕,因此可以使用 Celery 分批执行任务。通过调用异步任务,每次处理一批数据,可以轻松地实现数据处理。
from celery import Celery
app = Celery('my_task',
broker='amqp://localhost',
backend='rpc://',
include=['my_task.tasks'])
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
worker_prefetch_multiplier=1,
worker_concurrency=1,
task_default_queue='default_queue',
task_default_exchange='default_exchange',
task_default_routing_key='default_queue',
task_acks_late=True,
)
@app.task
def process_data(data):
# 这里是数据处理逻辑
pass
# 分批处理数据
for batch_data in batches(data, 100):
process_data.delay(batch_data)
以上代码中,使用 Celery 异步执行了数据处理任务,每次处理 100 条数据,通过批量处理可以轻松地完成数据处理任务。
示例二
假设有一个需要发送邮件的任务,但邮件发送时间比较长,因此可以使用 Celery 异步执行邮件发送任务。
from celery import Celery
from my_task.email import send_email
app = Celery('my_task',
broker='amqp://localhost',
backend='rpc://',
include=['my_task.tasks'])
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
worker_prefetch_multiplier=1,
worker_concurrency=1,
task_default_queue='default_queue',
task_default_exchange='default_exchange',
task_default_routing_key='default_queue',
task_acks_late=True,
)
@app.task
def send_email_task(subject, body, to):
send_email(subject, body, to)
send_email_task.delay('Hello, World!', 'This is a test email.', 'test@example.com')
以上代码中,异步执行了发送邮件任务,并通过 send_email_task.delay()
方法异步调用了该任务,同时将邮件的参数传递给该任务。
这就是 Celery 的使用和示例,希望对你有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python使用celery实现异步任务执行的例子 - Python技术站