跳转至

614 Python Celery 异步任务队列

一句话概述:Celery 是 Python 的分布式任务队列,把耗时操作(发邮件、数据处理、报告生成)从主应用剥离到后台执行,让用户不用等,是 Python 后端的"后台工人"。

核心知识点速查表

知识点说明
最新版本Celery 5.6.x(2026年)
Python版本3.9+(5.7将要求3.10+)
消息代理Redis(推荐)、RabbitMQ、Amazon SQS
结果后端Redis、数据库、Memcached
定时任务Celery Beat
监控工具Flower

一、安装配置

# 安装 Celery + Redis 支持
pip install "celery[redis]"            # 安装 Celery 及 Redis 依赖

# 确保 Redis 在运行
docker run -d --name redis -p 6379:6379 redis:latest

1.1 基本配置

# celery_app.py
from celery import Celery  # 导入 Celery 类

# 创建 Celery 应用
app = Celery(
    'myapp',                           # 应用名称
    broker='redis://localhost:6379/0',  # 消息代理(Redis 数据库0)
    backend='redis://localhost:6379/1'  # 结果存储(Redis 数据库1)
)

# 配置
app.conf.update(
    task_serializer='json',            # 任务序列化格式
    result_serializer='json',          # 结果序列化格式
    accept_content=['json'],           # 接受的内容类型
    timezone='Asia/Shanghai',          # 时区
    task_track_started=True,           # 跟踪任务开始状态
    task_time_limit=300,               # 单个任务最大执行时间(秒)
    worker_max_tasks_per_child=100,    # 每个 worker 进程处理100个任务后重启(防内存泄漏)
)

二、基本使用

2.1 定义任务

# tasks.py
from celery_app import app  # 导入 Celery 应用
import time

@app.task(bind=True, max_retries=3)    # bind=True 让任务能访问 self
def send_email(self, to, subject, body):
    """发送邮件的异步任务"""
    try:
        print(f"正在发送邮件给 {to}...")
        time.sleep(3)                  # 模拟耗时操作
        print(f"邮件发送成功: {subject}")
        return {'status': 'sent', 'to': to}
    except Exception as exc:
        # 重试机制(白话:失败了自动重试,每次等更长时间)
        raise self.retry(
            exc=exc,
            countdown=60 * (self.request.retries + 1)  # 60s, 120s, 180s
        )

@app.task
def process_data(data_list):
    """数据处理任务"""
    results = []
    for item in data_list:
        result = item * 2              # 模拟处理
        results.append(result)
    return results

@app.task
def add(x, y):
    """简单加法任务(演示用)"""
    return x + y

2.2 调用任务

# 调用方式1:delay(最简单)
result = send_email.delay('user@example.com', '测试', '你好')
# 立即返回 AsyncResult 对象,不阻塞

# 调用方式2:apply_async(更多控制)
result = send_email.apply_async(
    args=['user@example.com', '测试', '你好'],
    countdown=60,                      # 60秒后执行
    expires=3600,                      # 1小时后过期
    queue='email'                      # 指定队列
)

# 获取结果
print(result.id)                       # 任务ID
print(result.status)                   # 状态:PENDING/STARTED/SUCCESS/FAILURE
print(result.ready())                  # 是否完成
print(result.get(timeout=10))          # 等待结果(最多等10秒)

# 重要提醒:不要在 Web 请求中调用 result.get()!
# 这会阻塞 Web 服务器。应该返回 task_id 给前端,前端轮询状态。

2.3 启动 Worker

# 启动 Celery Worker(白话:启动后台工人,开始干活)
celery -A celery_app worker --loglevel=info

# 多进程模式(利用多核CPU)
celery -A celery_app worker --concurrency=4 --loglevel=info

# 指定队列
celery -A celery_app worker -Q email,default --loglevel=info

三、高级用法

3.1 任务链与工作流(Canvas)

from celery import chain, group, chord  # 导入工作流原语

# chain:任务链(前一个任务的结果传给下一个)
# 白话:流水线,一步一步来
workflow = chain(
    add.s(1, 2),                       # s() = signature,延迟执行
    add.s(3),                          # 上一步结果(3) + 3 = 6
    add.s(4)                           # 6 + 4 = 10
)
result = workflow.apply_async()

# group:并行执行多个任务
# 白话:同时干多件事
job = group([
    add.s(1, 2),
    add.s(3, 4),
    add.s(5, 6)
])
results = job.apply_async()

# chord:先并行,再汇总
# 白话:大家先各干各的,全完成后汇总结果
callback = add.s()                     # 汇总任务
job = chord([add.s(1, 2), add.s(3, 4)], callback)

3.2 定时任务(Celery Beat)

# celery_app.py 中添加定时任务配置
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 每30分钟执行一次
    'check-status-every-30-min': {
        'task': 'tasks.check_system_status',  # 任务函数路径
        'schedule': 1800.0,            # 间隔秒数
    },
    # 每天早上9点执行
    'daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=9, minute=0),  # cron 表达式
        'args': ('daily',),            # 任务参数
    },
    # 每周一执行
    'weekly-cleanup': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week='monday'),
    },
}
# 启动 Beat 调度器
celery -A celery_app beat --loglevel=info

# 同时启动 Worker 和 Beat
celery -A celery_app worker --beat --loglevel=info

3.3 与 FastAPI 集成

from fastapi import FastAPI
from tasks import send_email

app = FastAPI()

@app.post("/send-email")
async def api_send_email(to: str, subject: str):
    """API 调用异步任务"""
    task = send_email.delay(to, subject, "邮件内容")  # 异步发送
    return {"task_id": task.id, "status": "queued"}  # 立即返回

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    """查询任务状态"""
    from celery.result import AsyncResult
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None
    }

3.4 监控(Flower)

pip install flower                     # 安装 Flower 监控工具
celery -A celery_app flower --port=5555  # 启动 Web 监控界面
# 访问 http://localhost:5555 查看任务状态、Worker状态、成功/失败率

四、常见报错与解决

4.1 连接代理失败

kombu.exceptions.OperationalError: Connection refused

解决:确认 Redis/RabbitMQ 在运行,检查 broker URL 配置。

4.2 任务超时

celery.exceptions.TimeLimitExceeded

解决:增大 task_time_limit 或优化任务处理时间。

4.3 任务结果丢失

原因:未配置 result backend 或结果已过期 解决:配置 backend 参数,设置 result_expires = 3600


五、速查表

操作命令
启动Workercelery -A app worker -l info
启动Beatcelery -A app beat -l info
启动Flowercelery -A app flower
查看活跃任务celery -A app inspect active
清空队列celery -A app purge
查看注册任务celery -A app inspect registered

六、同类工具对比

特性CeleryDramatiqHueyARQ
成熟度最成熟成熟轻量轻量
复杂度极低
Canvas工作流支持支持不支持不支持
定时任务BeatAPScheduler内置内置
异步支持有限支持不支持原生async
适合场景大型项目中型项目小型项目async项目

选型建议:Python 大型项目选 Celery(生态最好);追求简单选 Dramatiq;小项目选 Huey。


参考资料Celery 官方文档 | GitHub | Real Python 教程