跳转至

Bull/BullMQ 队列

一句话概述:BullMQ 是 Node.js 最流行的任务队列库(Bull 的下一代),基于 Redis,支持优先级队列、延迟任务、任务重试、限流等功能,是 Node.js 后台任务处理的首选。

核心知识点

概念白话解释
Queue队列 = 存放待处理任务的列表
Job作业 = 队列中的一个具体任务
Worker工作者 = 处理任务的进程
Flow流程 = 多个任务的父子依赖关系
Event事件 = 任务状态变化时的通知
Repeatable可重复 = 定时执行的任务

安装配置

npm install bullmq  # 安装 BullMQ
# 需要 Redis: docker run -d -p 6379:6379 redis

基本使用

// queue.ts — 创建队列和添加任务
import { Queue } from 'bullmq'

const analysisQueue = new Queue('analysis', {
  connection: { host: 'localhost', port: 6379 },  // Redis 连接
})

// 添加任务
await analysisQueue.add('fastqc', { sampleId: 'S001', type: 'fastqc' })

// 延迟任务
await analysisQueue.add('report', { type: 'daily' }, { delay: 60000 })  // 1分钟后

// 优先级任务
await analysisQueue.add('urgent', { sampleId: 'S999' }, { priority: 1 })  // 优先处理

// 定时任务
await analysisQueue.add('cleanup', {}, {
  repeat: { pattern: '0 */6 * * *' }  // 每6小时
})
// worker.ts — 处理任务
import { Worker } from 'bullmq'

const worker = new Worker('analysis', async (job) => {
  console.log(`处理任务: ${job.name}, 数据: ${JSON.stringify(job.data)}`)
  // 更新进度
  await job.updateProgress(50)
  // 执行分析...
  await job.updateProgress(100)
  return { status: 'completed' }  // 返回结果
}, {
  connection: { host: 'localhost', port: 6379 },
  concurrency: 5,  // 并发处理5个任务
})

// 事件监听
worker.on('completed', (job) => console.log(`完成: ${job.id}`))
worker.on('failed', (job, err) => console.log(`失败: ${job?.id}, ${err.message}`))

高级用法

Flow(任务编排)

import { FlowProducer } from 'bullmq'

const flow = new FlowProducer({ connection: { host: 'localhost', port: 6379 } })

await flow.add({
  name: 'final-report',  // 父任务
  queueName: 'reports',
  children: [  // 子任务(先执行)
    { name: 'fastqc', queueName: 'analysis', data: { sample: 'S001' } },
    { name: 'bwa', queueName: 'analysis', data: { sample: 'S001' } },
  ],
})

常见报错

报错信息原因解决方法
ECONNREFUSEDRedis 未运行启动 Redis
Missing lock任务锁丢失检查 Redis 连接稳定性
Job stalledWorker 处理超时增加 lockDuration

速查表

// === 队列操作 ===
queue.add(name, data, opts)           // 添加任务
queue.getJobs(['waiting', 'active'])  // 获取任务列表
queue.obliterate()                    // 清空队列

// === 任务选项 ===
{ delay: 5000 }                       // 延迟5秒
{ priority: 1 }                       // 优先级(数字越小越优先)
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } }  // 重试
{ repeat: { pattern: '*/5 * * * *' } }  // 每5分钟
{ removeOnComplete: true }             // 完成后删除

参考:BullMQ 文档 | 更新于 2026 年