【FastAPI】BackgroundTasks 后台任务

说明

可以先返回响应,然后 服务器再执行任务

FastAPI 的 BackgroundTasks 只能实现简单的后台任务,如需要开启进程等,就需要更强大的工具 Celery

使用后台任务

import uvicorn
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# 异步 & 同步函数红豆可以
async def write_notification(email: str, message):
    with open('./log.txt', mode='w') as file:
        file.write(f'notification for {email}: {message}')

@app.post("/send-notification/{email}")
async def send_notification(
        email: str,
        background_tasks: BackgroundTasks):  # 会自动识别为 后台任务
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "在后台发送通知"}

if __name__ == '__main__':
    uvicorn.run(app)

mark

依赖使用后台任务

import uvicorn
from fastapi import FastAPI, BackgroundTasks, Depends

app = FastAPI()

async def write_log(email: str):
    message = "some notification"
    with open('./log.txt', mode='w') as file:
        file.write(f'notification for {email}: {message}')

# 异步 & 同步函数红豆可以
async def add_task(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, email)

@app.post("/send-notification/{email}")
async def send_notification(
        q: str = Depends(add_task)):  # 使用依赖添加后台任务
    return {"message": "在后台发送通知"}

if __name__ == '__main__':
    uvicorn.run(app)

mark

发表评论 / Comment

用心评论~