614 Python Celery 异步任务队列¶
一句话概述:Celery 是 Python 的分布式任务队列,把耗时操作(发邮件、数据处理、报告生成)从主应用剥离到后台执行,让用户不用等,是 Python 后端的"后台工人"。
核心知识点速查表¶
| 知识点 | 说明 |
|---|---|
| 最新版本 | Celery 5.6.x(2026年) |
| Python版本 | 3.9+(5.7将要求3.10+) |
| 消息代理 | Redis(推荐)、RabbitMQ、Amazon SQS |
| 结果后端 | Redis、数据库、Memcached |
| 定时任务 | Celery Beat |
| 监控工具 | Flower |
一、安装配置¶
# 安装 Celery + Redis 支持
pip install "celery[redis]" # 安装 Celery 及 Redis 依赖
# 确保 Redis 在运行
docker run -d --name redis -p 6379:6379 redis:latest
1.1 基本配置¶
# celery_app.py
from celery import Celery # 导入 Celery 类
# 创建 Celery 应用
app = Celery(
'myapp', # 应用名称
broker='redis://localhost:6379/0', # 消息代理(Redis 数据库0)
backend='redis://localhost:6379/1' # 结果存储(Redis 数据库1)
)
# 配置
app.conf.update(
task_serializer='json', # 任务序列化格式
result_serializer='json', # 结果序列化格式
accept_content=['json'], # 接受的内容类型
timezone='Asia/Shanghai', # 时区
task_track_started=True, # 跟踪任务开始状态
task_time_limit=300, # 单个任务最大执行时间(秒)
worker_max_tasks_per_child=100, # 每个 worker 进程处理100个任务后重启(防内存泄漏)
)
二、基本使用¶
2.1 定义任务¶
# tasks.py
from celery_app import app # 导入 Celery 应用
import time
@app.task(bind=True, max_retries=3) # bind=True 让任务能访问 self
def send_email(self, to, subject, body):
"""发送邮件的异步任务"""
try:
print(f"正在发送邮件给 {to}...")
time.sleep(3) # 模拟耗时操作
print(f"邮件发送成功: {subject}")
return {'status': 'sent', 'to': to}
except Exception as exc:
# 重试机制(白话:失败了自动重试,每次等更长时间)
raise self.retry(
exc=exc,
countdown=60 * (self.request.retries + 1) # 60s, 120s, 180s
)
@app.task
def process_data(data_list):
"""数据处理任务"""
results = []
for item in data_list:
result = item * 2 # 模拟处理
results.append(result)
return results
@app.task
def add(x, y):
"""简单加法任务(演示用)"""
return x + y
2.2 调用任务¶
# 调用方式1:delay(最简单)
result = send_email.delay('user@example.com', '测试', '你好')
# 立即返回 AsyncResult 对象,不阻塞
# 调用方式2:apply_async(更多控制)
result = send_email.apply_async(
args=['user@example.com', '测试', '你好'],
countdown=60, # 60秒后执行
expires=3600, # 1小时后过期
queue='email' # 指定队列
)
# 获取结果
print(result.id) # 任务ID
print(result.status) # 状态:PENDING/STARTED/SUCCESS/FAILURE
print(result.ready()) # 是否完成
print(result.get(timeout=10)) # 等待结果(最多等10秒)
# 重要提醒:不要在 Web 请求中调用 result.get()!
# 这会阻塞 Web 服务器。应该返回 task_id 给前端,前端轮询状态。
2.3 启动 Worker¶
# 启动 Celery Worker(白话:启动后台工人,开始干活)
celery -A celery_app worker --loglevel=info
# 多进程模式(利用多核CPU)
celery -A celery_app worker --concurrency=4 --loglevel=info
# 指定队列
celery -A celery_app worker -Q email,default --loglevel=info
三、高级用法¶
3.1 任务链与工作流(Canvas)¶
from celery import chain, group, chord # 导入工作流原语
# chain:任务链(前一个任务的结果传给下一个)
# 白话:流水线,一步一步来
workflow = chain(
add.s(1, 2), # s() = signature,延迟执行
add.s(3), # 上一步结果(3) + 3 = 6
add.s(4) # 6 + 4 = 10
)
result = workflow.apply_async()
# group:并行执行多个任务
# 白话:同时干多件事
job = group([
add.s(1, 2),
add.s(3, 4),
add.s(5, 6)
])
results = job.apply_async()
# chord:先并行,再汇总
# 白话:大家先各干各的,全完成后汇总结果
callback = add.s() # 汇总任务
job = chord([add.s(1, 2), add.s(3, 4)], callback)
3.2 定时任务(Celery Beat)¶
# celery_app.py 中添加定时任务配置
from celery.schedules import crontab
app.conf.beat_schedule = {
# 每30分钟执行一次
'check-status-every-30-min': {
'task': 'tasks.check_system_status', # 任务函数路径
'schedule': 1800.0, # 间隔秒数
},
# 每天早上9点执行
'daily-report': {
'task': 'tasks.generate_report',
'schedule': crontab(hour=9, minute=0), # cron 表达式
'args': ('daily',), # 任务参数
},
# 每周一执行
'weekly-cleanup': {
'task': 'tasks.cleanup_old_data',
'schedule': crontab(hour=2, minute=0, day_of_week='monday'),
},
}
# 启动 Beat 调度器
celery -A celery_app beat --loglevel=info
# 同时启动 Worker 和 Beat
celery -A celery_app worker --beat --loglevel=info
3.3 与 FastAPI 集成¶
from fastapi import FastAPI
from tasks import send_email
app = FastAPI()
@app.post("/send-email")
async def api_send_email(to: str, subject: str):
"""API 调用异步任务"""
task = send_email.delay(to, subject, "邮件内容") # 异步发送
return {"task_id": task.id, "status": "queued"} # 立即返回
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""查询任务状态"""
from celery.result import AsyncResult
result = AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
3.4 监控(Flower)¶
pip install flower # 安装 Flower 监控工具
celery -A celery_app flower --port=5555 # 启动 Web 监控界面
# 访问 http://localhost:5555 查看任务状态、Worker状态、成功/失败率
四、常见报错与解决¶
4.1 连接代理失败¶
解决:确认 Redis/RabbitMQ 在运行,检查 broker URL 配置。
4.2 任务超时¶
解决:增大 task_time_limit 或优化任务处理时间。
4.3 任务结果丢失¶
原因:未配置 result backend 或结果已过期 解决:配置 backend 参数,设置 result_expires = 3600。
五、速查表¶
| 操作 | 命令 |
|---|---|
| 启动Worker | celery -A app worker -l info |
| 启动Beat | celery -A app beat -l info |
| 启动Flower | celery -A app flower |
| 查看活跃任务 | celery -A app inspect active |
| 清空队列 | celery -A app purge |
| 查看注册任务 | celery -A app inspect registered |
六、同类工具对比¶
| 特性 | Celery | Dramatiq | Huey | ARQ |
|---|---|---|---|---|
| 成熟度 | 最成熟 | 成熟 | 轻量 | 轻量 |
| 复杂度 | 中 | 低 | 极低 | 低 |
| Canvas工作流 | 支持 | 支持 | 不支持 | 不支持 |
| 定时任务 | Beat | APScheduler | 内置 | 内置 |
| 异步支持 | 有限 | 支持 | 不支持 | 原生async |
| 适合场景 | 大型项目 | 中型项目 | 小型项目 | async项目 |
选型建议:Python 大型项目选 Celery(生态最好);追求简单选 Dramatiq;小项目选 Huey。
参考资料:Celery 官方文档 | GitHub | Real Python 教程