Python并行分布式框架Celery详解
Celery是一个Python编写的开源的并行分布式任务队列框架,可以用于开发高并发、处理大量数据并且需要异步处理的系统。它提供了一些强大的特性,比如任务调度、并行处理、周期性执行、定时任务和分布式任务等。本文将详细介绍Celery的使用,并提供两个示例来说明其应用。
安装Celery
Celery的安装非常简单,可以使用pip进行安装:pip install celery
创建Celery实例
在使用Celery之前,我们需要先创建一个实例,这个实例在整个应用中被用来调用Celery任务。我们可以使用Celery
类来创建一个Celery实例:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
上面的代码中,我们创建了一个叫做“tasks”的Celery实例,并设置了Broker为Redis。Redis是一个非常流行且高效的消息队列,也是Celery的默认消息代理。
定义任务
在Celery中,任务是一个Python函数,使用了Celery的装饰器来标识任务,下面是一个示例代码:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x+y
上面的代码中,我们定义了一个名为“add”的任务,它使用了@app.task
装饰器来标识。该任务的参数为x
和y
,任务函数的返回值为x+y
。现在我们已经定义了一个Celery任务,可以使用Celery实例来调用它。
调用任务
要调用一个Celery任务,我们需要使用apply_async
方法,该方法接受我们之前定义的任务函数作为参数,并传递它的参数。下面是一个调用示例代码:
from tasks import add
# 调用任务,并传入参数
result = add.apply_async(args=[1,2])
# 获取执行结果
print(result.get())
上面的代码中,我们首先从tasks.py中导入我们之前定义的add任务,然后调用该任务,并传入参数,最后使用get
方法获取任务的执行结果。
任务状态
Celery提供了获取任务状态的方法,我们可以使用AsyncResult
类来获取任务状态:
from tasks import add
# 调用任务,并传递参数
result = add.apply_async(args=[1,2])
# 等待任务执行完毕
while not result.ready():
pass
# 获取任务执行结果
if result.successful():
print(result.get())
else:
print(result.traceback)
上面的代码中,我们首先调用了add任务,并传递参数,然后使用一个while循环来等待任务执行完毕。在等待期间,我们可以做一些其他的事情。等任务执行完毕后,我们使用result.successful()
方法来判断任务是否执行成功,并使用result.get()
方法获取任务的执行结果,或使用result.traceback
获取任务的错误信息。
示例1:使用Celery处理大量数据
下面是一个示例,我们使用Celery来处理大量的数据:
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def process_data(data):
# 模拟处理数据
time.sleep(5)
return "Processed: {}".format(data)
if __name__ == '__main__':
# 生成数据
data = range(1000)
# 批量处理数据
result = []
for d in data:
result.append(process_data.delay(d))
# 等待任务执行完毕
for r in result:
print(r.get())
上面的代码中,我们定义了一个名为process_data的任务,它的作用是模拟处理大量的数据。在代码中,我们生成了1000条数据,并使用delay
方法调用process_data
任务来处理数据。注意到在循环中,我们使用append
方法将任务返回的AsyncResult对象加入到了一个数组中,并使用get
方法等待任务执行完成并获取结果。
示例2:使用Celery分布式处理任务
下面是一个示例,我们使用Celery分布式处理任务:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x+y
if __name__ == '__main__':
# 调用add任务
result = add.delay(1,2)
# 获取任务执行结果
print(result.get())
上述代码中,我们部署了两个Celery worker,一个运行在本地,另一个运行在远程服务器上。我们使用delay
方法调用add任务,并传递参数。当我们运行这个示例时,两个worker都会去执行add任务,并返回执行结果。此时,我们可以使用get
方法获取任务的执行结果。
在分布式任务处理中,Celery会自动选择可用的worker来执行任务,并根据消息传递协议对任务进行管理和调度。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python并行分布式框架Celery详解 - Python技术站