跳转至

FastAPI 后台任务与 Celery — 异步任务处理


一句话说明

FastAPI 内置 BackgroundTasks 处理简单后台任务(发邮件/写日志),重量级任务(生信流程/模型训练)用 Celery + Redis 做分布式任务队列,让 API 立即返回不阻塞。


安装与配置

# 内置后台任务:无需额外安装
pip install "fastapi[standard]"

# Celery + Redis(生产推荐)
pip install celery redis "celery[redis]"

# 启动 Redis(Docker 方式)
# docker run -d -p 6379:6379 redis:latest

# 启动 Celery Worker
# celery -A tasks worker --loglevel=info

内置 BackgroundTasks

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

# 后台函数(普通函数,不是路由)
def send_notification(email: str, message: str):
    """发送通知邮件(模拟耗时操作)"""
    time.sleep(5)                        # 模拟发邮件耗时 5 秒
    print(f"已发送邮件至 {email}{message}")

def log_action(user_id: int, action: str):
    """记录用户操作日志"""
    with open("access.log", "a") as f:
        f.write(f"user={user_id}, action={action}\n")

# 路由中添加后台任务
@app.post("/patients/{patient_id}/analyze")
async def analyze_patient(
    patient_id: int,
    background_tasks: BackgroundTasks,  # FastAPI 自动注入
):
    # 立即返回响应(不等后台任务完成)
    background_tasks.add_task(send_notification, "doctor@hospital.com", f"分析完成:患者{patient_id}")
    background_tasks.add_task(log_action, patient_id, "analyze")

    return {"message": "分析任务已提交,完成后通知您"}  # 立即返回

Celery 分布式任务队列

定义任务

# 文件:tasks.py
from celery import Celery
import subprocess
import os

# 创建 Celery 应用(连接 Redis)
celery_app = Celery(
    "bioinf_tasks",
    broker  = "redis://localhost:6379/0",   # 任务队列
    backend = "redis://localhost:6379/1",   # 结果存储
)

# 定义任务(用 @celery_app.task 装饰)
@celery_app.task(bind=True)               # bind=True 可访问 self(任务自身)
def run_kraken2(self, sample_id: str, fastq_path: str):
    """Kraken2 物种分类(耗时任务)"""
    try:
        # 更新任务进度
        self.update_state(state="PROGRESS", meta={"step": "运行Kraken2", "progress": 10})

        # 调用 Kraken2 命令
        cmd = [
            "kraken2",
            "--db",      "/data/kraken2_db",
            "--output",  f"results/{sample_id}.kraken",
            "--report",  f"results/{sample_id}.report",
            fastq_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)

        self.update_state(state="PROGRESS", meta={"step": "完成", "progress": 100})
        return {"sample_id": sample_id, "status": "success", "output": result.stdout[:500]}

    except subprocess.CalledProcessError as e:
        raise self.retry(exc=e, countdown=60, max_retries=3)  # 失败重试 3 次

FastAPI 集成 Celery

# 文件:main.py
from fastapi import FastAPI
from tasks import run_kraken2, celery_app
from celery.result import AsyncResult

app = FastAPI()

@app.post("/analyze/{sample_id}")
async def submit_analysis(sample_id: str, fastq_path: str):
    """提交分析任务,立即返回任务 ID"""
    task = run_kraken2.delay(sample_id, fastq_path)  # 异步提交
    return {
        "task_id": task.id,              # 返回任务 ID
        "message": "任务已提交,用 task_id 查询进度",
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """查询任务状态和结果"""
    result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status":  result.status,        # PENDING / PROGRESS / SUCCESS / FAILURE
        "result":  result.result if result.ready() else None,   # 完成后才有结果
        "info":    result.info,          # PROGRESS 时的 meta 信息
    }

常见报错与解决

报错原因解决
ConnectionRefusedError: RedisRedis 未启动docker run -d -p 6379:6379 redis
NotRegistered: tasks.xxxCelery Worker 没加载任务模块启动时指定:-A tasks
kombu.exceptions.OperationalError网络或 broker 问题检查 broker URL 和 Redis 运行状态
BackgroundTask 失败无感知异常不传播到响应加 try/except 并写日志

速查表

操作代码
内置后台任务background_tasks.add_task(func, arg1)
定义 Celery 任务@celery_app.task def my_task(...)
提交异步任务task = my_task.delay(arg1, arg2)
查询任务状态AsyncResult(task_id).status
查询任务结果AsyncResult(task_id).result
更新进度self.update_state(state="PROGRESS", meta={...})
失败重试raise self.retry(countdown=60, max_retries=3)
启动 Workercelery -A tasks worker --loglevel=info