Bull/BullMQ 队列¶
一句话概述:BullMQ 是 Node.js 最流行的任务队列库(Bull 的下一代),基于 Redis,支持优先级队列、延迟任务、任务重试、限流等功能,是 Node.js 后台任务处理的首选。
核心知识点¶
| 概念 | 白话解释 |
|---|---|
| Queue | 队列 = 存放待处理任务的列表 |
| Job | 作业 = 队列中的一个具体任务 |
| Worker | 工作者 = 处理任务的进程 |
| Flow | 流程 = 多个任务的父子依赖关系 |
| Event | 事件 = 任务状态变化时的通知 |
| Repeatable | 可重复 = 定时执行的任务 |
安装配置¶
基本使用¶
// queue.ts — 创建队列和添加任务
import { Queue } from 'bullmq'
const analysisQueue = new Queue('analysis', {
connection: { host: 'localhost', port: 6379 }, // Redis 连接
})
// 添加任务
await analysisQueue.add('fastqc', { sampleId: 'S001', type: 'fastqc' })
// 延迟任务
await analysisQueue.add('report', { type: 'daily' }, { delay: 60000 }) // 1分钟后
// 优先级任务
await analysisQueue.add('urgent', { sampleId: 'S999' }, { priority: 1 }) // 优先处理
// 定时任务
await analysisQueue.add('cleanup', {}, {
repeat: { pattern: '0 */6 * * *' } // 每6小时
})
// worker.ts — 处理任务
import { Worker } from 'bullmq'
const worker = new Worker('analysis', async (job) => {
console.log(`处理任务: ${job.name}, 数据: ${JSON.stringify(job.data)}`)
// 更新进度
await job.updateProgress(50)
// 执行分析...
await job.updateProgress(100)
return { status: 'completed' } // 返回结果
}, {
connection: { host: 'localhost', port: 6379 },
concurrency: 5, // 并发处理5个任务
})
// 事件监听
worker.on('completed', (job) => console.log(`完成: ${job.id}`))
worker.on('failed', (job, err) => console.log(`失败: ${job?.id}, ${err.message}`))
高级用法¶
Flow(任务编排)¶
import { FlowProducer } from 'bullmq'
const flow = new FlowProducer({ connection: { host: 'localhost', port: 6379 } })
await flow.add({
name: 'final-report', // 父任务
queueName: 'reports',
children: [ // 子任务(先执行)
{ name: 'fastqc', queueName: 'analysis', data: { sample: 'S001' } },
{ name: 'bwa', queueName: 'analysis', data: { sample: 'S001' } },
],
})
常见报错¶
| 报错信息 | 原因 | 解决方法 |
|---|---|---|
ECONNREFUSED | Redis 未运行 | 启动 Redis |
Missing lock | 任务锁丢失 | 检查 Redis 连接稳定性 |
Job stalled | Worker 处理超时 | 增加 lockDuration |
速查表¶
// === 队列操作 ===
queue.add(name, data, opts) // 添加任务
queue.getJobs(['waiting', 'active']) // 获取任务列表
queue.obliterate() // 清空队列
// === 任务选项 ===
{ delay: 5000 } // 延迟5秒
{ priority: 1 } // 优先级(数字越小越优先)
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } } // 重试
{ repeat: { pattern: '*/5 * * * *' } } // 每5分钟
{ removeOnComplete: true } // 完成后删除
参考:BullMQ 文档 | 更新于 2026 年