celery的介绍和使用

celery介绍

celery是一个异步任务框架,它可以执行异步任务、延迟任务、定时任务

异步任务框架简述:

1)celery可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

image

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

Celery的安装配置

pip install celery gevent

windows环境下启动celery

# module为python module名
celery -A <module> worker -l info -P gevent

celery执行异步任务

采用包架构封装(多任务结构)

在项目文件project下新建一个celery包

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

celery.py

# 导入模块
from celery import Celery
# 指定任务中间件和任务处理仓库为redis里的第一个库,后面1表示第一个,2表示第二个。
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# backend='redis://:123456@127.0.0.1:6379/1' redis加密码写法
# include列表里表示celery_task包里的task.py交给app处理
app = Celery(__name__,broker=broker, backend=backend, include=['celery_task.tasks'])

task.py

from .celery import app
import time
@app.task #添加该装饰器表示这个函数任务使用了celery框架的异步处理
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m

add_task.py
这个py文件是用来添加任务的,可以建在任意位置

from celery_task import tasks

# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1) # 获取的就是这个任务的id号


# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)

get_result.py
这个py文件是用来获取任务处理的结果的

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' #输入任务的id号
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful(): #如果这个任务执行成功
        result = async.get() #得到结果
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

celery执行延迟任务

在add_task.py里书写:

# 添加延迟任务
from datetime import datetime, timedelta
# datetime.utcnow()表示获取当前的utc时间,timedelta(seconds=10)表示加10秒timedelta括号里之后是时间类型,可以相加减
eta=datetime.utcnow() + timedelta(seconds=10)
# 使用的是apply_async这个方法,tasks.add这个是函数名,args括号里是函数add需要传的参数,eta必须是utc时间
tasks.add.apply_async(args=(200, 50), eta=eta)

celery执行定时任务

将celery.py添加以下定时配置

# 导入模块
from celery import Celery
# 指定任务中间件和任务处理仓库为redis里的第一个库,后面1表示第一个,2表示第二个。
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# backend='redis://:123456@127.0.0.1:6379/1' redis加密码写法
# include列表里表示celery_task包里的task.py交给app处理
app = Celery(__name__,broker=broker, backend=backend, include=['celery_task.tasks'])


# 定时任务配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_task.tasks.add',  # celery_task.tasks.low这个是你函数任务的位置,task是固定的
        'schedule': timedelta(seconds=3), # 每隔三秒执行一次
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (300, 150), # 函数任务需要传入的参数
    }
}

然后在终端下cd到scripts文件夹

执行celery -A celery_task beat
表示开始定时发布任务

celery在django中的使用

需要在celery.py中添加以下代码:

# 加载django环境
import os
import django
# luffyapi.settings.dev 这个是项目的配置文件位置
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:celery的介绍和使用 - Python技术站

(0)
上一篇 2023年4月2日 下午4:30
下一篇 2023年4月2日

相关文章

  • bbs项目前期准备和表设计

    一、前期准备 1.新建一个django项目 2.连接mysql数据库(注意需要在init文件里面书写import pymysql),告诉django使用pymysql连接数据库 3.静态文件路径在settings里配置一下,并且在项目文件夹下新建一个静态文件夹 4.将需要用到bootstrap的css和js文件添加到static文件夹内 二、bbs项目表设计…

    Python开发 2023年4月2日
    00
  • django中视图函数的FBV和CBV

    1.什么是FBV和CBV FBV是指视图函数以普通函数的形式;CBV是指视图函数以类的方式。 2.普通FBV形式 def index(request): return HttpResponse(‘index’) 3.CBV形式 3.1 CBV形式的路由 path(r’^login/’,views.MyLogin.as_view()) 3.2 CBV形式的视图…

    Python开发 2023年4月2日
    00
  • 一切皆对象和深浅拷贝

    1.元类 元类的来源是:python中一切皆对象。 1.1 什么是元类 元类就是用来实例化产生类的类 关系:元类—实例化—类(自定义的类)—实例化—-对象(obj) 1.2如何查看内置的元类 1.type是内置的元类2.我们用class关键字定义出来的所有类以及内置的类都是由内置的元类type实例化产生的 例如:在python中int、dic…

    2023年4月2日
    00
  • 序列化类高级用法之source、SerializerMethodField和断言assert

    序列化类高级用法之source 使用source,字段参数,可以修改序列化字段名字 原本序列化器中字段名,必须和表中的字段名一样,不一样会报错 我们可以通过source字段来改变序列化器中的字段名,使得前端在展示的时候也修改一下字段名!! source也可以做跨表查询,通过外键字段,表名点外键出去的字段名字 class BookSerializer(seri…

    2023年4月2日
    00
  • 面向对象高级–反射、内置方法和元类

    1.反射 1.1什么是反射 python是一门动态语言,而反射机制被视为动态语言的关键! 反射机制指的是:在程序的运行过程中,动态的获取程序的信息和对象的功能! ‘动态’:指一开始不知道程序的信息和对象的功能,只有等到运行到那的时候才会动态获取!!! 比如:x=18 在程序运行的时候,python才会通过反射机制动态的获取到这个值是整型,并不需要一开始定义的…

    Python开发 2023年4月2日
    00
  • 小程序用户和登录页面展示

    用户页面wxml <!–pages/home/home.wxml–> <view class=”container”> <view class=”top-view”> <view class=”user”> <view class=”row”> <image class=”avatar” …

    Python开发 2023年4月2日
    00
  • 基于tcp协议的套接字通信

    1、套接字socket简介 Socket是应用层与TCP/UDP协议通信的中间软件抽象层,它充当一种接口的角色!封装了传输层以下的东西。 1.1基于tcp的socket通信流程图 2.tcp服务端搭建 需求:模拟两个手机的通话 from ipaddress import IPv4Address import socket # 1.买手机 # socket.A…

    2023年4月2日
    00
  • 编程语言的介绍

    1. 什么是编程语言 编程语言,其实就是一种人和计算机进行沟通所需要的介质、工具。就像英语是中国人用来与外国人沟通的工具。 2.什么是编程 编程指的是:人类通过编程语言,把想要计算机做的事,写到文件中,编程的结果就是这一堆文件,这些文件就是程序。 3. 为什么要编程 计算机就像是奴隶,人类通过编程去奴役计算机,从而使计算机完成人类想要完成的任务,解放人的劳动…

    2023年4月2日
    00
合作推广
合作推广
分享本页
返回顶部