下面是Python Flask实现后台任务轻松构建高效API应用的攻略:
简介
Python Flask是一个用于构建Web应用程序和API的轻量级框架。被广泛应用于开发RESTful API。此外,Python Flask中还提供了轻便的异步任务队列库,可以方便地实现后台任务。将后台任务和API结合使用,可以更加高效地构建API应用程序。
步骤
第一步:安装Flask和Celery
安装Flask:在终端中输入以下命令:
pip install flask
安装Celery:在终端中输入以下命令:
pip install celery
第二步:创建Flask应用
from flask import Flask
app = Flask(__name__)
@app.route("/")
def index():
return "Hello, World!"
if __name__ == '__main__':
app.run(debug=True)
这段代码创建一个简单的Flask应用,路由"/"返回"Hello, World!"。将这段代码保存为app.py
。
第三步:创建Celery任务
from celery import Celery
celery = Celery('tasks', broker='pyamqp://guest@localhost//')
@celery.task
def add(x, y):
return x + y
这段代码创建一个Celery任务add
,其实现了两个数字的相加操作。将这段代码保存为tasks.py
。
第四步:在Flask应用中调用Celery任务
from tasks import add
from flask import Flask
app = Flask(__name__)
@app.route("/")
def index():
result = add.delay(2, 3)
return "Result: {}".format(result.get())
if __name__ == '__main__':
app.run(debug=True)
这段代码在app.py
中调用了tasks.py
中定义的add
任务。在路由"/"中,调用add.delay(2, 3)
启动异步任务,然后使用result.get()
等待并获取异步任务的结果。将这段代码保存为app.py
。
第五步:启动Celery Worker
在终端中输入以下命令,启动Celery Worker:
celery -A tasks worker --loglevel=info
第六步:启动Flask应用
在终端中输入以下命令,启动Flask应用:
python app.py
示例1:异步发送邮件
from flask_mail import Mail, Message
from celery import Celery
app = Flask(__name__)
app.config['MAIL_SERVER']='smtp.gmail.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USERNAME'] = 'sender@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
app.config['MAIL_USE_TLS'] = False
app.config['MAIL_USE_SSL'] = True
mail = Mail(app)
celery = Celery('tasks', broker='pyamqp://guest@localhost//')
@celery.task
def send_email(subject, recipients, body):
with app.app_context():
msg = Message(subject=subject, recipients=recipients)
msg.body = body
mail.send(msg)
@app.route("/")
def index():
send_email.delay('Hello from Flask', ['recipient@gmail.com'], 'This is a test email sent asynchronously via Celery!')
return "Email sent successfully!"
if __name__ == '__main__':
app.run(debug=True)
这段代码创建了一个Flask应用和一个Celery任务send_email
,用于异步发送邮件。在路由"/"中,调用send_email.delay
启动异步任务,异步发送邮件。邮箱的相关信息需要自行修改。将这段代码保存为app.py
。
示例2:异步处理图像
import requests
from io import BytesIO
from PIL import Image
from celery import Celery
from flask import Flask, send_file
app = Flask(__name__)
celery = Celery('tasks', broker='pyamqp://guest@localhost//')
@celery.task
def process_image(url):
response = requests.get(url)
img = Image.open(BytesIO(response.content))
img.thumbnail((300, 300))
result = BytesIO()
img.save(result, format='JPEG')
result.seek(0)
return result
@app.route("/")
def index():
result = process_image.delay('https://i.imgur.com/B301j7T.jpg')
return send_file(result.get(), mimetype='image/jpeg')
if __name__ == '__main__':
app.run(debug=True)
这段代码创建了一个Flask应用和一个Celery任务process_image
,用于异步处理图像。在路由"/"中,调用process_image.delay
启动异步任务,下载并加工一张图片,然后返回图片的流数据。将这段代码保存为app.py
。
以上就是Python Flask实现后台任务轻松构建高效API应用的完整攻略。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python Flask实现后台任务轻松构建高效API应用 - Python技术站