Celery 介绍

文档:http://docs.celeryproject.org/en/latest/index.html
Celery 是一个功能完备,即插即用的异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务的执行。
任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(RabbitMQ, Redis等)

 

Celery 特点

简单,易于使用和维护,有丰富的文档
高效,单个Celery 进程 每分钟可以处理数百万个任务
灵活,Celery 种几乎每个部分都可以自定义扩展
Celery 非常易于集成到一些Web开发框架中

 

Celery角色

Celery clinet:这是任务生产者,它负责将任务(tasks)发送到broker中
Broker:broker 负责将任务分发给响应的Celery worker
Celery worker:这是任务的执行者,完成相应的业务逻辑,在具体实现上体现为Python 函数
实现过程
Celery 通过消息进行通信,通常使用一个叫broker(中间人)来协作client(任务的发出者)和worker(任务的处理者),client 发出消息到队列中,broker将队列中的消息发给worker 来处理
一个Celery系统可以包含很多的worker和broker,可增强横向扩展和高可用性能

 

Celery组成结构

Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成
1.消息中间件:Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,官方推荐RabbitMQ和Redis作为消息队列
2.任务执行单元:Worker 是Celery提供的任务执行的单元,worker 并发的运行在分布式的系统节点中
3.任务结果存储:Task result store 用来存储worker 执行的任务的结果,Celery 支持以不同方式存储任务的结果 包括AMQP, Redis,memcached,MongoDB,SQLAlchemy,Django ORM等等

 

Celery 适应场景

Celery 适用异步处理问题,比如发送邮件、文件上传、图像处理等比较耗时的操作,我们可以将其加入到任务队列去异步执行,这样用户不需要等待很久,提高用户体验。
1.异步任务处理:例如给注册用户发送短信或者确认邮件的任务
2.大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长
3.定时执行的任务:支持任务的定时执行和设定时间执行,例如性能压测定时执行。

 

Celery 的实现原理

Celery基本实现原理是amqp协议的一个实现,比较耗时,耗资源的操作通过amqp协议发送到远端,让远端去处理它,在url进来,通过view返回的时候,在前端能够得到很快的响应,任务都在后台处理掉,提高用户的体验度,能够跟框架结合,执行一些可能不是立马需要结果的任务,这就是大家都用的异步任务。

任务队列中包含任务的工作单元,有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理,是一种跨线程、跨机器工作的一种机制。

 

Celery 的在Django中的实际使用(Django1.11.11, Celery4.0.2)

1.在项目下创建具有标识Celery的Python包(脚本名),比如:celery_tasks

2.在celery_tasks 包中创建main.py,用于作为Celery的启动文件
  1)为Celery使用Django配置文件
  import os

  if not os.getenv('DJANGO_SETTINGS_MODULE'):
    os.environ['DJANGO_SETTINGS_MODULE'] = '项目名.settings'
  2)创建Celery 对象
  from celery import Celery

  app = Celery('celery_tasks') # 一般设置main的参数为脚本名
  3)加载配置文件
  app.config_from_object('celery_tasks.config')
  4)设置自动加载任务的任务名
  app.autodiscover_tasks(['celery_tasks.xxx']) # xxx为celery_tasks 下的任务名

3.在celery_tasks包中创建config.py 文件,用于保存Celery 的配置信息(推荐使用方法一RabbitMQ)
  1)RabbitMQ
    RabbitMQ是Celery的默认代理,因此除了要使用的代理实例的URL位置之外,它不需要任何其他依赖项或初始配置
    broker_url = 'amqp://myuser: mypassword@localhost:5672/myvhost'
    # 也可以在实例Celery 对象的时候指定 如:
    # app = Celery('celery_tasks',
    #       broker='amqp://myuser: mypassword@localhost:5672/myvhost',
    #       backend='rpc://'
    # )
    # backend 参数标识celery worker 执行完的结果需要保存,rpc 表示通过RPC(Remote Procedure Call)模式被送到RabbitMQ,

    # 如果不指定backend 参数,任务结果将被丢弃

   2)Redis
    broker_url = 'redis://127.0.0.1/14'
    result_backend = 'redis://127.0.0.1/15'
    # 也可以在实例Celery 对象的时候指定 如:
    # app = Celery('celery_tasks',
    #       broker='redis://127.0.0.1/14',
    #       backend='redis://127.0.0.1/15'
    # )

4.创建任务,并让Celery 检测 (检测就是执行‘2.4)’)

如:发送短信
from libs.yuntongxun.sms import CCP
from celery_tasks.main import app

# 可以设置name参数,不设置任务名默认是该函数名的路径(如:.celery_tasks.sms.send_sms_code)
# app.task装饰符告诉Celery 这个函数并不在celery client 端执行,当他们被调用时只将调用信息通过# brocker 发送给celery workers 执行

@app.task(name='send_sms_code')
def send_sms_code(mobile, sms_code):
  ccp = CCP()
  ccp.send_template_sms(mobile, [sms_code, 5], 1)

5.调用Celery任务

from celery_tasks.sms.tasks
  1)delay方法,比如:
    tasks.send_sms_code.delay(mobile, sms_code)
  2)apply_async方法,比如:
    # tasks.send_sms_code.apply_async((mobile, sms_code))
    tasks.send_sms_code.apply_async((mobile, sms_code), queue='xxx')
由此可以看出,delay方法是apply_async 简化版本,但是apply_async方法可以带非常多的配置参数,包括指定队列等,queue指定队列名称,把不同任务分配不同的队列(推荐使用delay方法)

6.执行命令,启动worker 让Celery 单独执行
celery -A Celery实例对象路径 worker -l info 

celery -A celery_tasks.main worker -l info     或    celery -A celery_tasks.main worker  --loglevel=info

7.任务的状态

PENDING : 等待
STARTED : 开始
RETRY: 重试
FAILURE: 失败
SUCCESS : 成功
如:result = tasks.send_sms_code.delay(mobile, sms_code)
通过result.state 或result.status来查看任务的状态
更多 result 方法  查看http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result