跳转至

547_消息队列系统设计


一句话说明

消息队列(MQ)实现服务解耦和异步处理,是生信分析任务调度、流水线控制的关键组件。


核心知识点

为什么用消息队列?

问题场景:用户上传FASTQ后触发分析
  同步方式:HTTP请求等待分析完成(4小时...超时!)
  异步方式:提交任务→返回job_id→MQ排队→Worker异步执行

好处:
  1. 解耦:上传服务和分析服务互不依赖
  2. 削峰:突发大量任务时排队,不压垮服务
  3. 重试:失败任务自动重入队列
  4. 顺序保证:FIFO队列保证任务顺序

主流 MQ 对比

消息队列吞吐量延迟持久化典型场景
Redis Queue<1ms轻量任务调度
RabbitMQ复杂路由、可靠投递
Kafka极高日志流、事件驱动
Celery+RedisPython异步任务
AWS SQS云原生场景

消息投递语义

At most once  最多一次:可能丢消息,性能最好
At least once 至少一次:可能重复,需要幂等处理(最常用)
Exactly once  恰好一次:最复杂,性能最差(Kafka事务)

实战代码/设计图/模板

生信任务队列架构图

[API服务]
   │ 提交任务(produce)
[Redis / RabbitMQ 队列]
  ├── queue:wes_jobs     ← WES分析任务
  ├── queue:rna_jobs     ← RNA-seq任务
  ├── queue:16s_jobs     ← 16S任务
  └── queue:notify_jobs  ← 邮件通知任务

[Worker集群] ← 消费消息(consume)
  ├── WES Worker(4个进程,每个16核)
  ├── RNA Worker(8个进程,每个8核)
  └── 16S Worker(16个进程,每个4核)

[数据库] ← 更新任务状态
[通知服务] ← 发送完成邮件

Celery 实现(Python 生信常用)

# tasks.py
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery(
    'bioinf_tasks',
    broker='redis://localhost:6379/0',  # 消息队列
    backend='redis://localhost:6379/1'  # 结果存储
)

# 队列路由配置
app.conf.task_routes = {
    'tasks.run_wes':       {'queue': 'wes_jobs'},
    'tasks.run_rnaseq':    {'queue': 'rna_jobs'},
    'tasks.run_16s':       {'queue': '16s_jobs'},
    'tasks.send_email':    {'queue': 'notify_jobs'},
}

logger = get_task_logger(__name__)

@app.task(
    bind=True,
    max_retries=3,          # 最多重试3次
    retry_backoff=True,     # 指数退避重试
    queue='wes_jobs'
)
def run_wes(self, job_id: str, sample_ids: list, params: dict):
    """WES 分析任务"""
    logger.info(f"开始执行WES分析,job_id={job_id}")

    try:
        update_job_status(job_id, "RUNNING")

        # 执行分析流程
        result = execute_wes_pipeline(sample_ids, params)

        update_job_status(job_id, "COMPLETED", result=result)

        # 触发通知任务
        send_email.delay(job_id=job_id, status="完成")

        return {"job_id": job_id, "status": "SUCCESS"}

    except Exception as exc:
        logger.error(f"任务失败: {exc}")
        update_job_status(job_id, "FAILED", error=str(exc))

        # 自动重试(等待30s, 60s, 120s)
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))

@app.task(queue='notify_jobs')
def send_email(job_id: str, status: str):
    """发送邮件通知"""
    job = get_job(job_id)
    send_smtp_email(
        to=job.user_email,
        subject=f"分析任务{status} - {job_id}",
        body=f"您的分析任务已{status},请登录查看结果"
    )

任务提交和监控

# 提交任务
from tasks import run_wes

def submit_analysis(job_id: str, sample_ids: list, params: dict):
    # 异步提交(立即返回)
    task = run_wes.apply_async(
        args=[job_id, sample_ids, params],
        # 指定在哪个队列执行
        queue='wes_jobs',
        # 优先级(0-9,9最高)
        priority=5,
        # 任务过期时间(防止积压)
        expires=86400
    )
    return task.id

# 查询任务状态
from celery.result import AsyncResult

def get_task_status(task_id: str) -> dict:
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "state": result.state,          # PENDING/STARTED/SUCCESS/FAILURE
        "result": result.result if result.ready() else None,
        "traceback": result.traceback if result.failed() else None
    }

死信队列处理

# 处理失败任务(死信队列)
@app.task(queue='dead_letter_queue')
def handle_dead_letter(task_name: str, args: list, exc: str):
    """
    任务多次重试失败后进入死信队列
    - 记录日志
    - 告警通知
    - 人工介入
    """
    logger.critical(f"死信任务: {task_name}, 错误: {exc}")
    alert_oncall(f"任务彻底失败: {task_name}")

面试常问点

问题参考答案
如何保证消息不丢?持久化存储+ack确认机制
如何处理重复消息?幂等性设计,数据库唯一约束
消息积压怎么办?扩容Worker,按优先级处理
Kafka vs RabbitMQ?Kafka吞吐高适合日志,RabbitMQ适合任务队列
如何监控队列深度?Prometheus + Grafana,设置告警阈值

速查表

Celery 常用命令:
  启动Worker:celery -A tasks worker -Q wes_jobs --concurrency=4
  查看队列:celery -A tasks inspect active
  监控UI:celery -A tasks flower --port=5555
  清空队列:celery -A tasks purge

Redis MQ 常用命令:
  LPUSH queue_name message   # 生产者入队
  BRPOP queue_name 0         # 消费者阻塞出队(0=永久等待)
  LLEN queue_name            # 查看队列长度
  LRANGE queue_name 0 9      # 查看队列前10条