547_消息队列系统设计
一句话说明
消息队列(MQ)实现服务解耦和异步处理,是生信分析任务调度、流水线控制的关键组件。
核心知识点
为什么用消息队列?
问题场景:用户上传FASTQ后触发分析
同步方式:HTTP请求等待分析完成(4小时...超时!)
异步方式:提交任务→返回job_id→MQ排队→Worker异步执行
好处:
1. 解耦:上传服务和分析服务互不依赖
2. 削峰:突发大量任务时排队,不压垮服务
3. 重试:失败任务自动重入队列
4. 顺序保证:FIFO队列保证任务顺序
主流 MQ 对比
| 消息队列 | 吞吐量 | 延迟 | 持久化 | 典型场景 |
|---|
| Redis Queue | 高 | <1ms | 弱 | 轻量任务调度 |
| RabbitMQ | 中 | 低 | 强 | 复杂路由、可靠投递 |
| Kafka | 极高 | 中 | 强 | 日志流、事件驱动 |
| Celery+Redis | 中 | 低 | 弱 | Python异步任务 |
| AWS SQS | 高 | 低 | 强 | 云原生场景 |
消息投递语义
At most once 最多一次:可能丢消息,性能最好
At least once 至少一次:可能重复,需要幂等处理(最常用)
Exactly once 恰好一次:最复杂,性能最差(Kafka事务)
实战代码/设计图/模板
生信任务队列架构图
[API服务]
│ 提交任务(produce)
▼
[Redis / RabbitMQ 队列]
├── queue:wes_jobs ← WES分析任务
├── queue:rna_jobs ← RNA-seq任务
├── queue:16s_jobs ← 16S任务
└── queue:notify_jobs ← 邮件通知任务
[Worker集群] ← 消费消息(consume)
├── WES Worker(4个进程,每个16核)
├── RNA Worker(8个进程,每个8核)
└── 16S Worker(16个进程,每个4核)
[数据库] ← 更新任务状态
[通知服务] ← 发送完成邮件
Celery 实现(Python 生信常用)
# tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
app = Celery(
'bioinf_tasks',
broker='redis://localhost:6379/0', # 消息队列
backend='redis://localhost:6379/1' # 结果存储
)
# 队列路由配置
app.conf.task_routes = {
'tasks.run_wes': {'queue': 'wes_jobs'},
'tasks.run_rnaseq': {'queue': 'rna_jobs'},
'tasks.run_16s': {'queue': '16s_jobs'},
'tasks.send_email': {'queue': 'notify_jobs'},
}
logger = get_task_logger(__name__)
@app.task(
bind=True,
max_retries=3, # 最多重试3次
retry_backoff=True, # 指数退避重试
queue='wes_jobs'
)
def run_wes(self, job_id: str, sample_ids: list, params: dict):
"""WES 分析任务"""
logger.info(f"开始执行WES分析,job_id={job_id}")
try:
update_job_status(job_id, "RUNNING")
# 执行分析流程
result = execute_wes_pipeline(sample_ids, params)
update_job_status(job_id, "COMPLETED", result=result)
# 触发通知任务
send_email.delay(job_id=job_id, status="完成")
return {"job_id": job_id, "status": "SUCCESS"}
except Exception as exc:
logger.error(f"任务失败: {exc}")
update_job_status(job_id, "FAILED", error=str(exc))
# 自动重试(等待30s, 60s, 120s)
raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
@app.task(queue='notify_jobs')
def send_email(job_id: str, status: str):
"""发送邮件通知"""
job = get_job(job_id)
send_smtp_email(
to=job.user_email,
subject=f"分析任务{status} - {job_id}",
body=f"您的分析任务已{status},请登录查看结果"
)
任务提交和监控
# 提交任务
from tasks import run_wes
def submit_analysis(job_id: str, sample_ids: list, params: dict):
# 异步提交(立即返回)
task = run_wes.apply_async(
args=[job_id, sample_ids, params],
# 指定在哪个队列执行
queue='wes_jobs',
# 优先级(0-9,9最高)
priority=5,
# 任务过期时间(防止积压)
expires=86400
)
return task.id
# 查询任务状态
from celery.result import AsyncResult
def get_task_status(task_id: str) -> dict:
result = AsyncResult(task_id)
return {
"task_id": task_id,
"state": result.state, # PENDING/STARTED/SUCCESS/FAILURE
"result": result.result if result.ready() else None,
"traceback": result.traceback if result.failed() else None
}
死信队列处理
# 处理失败任务(死信队列)
@app.task(queue='dead_letter_queue')
def handle_dead_letter(task_name: str, args: list, exc: str):
"""
任务多次重试失败后进入死信队列
- 记录日志
- 告警通知
- 人工介入
"""
logger.critical(f"死信任务: {task_name}, 错误: {exc}")
alert_oncall(f"任务彻底失败: {task_name}")
面试常问点
| 问题 | 参考答案 |
|---|
| 如何保证消息不丢? | 持久化存储+ack确认机制 |
| 如何处理重复消息? | 幂等性设计,数据库唯一约束 |
| 消息积压怎么办? | 扩容Worker,按优先级处理 |
| Kafka vs RabbitMQ? | Kafka吞吐高适合日志,RabbitMQ适合任务队列 |
| 如何监控队列深度? | Prometheus + Grafana,设置告警阈值 |
速查表
Celery 常用命令:
启动Worker:celery -A tasks worker -Q wes_jobs --concurrency=4
查看队列:celery -A tasks inspect active
监控UI:celery -A tasks flower --port=5555
清空队列:celery -A tasks purge
Redis MQ 常用命令:
LPUSH queue_name message # 生产者入队
BRPOP queue_name 0 # 消费者阻塞出队(0=永久等待)
LLEN queue_name # 查看队列长度
LRANGE queue_name 0 9 # 查看队列前10条