FastAPI 后台任务与 Celery — 异步任务处理
一句话说明
FastAPI 内置 BackgroundTasks 处理简单后台任务(发邮件/写日志),重量级任务(生信流程/模型训练)用 Celery + Redis 做分布式任务队列,让 API 立即返回不阻塞。
安装与配置
# 内置后台任务:无需额外安装
pip install "fastapi[standard]"
# Celery + Redis(生产推荐)
pip install celery redis "celery[redis]"
# 启动 Redis(Docker 方式)
# docker run -d -p 6379:6379 redis:latest
# 启动 Celery Worker
# celery -A tasks worker --loglevel=info
内置 BackgroundTasks
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
# 后台函数(普通函数,不是路由)
def send_notification(email: str, message: str):
"""发送通知邮件(模拟耗时操作)"""
time.sleep(5) # 模拟发邮件耗时 5 秒
print(f"已发送邮件至 {email}:{message}")
def log_action(user_id: int, action: str):
"""记录用户操作日志"""
with open("access.log", "a") as f:
f.write(f"user={user_id}, action={action}\n")
# 路由中添加后台任务
@app.post("/patients/{patient_id}/analyze")
async def analyze_patient(
patient_id: int,
background_tasks: BackgroundTasks, # FastAPI 自动注入
):
# 立即返回响应(不等后台任务完成)
background_tasks.add_task(send_notification, "doctor@hospital.com", f"分析完成:患者{patient_id}")
background_tasks.add_task(log_action, patient_id, "analyze")
return {"message": "分析任务已提交,完成后通知您"} # 立即返回
Celery 分布式任务队列
定义任务
# 文件:tasks.py
from celery import Celery
import subprocess
import os
# 创建 Celery 应用(连接 Redis)
celery_app = Celery(
"bioinf_tasks",
broker = "redis://localhost:6379/0", # 任务队列
backend = "redis://localhost:6379/1", # 结果存储
)
# 定义任务(用 @celery_app.task 装饰)
@celery_app.task(bind=True) # bind=True 可访问 self(任务自身)
def run_kraken2(self, sample_id: str, fastq_path: str):
"""Kraken2 物种分类(耗时任务)"""
try:
# 更新任务进度
self.update_state(state="PROGRESS", meta={"step": "运行Kraken2", "progress": 10})
# 调用 Kraken2 命令
cmd = [
"kraken2",
"--db", "/data/kraken2_db",
"--output", f"results/{sample_id}.kraken",
"--report", f"results/{sample_id}.report",
fastq_path,
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
self.update_state(state="PROGRESS", meta={"step": "完成", "progress": 100})
return {"sample_id": sample_id, "status": "success", "output": result.stdout[:500]}
except subprocess.CalledProcessError as e:
raise self.retry(exc=e, countdown=60, max_retries=3) # 失败重试 3 次
FastAPI 集成 Celery
# 文件:main.py
from fastapi import FastAPI
from tasks import run_kraken2, celery_app
from celery.result import AsyncResult
app = FastAPI()
@app.post("/analyze/{sample_id}")
async def submit_analysis(sample_id: str, fastq_path: str):
"""提交分析任务,立即返回任务 ID"""
task = run_kraken2.delay(sample_id, fastq_path) # 异步提交
return {
"task_id": task.id, # 返回任务 ID
"message": "任务已提交,用 task_id 查询进度",
}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""查询任务状态和结果"""
result = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": result.status, # PENDING / PROGRESS / SUCCESS / FAILURE
"result": result.result if result.ready() else None, # 完成后才有结果
"info": result.info, # PROGRESS 时的 meta 信息
}
常见报错与解决
| 报错 | 原因 | 解决 |
|---|
ConnectionRefusedError: Redis | Redis 未启动 | docker run -d -p 6379:6379 redis |
NotRegistered: tasks.xxx | Celery Worker 没加载任务模块 | 启动时指定:-A tasks |
kombu.exceptions.OperationalError | 网络或 broker 问题 | 检查 broker URL 和 Redis 运行状态 |
| BackgroundTask 失败无感知 | 异常不传播到响应 | 加 try/except 并写日志 |
速查表
| 操作 | 代码 |
|---|
| 内置后台任务 | background_tasks.add_task(func, arg1) |
| 定义 Celery 任务 | @celery_app.task def my_task(...) |
| 提交异步任务 | task = my_task.delay(arg1, arg2) |
| 查询任务状态 | AsyncResult(task_id).status |
| 查询任务结果 | AsyncResult(task_id).result |
| 更新进度 | self.update_state(state="PROGRESS", meta={...}) |
| 失败重试 | raise self.retry(countdown=60, max_retries=3) |
| 启动 Worker | celery -A tasks worker --loglevel=info |