跳转到内容
123xiao | 无名键客

《Node.js 中基于 BullMQ 与 Redis 的高可靠任务队列实战:重试、延迟任务与失败恢复设计》

字数: 0 阅读时长: 1 分钟

背景与问题

在 Node.js 服务里,很多业务一开始都喜欢“同步做完再返回”:

  • 用户下单后立刻发短信
  • 创建订单后同步扣库存、写日志、发通知
  • 导出报表时直接在接口里跑大查询
  • 第三方回调失败后靠接口调用方重试

一旦业务量上来,问题会很快暴露:

  1. 请求链路过长:接口响应慢,甚至超时。
  2. 外部依赖不稳定:短信、邮件、支付、Webhook 这类服务偶发失败很常见。
  3. 任务有时效差异:有的要立刻执行,有的要延迟执行,有的失败后要稍后重试。
  4. 服务重启后任务丢失:如果任务只在进程内存中,进程挂了,任务也没了。
  5. 重复执行与幂等问题:尤其在“失败重试 + 消费者崩溃恢复”的场景中,很容易出现同一个任务被执行多次。

这时候,引入一个可靠任务队列就很自然了。
在 Node.js 生态里,BullMQ 是一个很实用的选择:它基于 Redis,API 清晰,支持重试、延迟任务、并发处理、限速、失败队列等能力,足够支撑大部分中型系统。

这篇文章我会从“高可靠设计”这个角度来讲,不只是把任务塞进队列,而是重点讨论:

  • 失败了怎么重试
  • 延迟任务怎么做
  • Worker 崩溃后怎么恢复
  • 如何避免重复执行带来的业务事故
  • 实战里怎么排查“任务卡住/重复消费/Redis 压力大”等问题

方案定位与取舍分析

先说结论:BullMQ + Redis 很适合中小到中大型的异步任务场景,但它不是所有场景的终极答案。

为什么选 BullMQ

它比较适合这些任务:

  • 邮件、短信、推送
  • 订单超时取消
  • 异步生成报表
  • 图片/音视频转码的任务调度
  • Webhook 投递与失败重试
  • 业务异步解耦

它的优势:

  • 接入成本低:Node.js 里直接用。
  • Redis 持久化与恢复机制成熟:比纯内存队列可靠。
  • 内置能力全:延迟、重试、优先级、并发、限速、事件监听都有。
  • 生态稳定:比自己手写一个 Redis List + 定时轮询靠谱得多。

不适合的场景

但如果你碰到这些情况,要谨慎:

  • 超大规模流式事件处理:更偏向 Kafka 这类日志流系统。
  • 严格顺序与强事务一致性要求非常高:需要额外设计。
  • 超长耗时任务:例如单任务几小时,Worker 心跳与锁续期要特别关注。
  • 海量延迟任务:虽然能做,但到非常大的量级时,需要做专门的容量评估。

与常见方案简单对比

方案优点缺点适用场景
进程内队列简单重启丢任务,不可横向扩展本地脚本、临时工具
Redis + BullMQ易用、功能全、恢复能力好依赖 Redis,极端规模下要调优大多数 Node.js 异步任务
RabbitMQ路由灵活,消息模型成熟使用和维护门槛更高复杂消息路由
Kafka吞吐高,适合事件流不是“任务队列”的直接替代大数据、日志流

核心原理

BullMQ 的关键不是“能把任务放进去”,而是它如何围绕 Redis 组织任务状态和消费流程。

一个任务的大致生命周期

stateDiagram-v2
    [*] --> waiting
    waiting --> active: worker 获取任务
    active --> completed: 执行成功
    active --> failed: 执行失败且重试耗尽
    active --> delayed: 失败后按退避策略延迟重试
    delayed --> waiting: 到达执行时间
    failed --> waiting: 人工/程序重入队
    completed --> [*]

几个核心状态:

  • waiting:等待被消费
  • active:Worker 正在执行
  • delayed:延迟执行
  • completed:执行成功
  • failed:执行失败

可靠性的几个核心点

1. 任务持久化

任务不是放在 Node.js 内存里,而是写到 Redis。
这意味着:

  • 服务重启后,任务仍然存在
  • 多个 Worker 可以共享同一个队列
  • 可以跨机器扩容

2. 锁与“卡住任务”恢复

Worker 取到任务后,会持有一个锁,并持续续期。
如果 Worker 崩了、进程被杀、机器宕机,锁会失效。BullMQ 会把这类任务识别为 stalled job(卡住任务),再把它重新投回可处理状态。

这就是“失败恢复”的基础,但它也带来一个非常现实的问题:

同一个任务,理论上可能被执行不止一次。

所以业务层必须设计幂等性

3. 重试与退避策略

并不是所有失败都应该立刻进入 failed
网络抖动、第三方 502、瞬时限流,这些都属于“可恢复失败”。

BullMQ 允许你配置:

  • attempts:最大重试次数
  • backoff:退避策略,如固定延迟、指数退避

这比在业务代码里 setTimeout 自己重试要安全得多,因为重试状态是持久化的

4. 延迟任务

比如:

  • 下单 30 分钟未支付自动取消
  • 注册后 24 小时发送提醒邮件
  • Webhook 首次失败后 5 分钟再试

BullMQ 的 delay 可以天然覆盖这类场景。


系统交互流程

下面这张图把“生产任务、处理任务、失败重试、恢复”串起来看会更直观。

sequenceDiagram
    participant API as API 服务
    participant Queue as BullMQ Queue
    participant Redis as Redis
    participant Worker as Worker
    participant Third as 第三方服务

    API->>Queue: add(job, opts)
    Queue->>Redis: 写入 waiting/delayed
    Worker->>Redis: 获取任务并加锁
    Worker->>Third: 调用外部接口
    alt 成功
        Worker->>Redis: 标记 completed
    else 临时失败
        Worker->>Redis: 按 backoff 放入 delayed
    else 永久失败/重试耗尽
        Worker->>Redis: 标记 failed
    end

    alt Worker 崩溃
        Redis-->>Worker: 锁过期
        Worker->>Redis: stalled 检测后重新入队
    end

架构拆分建议

实际项目里,我通常会把角色拆清楚:

  • Producer:只负责投递任务
  • Worker:只负责消费
  • Scheduler / QueueEvents:负责补全延迟、失败、完成事件观测
  • 业务幂等层:保证重复执行也不会产生脏数据
flowchart LR
    A[HTTP/API 服务] --> B[Queue Producer]
    B --> C[(Redis)]
    C --> D[Worker 1]
    C --> E[Worker 2]
    C --> F[Worker N]
    D --> G[业务数据库]
    E --> G
    F --> G
    D --> H[第三方服务]
    E --> H
    F --> H
    C --> I[QueueEvents/监控]

一个很重要的经验是:不要让 API 进程既是高频生产者,又是重 CPU Worker
否则你会发现接口延迟和异步处理互相影响,压测时特别明显。


实战代码(可运行)

下面我们做一个简化但完整的例子:订单创建后,异步发送通知;失败自动重试;也支持延迟发送;Worker 崩溃后任务可恢复。

目录结构

bullmq-demo/
├─ package.json
├─ producer.js
├─ worker.js
├─ events.js
└─ idempotency-store.js

安装依赖

npm init -y
npm install bullmq ioredis

package.json

{
  "name": "bullmq-demo",
  "version": "1.0.0",
  "type": "commonjs",
  "scripts": {
    "producer": "node producer.js",
    "worker": "node worker.js",
    "events": "node events.js"
  }
}

共享 Redis 连接与幂等存储

这里为了演示简化,幂等记录也存在 Redis。生产环境你也可以放数据库,并加唯一索引。

idempotency-store.js

const IORedis = require('ioredis');

const connection = new IORedis({
  host: '127.0.0.1',
  port: 6379,
  maxRetriesPerRequest: null
});

async function markProcessedOnce(key, ttlSeconds = 24 * 3600) {
  // SET key value NX EX ttl
  const result = await connection.set(key, '1', 'EX', ttlSeconds, 'NX');
  return result === 'OK';
}

async function closeConnection() {
  await connection.quit();
}

module.exports = {
  connection,
  markProcessedOnce,
  closeConnection
};

生产者:投递任务

producer.js

const { Queue } = require('bullmq');
const { connection } = require('./idempotency-store');

const queueName = 'notification-queue';

const queue = new Queue(queueName, {
  connection,
  defaultJobOptions: {
    removeOnComplete: 1000,
    removeOnFail: 5000,
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 3000
    }
  }
});

async function main() {
  // 立即执行任务
  await queue.add(
    'send-order-notification',
    {
      orderId: 'ORD-1001',
      userId: 'U-88',
      channel: 'sms',
      content: '您的订单已创建成功'
    },
    {
      jobId: 'order-notify-ORD-1001'
    }
  );

  // 延迟任务:30 秒后执行
  await queue.add(
    'send-order-notification',
    {
      orderId: 'ORD-1002',
      userId: 'U-99',
      channel: 'email',
      content: '这是一个延迟发送的通知'
    },
    {
      jobId: 'order-notify-ORD-1002',
      delay: 30 * 1000
    }
  );

  console.log('任务已投递');
  await queue.close();
  process.exit(0);
}

main().catch(async (err) => {
  console.error('producer error:', err);
  await queue.close();
  process.exit(1);
});

这里有两个关键点

  1. jobId 很重要
    它可以避免同一个业务任务被重复添加。比如订单 ORD-1001 的通知任务,如果 API 因为网络抖动重复提交,BullMQ 会基于相同 jobId 去重。

  2. attempts + backoff
    它不是写在业务函数里,而是写在队列层。这样任务重试状态不会因为进程重启而丢失。


Worker:消费与失败恢复

worker.js

const { Worker } = require('bullmq');
const { connection, markProcessedOnce } = require('./idempotency-store');

const queueName = 'notification-queue';

async function fakeSendNotification(data) {
  // 模拟第三方接口有概率失败
  const random = Math.random();

  // 模拟耗时
  await new Promise((resolve) => setTimeout(resolve, 1500));

  if (random < 0.5) {
    const err = new Error('第三方通知服务暂时不可用');
    err.code = 'THIRD_PARTY_TEMP_ERROR';
    throw err;
  }

  console.log(`[通知成功] orderId=${data.orderId}, channel=${data.channel}`);
  return {
    ok: true,
    providerMessageId: `msg_${Date.now()}`
  };
}

const worker = new Worker(
  queueName,
  async (job) => {
    console.log(`开始处理 jobId=${job.id}, name=${job.name}`);

    // 幂等键:确保同一业务任务重复执行时不会重复发送
    const idempotentKey = `notify:processed:${job.data.orderId}:${job.data.channel}`;

    const firstTime = await markProcessedOnce(idempotentKey, 24 * 3600);
    if (!firstTime) {
      console.log(`检测到重复执行,直接跳过 orderId=${job.data.orderId}`);
      return { skipped: true };
    }

    const result = await fakeSendNotification(job.data);
    return result;
  },
  {
    connection,
    concurrency: 5,
    lockDuration: 30000,
    stalledInterval: 30000,
    maxStalledCount: 1
  }
);

worker.on('completed', (job, result) => {
  console.log(`job completed: id=${job.id}`, result);
});

worker.on('failed', (job, err) => {
  console.error(`job failed: id=${job && job.id}, err=${err.message}`);
});

worker.on('error', (err) => {
  console.error('worker error:', err);
});

async function shutdown() {
  console.log('收到退出信号,准备关闭 worker...');
  await worker.close();
  process.exit(0);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

事件监听:观察重试、完成、失败

events.js

const { QueueEvents } = require('bullmq');
const { connection } = require('./idempotency-store');

const queueName = 'notification-queue';

const queueEvents = new QueueEvents(queueName, { connection });

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log('[event completed]', jobId, returnvalue);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log('[event failed]', jobId, failedReason);
});

queueEvents.on('waiting', ({ jobId }) => {
  console.log('[event waiting]', jobId);
});

queueEvents.on('delayed', ({ jobId, delay }) => {
  console.log('[event delayed]', jobId, delay);
});

queueEvents.on('stalled', ({ jobId }) => {
  console.log('[event stalled]', jobId);
});

queueEvents.on('error', (err) => {
  console.error('queueEvents error:', err);
});

console.log('QueueEvents started');

运行方式

先启动 Redis,然后开三个终端:

终端 1:事件监听

npm run events

终端 2:启动 Worker

npm run worker

终端 3:投递任务

npm run producer

如果你多运行几次,会看到:

  • 有的任务一次成功
  • 有的任务会失败后进入重试
  • 延迟任务会在指定时间后再执行
  • 如果你在 Worker 执行过程中强制 kill 掉进程,部分任务会被恢复后重新执行

失败恢复设计:只靠 BullMQ 还不够

很多人第一次用队列时,会有一个误区:

“只要用了消息队列,就能保证任务只执行一次。”

这在工程上几乎不成立。
BullMQ 更接近 至少一次投递(at-least-once) 的语义,而不是“绝对仅一次”。

所以高可靠设计必须补上以下几层。

1. 业务幂等

比如“发送优惠券”“扣库存”“变更订单状态”这类操作,必须支持重复调用不出错。

常见手段:

  • 数据库唯一索引
  • 状态机校验(已完成则跳过)
  • 幂等表记录请求号
  • Redis SET NX 作为短期防重

例如发券:

CREATE TABLE coupon_send_log (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  biz_id VARCHAR(64) NOT NULL,
  user_id BIGINT NOT NULL,
  coupon_id BIGINT NOT NULL,
  created_at DATETIME NOT NULL,
  UNIQUE KEY uk_biz_id (biz_id)
);

消费时用 biz_id 去插入,如果唯一键冲突,说明发过了,直接视为成功。

2. 区分“临时失败”和“永久失败”

不是所有异常都值得重试。

建议把异常分类:

  • 临时失败:网络超时、503、限流、连接中断
  • 永久失败:参数错误、数据不存在、业务条件不满足

例如:

function isRetryableError(err) {
  return [
    'ECONNRESET',
    'ETIMEDOUT',
    'THIRD_PARTY_TEMP_ERROR'
  ].includes(err.code);
}

在 Worker 中:

if (!isRetryableError(err)) {
  // 可以包装成一个明确错误,避免无意义重试
  throw new Error(`NON_RETRYABLE: ${err.message}`);
}

如果你不做分类,最终会看到大量“明明不可能成功却反复重试”的任务,把 Redis 和 Worker 都拖慢。

3. 死信/人工补偿机制

BullMQ 没有像某些 MQ 那样显式的“死信交换器”概念,但你完全可以在 failed 后做自己的补偿策略:

  • 定时扫描失败任务,人工重放
  • 失败写入数据库工单表
  • 告警推送到企业微信/Slack
  • 将失败任务转移到 xxx-dead-letter 队列

这一步很关键,因为“高可靠”并不等于“系统永远不出错”,而是出错后能被发现、能被恢复


容量估算与并发取舍

中级工程师容易忽略的一点是:队列系统的瓶颈,很多时候不在 BullMQ,而在下游。

一个简单估算方法

假设:

  • 平均每秒入队 200 个任务
  • 每个任务处理耗时 500ms
  • 单个 Worker 并发设为 20

理论上,单个 Worker 的吞吐大约是:

20 / 0.5 = 40 task/s

那至少需要:

200 / 40 = 5 个 Worker

再考虑重试、波峰和第三方限流,实际通常要再留 30%~50% 裕量。

并发不是越大越好

我见过不少项目把 concurrency 开到几百,结果:

  • Node 事件循环被压住
  • 第三方接口被打爆
  • Redis 命令数飙升
  • 失败任务更快堆积

建议按任务类型调整:

  • IO 型任务:并发可以高一些
  • CPU 型任务:并发要保守,必要时拆到独立进程甚至其他语言服务

延迟任务很多时要关注什么

如果系统里有大量延迟任务,比如几十万、几百万级:

  • Redis 内存要预估
  • 队列数量不要无限膨胀
  • 删除策略要做好,避免 completed/failed 积压
  • 监控延迟漂移:计划 5 分钟执行,实际是否拖到 10 分钟

常见坑与排查

这一节我尽量讲得“接地气”一点,都是实战里很常见的问题。

坑 1:任务重复执行

现象

  • 用户收到两条短信
  • 同一订单被重复取消
  • 一条 webhook 被投递多次

原因

  • Worker 在处理过程中崩溃,任务被恢复
  • 生产者重复投递
  • 接口重试导致同一业务被多次入队
  • 锁续期失败,被误判 stalled

排查路径

  1. 看是否设置了业务唯一 jobId
  2. 看 Worker 是否有长时间阻塞事件循环
  3. 看是否实现了业务幂等
  4. 看 Redis 是否出现网络抖动或阻塞

处理建议

  • 入队时设置 jobId
  • 消费时实现幂等
  • 不要在 Worker 里写大 CPU 计算
  • 长任务适当调大 lockDuration

坑 2:任务卡住不动

现象

  • 队列里有 waiting,但 Worker 似乎不消费
  • delayed 任务一直不到点执行
  • active 任务长期不结束

常见原因

  • Worker 根本没启动
  • Redis 连接异常
  • 队列名写错
  • 任务执行函数卡死
  • 并发设置过小,任务被前面的大任务堵住

排查命令思路

你可以先从日志和 Redis 状态入手。
在应用里打印:

console.log('queue name:', queueName);
console.log('redis status:', connection.status);

如果要看 Redis 当前压力:

redis-cli info memory
redis-cli info clients
redis-cli info stats

我的经验

如果 active 长时间不释放,先怀疑:

  1. 业务代码里有未结束的 Promise
  2. 调用第三方接口没有超时控制
  3. 有同步 CPU 重活阻塞了事件循环

比如 HTTP 请求一定要设超时,而不是无限等:

async function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('timeout')), ms);
  });

  try {
    return await Promise.race([promise, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

坑 3:失败任务越来越多

现象

  • failed 数量持续上升
  • 重试把队列越压越长
  • Redis 内存持续增长

原因

  • 重试次数设置过高
  • 永久失败任务也在重试
  • 下游服务不可用但没有熔断
  • removeOnFail 没配置,失败记录一直堆积

处理建议

  • 区分可重试与不可重试错误
  • 给第三方依赖做熔断、限流
  • 合理设置 attempts
  • 对失败任务做归档/清理策略

坑 4:Redis 内存涨得快

现象

  • Redis 内存不断上涨
  • 队列查询越来越慢
  • 运维开始追着问是不是“你们任务系统又爆了”

常见原因

  • completed/failed 没清理
  • 返回结果太大
  • 任务 data 放了大对象
  • 单队列积压严重

建议

  • removeOnCompleteremoveOnFail 要配置
  • 不要把大文件内容直接塞进 job data
  • job data 只放必要字段,例如 ID、路径、引用
  • 报表或大对象放对象存储/数据库,任务只传引用

安全/性能最佳实践

这一部分我尽量给能直接落地的建议。

1. 不要把敏感数据明文塞进任务体

错误示范:

await queue.add('send-email', {
  email: '[email protected]',
  password: 'plain-text-password',
  token: 'secret-token'
});

任务数据会进入 Redis。
如果 Redis 没有做好隔离、ACL、网络访问控制,这些信息会非常危险。

建议:

  • 只传业务 ID
  • 敏感字段到消费时再查
  • Redis 开启访问控制,部署在内网
  • 使用 TLS/认证(如果环境支持)

2. 队列数据最小化

一个好的 job data 通常长这样:

{
  orderId: "ORD-1001",
  userId: "U-88",
  templateId: "tpl_sms_created"
}

而不是把整个订单 JSON 全塞进去。

好处:

  • Redis 更省内存
  • 任务更稳定
  • 业务数据变更时不容易产生“旧快照问题”

3. 任务处理必须设超时边界

外部调用建议统一包装超时、重试、熔断。
否则某个第三方接口卡 2 分钟,你的 Worker 很快就会被拖死。

4. 按任务类型拆队列

不要把以下任务混在一个队列里:

  • 秒级短信通知
  • 分钟级报表导出
  • 30 分钟后自动取消订单
  • 视频转码

建议拆分:

  • notification-queue
  • report-queue
  • order-delay-queue
  • media-process-queue

这样更容易做:

  • 独立并发控制
  • 独立监控
  • 独立告警
  • 独立扩容

5. 做监控,不要“等投诉才知道挂了”

至少监控这些指标:

  • waiting 数量
  • active 数量
  • failed 增量
  • delayed 数量
  • 完成耗时分位数
  • 重试次数
  • stalled 次数
  • Redis 内存和连接数

如果没有监控,高可靠其实只是口头说法。

6. 优雅停机

Worker 收到 SIGTERM 时不要直接退出。
应该先 worker.close(),让当前任务有机会收尾。
尤其在容器环境或 K8s 下,这一步非常重要。


一个更稳妥的生产级配置示例

下面给一个更接近生产环境的队列初始化方式。

const { Queue } = require('bullmq');
const IORedis = require('ioredis');

const connection = new IORedis({
  host: process.env.REDIS_HOST || '127.0.0.1',
  port: Number(process.env.REDIS_PORT || 6379),
  password: process.env.REDIS_PASSWORD || undefined,
  maxRetriesPerRequest: null,
  enableReadyCheck: true,
  lazyConnect: false
});

const notificationQueue = new Queue('notification-queue', {
  connection,
  defaultJobOptions: {
    attempts: 4,
    backoff: {
      type: 'exponential',
      delay: 5000
    },
    removeOnComplete: 1000,
    removeOnFail: 3000
  }
});

module.exports = {
  connection,
  notificationQueue
};

如果你的下游有明显限流要求,还可以加限速:

const { Worker } = require('bullmq');

const worker = new Worker(
  'notification-queue',
  async (job) => {
    // process job
  },
  {
    connection,
    concurrency: 10,
    limiter: {
      max: 100,
      duration: 1000
    }
  }
);

这表示每秒最多处理 100 个任务,对于短信、邮件、Webhook 这类场景非常实用。


边界条件与设计建议

最后把几个容易忽视的边界条件单独拎出来。

什么时候 BullMQ 足够用

如果你的需求是:

  • 延迟任务
  • 异步通知
  • 失败重试
  • 多 Worker 并发处理
  • 服务重启后任务不丢

那 BullMQ 通常已经够用了。

什么时候要补更多基础设施

如果你的需求开始出现:

  • 跨地域多活
  • 极高吞吐事件流
  • 严格顺序消费
  • 海量任务审计追踪
  • 多语言复杂消费者体系

那可能要考虑:

  • Kafka 做事件流
  • 独立调度平台
  • 更严格的任务编排系统
  • 数据库 outbox/inbox 模式

一条很实用的建议

如果你现在的系统还比较简单,不要一开始就堆满复杂概念。
建议先做到这 5 件事:

  1. 任务入队带 jobId
  2. 消费逻辑实现幂等
  3. 配置 attempts + backoff
  4. completed/failed 做自动清理
  5. 把失败事件接入告警

这 5 条做好,可靠性会比“只是把任务异步化”高很多。


总结

BullMQ + Redis 在 Node.js 里做高可靠任务队列,是一个非常务实的方案。
它真正有价值的地方,不只是“异步”,而是把下面这些能力系统化了:

  • 任务持久化
  • 失败自动重试
  • 延迟执行
  • Worker 崩溃后的恢复
  • 并发与限速控制
  • 事件观测与运维排查

但也要记住一个核心事实:

队列本身解决的是“调度与恢复”,不是“业务绝对只执行一次”。

所以真正的高可靠,一定是 BullMQ 的恢复能力 + 业务层幂等 + 失败补偿机制 三者一起完成的。

如果你准备在项目里落地,我建议按下面顺序推进:

  1. 先按业务域拆好队列
  2. 为每类任务定义唯一业务键
  3. 配置合理的重试与退避策略
  4. 给所有外部调用加超时
  5. 建立失败告警与人工补偿入口
  6. 再去做更细的性能调优和容量扩展

这样做,系统不一定“永远不失败”,但至少失败时你知道它在哪、为什么、怎么恢复。
这就是工程上真正有价值的“高可靠”。


分享到:

上一篇
《Java 中基于 CompletableFuture 与线程池的异步任务编排实战:性能优化与异常处理策略》
下一篇
《安卓逆向实战:基于 Frida 定位与绕过常见反调试机制的方法解析》