跳转到内容
123xiao | 无名键客

《Node.js 中基于 Worker Threads 与消息队列的高并发任务处理实践-333》

字数: 0 阅读时长: 1 分钟

背景与问题

很多团队刚开始做 Node.js 服务时,往往会享受它“写起来快、异步能力强”的红利。但业务一上量,问题马上就暴露出来:

  • HTTP 请求量突然升高
  • 某些任务是 CPU 密集型,比如图片处理、压缩、加密、规则计算、批量数据转换
  • 某些任务虽然不是纯 CPU 密集,但执行链路长,容易堆积
  • 主线程一忙,接口延迟飙升,甚至健康检查都超时

这时候只靠 async/await 是不够的。因为它解决的是 I/O 等待,不是 CPU 争抢

我自己第一次在 Node.js 里踩这个坑,是做一个“批量生成报表”的服务。接口看起来都是异步的,但一旦几十个大任务同时进来,事件循环卡住,API 响应明显变慢。后来拆开看才发现:真正的问题不在网络,而在计算。

这类场景里,一个比较稳妥的组合是:

  1. 消息队列负责削峰填谷、任务解耦、失败重试
  2. Worker Threads负责把 CPU 密集工作从主线程剥离
  3. 主进程只做:接收请求、投递任务、监控结果、控制并发

这篇文章从架构视角讲清楚这套方案,顺带给出一个可运行的小型实现。


先说结论:为什么是 Worker Threads + 消息队列

如果把高并发任务处理拆开看,本质上有两个问题:

  • 任务太多时,怎么不把系统打爆?
  • 任务很重时,怎么不阻塞 Node 主线程?

这两件事分别对应两种能力:

能力解决的问题典型手段
削峰与缓冲请求瞬时洪峰、任务堆积消息队列
并行执行CPU 密集工作阻塞事件循环Worker Threads

所以它们不是替代关系,而是互补关系。


方案总览

我们先看一个整体架构图。

flowchart LR
    A[客户端/上游服务] --> B[Node.js API 服务]
    B --> C[任务入队 Queue]
    C --> D[调度器 Consumer]
    D --> E1[Worker 1]
    D --> E2[Worker 2]
    D --> E3[Worker N]
    E1 --> F[(结果存储/DB)]
    E2 --> F
    E3 --> F
    D --> G[失败重试/死信队列]

这套链路中,各层职责比较清晰:

  • API 层:只管收任务,尽快返回“已接收”
  • 消息队列:做缓冲和解耦
  • Consumer/调度器:控制并发,分配给 Worker
  • Worker Threads:执行重 CPU 任务
  • 结果存储:记录结果、状态、错误原因
  • 重试/死信队列:处理失败任务

如果你的系统已经有 RabbitMQ、Kafka、Redis Stream、BullMQ 等基础设施,直接接入即可。为了让示例容易运行,下面的代码我会用 内存队列模拟消息队列,重点放在架构和 Worker 使用方式上。


背景与问题

Node.js 为什么会“并发高,但不适合所有高并发”?

这是个特别容易误解的问题。

Node.js 的强项是:

  • 单线程事件循环
  • 非阻塞 I/O
  • 很适合高并发网络请求处理

但它的短板也很明显:

  • JS 主线程默认只有一个
  • CPU 密集任务会卡住事件循环
  • 一旦事件循环被占满,所有请求都会受影响

比如下面这种“看起来没问题”的代码:

function heavyTask(n) {
  let count = 0;
  for (let i = 0; i < n; i++) {
    count += Math.sqrt(i);
  }
  return count;
}

如果这个函数在接口请求里直接执行,来 50 个请求,主线程就会被吃满。

所以,高并发请求处理高并发任务处理 不是一回事:

  • 前者更多是 I/O 并发
  • 后者常常伴随 CPU、内存、背压、重试和任务状态管理

核心原理

1. Worker Threads 解决什么问题

worker_threads 是 Node.js 提供的多线程能力。它适合:

  • CPU 密集任务
  • 大量计算逻辑
  • 可并行拆分的任务

它不适合:

  • 很轻的小任务
  • 频繁传大对象但计算很少的场景
  • 需要共享复杂状态的场景

因为 Worker 不是“零成本”的。线程创建、消息传递、序列化都有开销。

Worker 与主线程的关系

sequenceDiagram
    participant API as 主线程/API
    participant Q as 消息队列
    participant C as Consumer
    participant W as Worker Thread
    participant DB as 结果存储

    API->>Q: 投递任务
    C->>Q: 拉取任务
    C->>W: postMessage(task)
    W->>W: 执行 CPU 密集逻辑
    W-->>C: 返回结果/错误
    C->>DB: 更新状态

关键点有两个:

  1. 主线程不要做重活
  2. Worker 不要无限开

如果每来一个任务就创建一个 Worker,在高并发下会更糟。所以一般要做 Worker 池 或并发限制。


2. 消息队列解决什么问题

消息队列的价值,远不只是“异步”。

更重要的是:

  • 削峰:流量高峰时先入队
  • 解耦:请求方不必等待任务执行完成
  • 重试:失败任务可延后重试
  • 可观测:任务状态有据可查
  • 限流:消费侧可以按能力处理

你可以把它理解成系统的“缓冲层”。

状态流转建议

stateDiagram-v2
    [*] --> pending
    pending --> processing
    processing --> success
    processing --> retrying
    retrying --> processing
    retrying --> dead_letter
    processing --> dead_letter
    success --> [*]
    dead_letter --> [*]

实际系统里,我非常建议任务状态至少有:

  • pending
  • processing
  • success
  • failed / retrying
  • dead_letter

否则线上排查会很痛苦:你根本不知道任务是没收到、没消费、处理中、还是处理挂了。


3. 为什么这两个组合起来更稳

如果只有 Worker Threads,没有消息队列:

  • 突发任务仍然可能把进程打满
  • 没有天然的重试机制
  • 请求和执行耦合度高

如果只有消息队列,没有 Worker Threads:

  • Consumer 本身如果在主线程做重计算,照样会卡

因此完整路径通常是:

请求入队 -> Consumer 拉取 -> 并发受控地分配给 Worker -> 结果回写 -> 失败重试


方案对比与取舍分析

在 architecture 类文章里,单讲“怎么做”还不够,最好把“为什么这样做”讲透。

方案一:直接在主线程处理

优点

  • 实现最简单
  • 无额外组件

缺点

  • CPU 密集任务会阻塞事件循环
  • 接口响应不稳定
  • 高并发下几乎不可控

适合:低流量、轻任务、内部工具。


方案二:Cluster/多进程 + 队列

优点

  • 能利用多核
  • 进程隔离性好

缺点

  • 进程级开销较大
  • 进程间通信成本高
  • 每个进程还要自己控并发

适合:服务实例级扩展、隔离需求强的场景。


方案三:Worker Threads + 队列

优点

  • 更适合 CPU 密集型并行
  • 单进程内更容易统一调度
  • 比多进程通信更轻

缺点

  • 线程数管理不好容易失控
  • 大对象消息传递有序列化成本
  • 代码复杂度比纯异步高

适合:中高并发、CPU 任务明显、任务可独立执行的场景。


方案四:独立计算服务 + 队列

优点

  • 职责最清晰
  • 可单独扩容
  • 运维边界清楚

缺点

  • 系统复杂度更高
  • 链路更长
  • 开发和部署成本增加

适合:大规模平台化架构,或者计算逻辑非常重的场景。


容量估算思路

很多人刚上 Worker 会问:到底该开多少个线程?

没有万能数字,但有一个简单思路:

1. 先看 CPU 核数

比如机器是 8 核,那 Worker 并发通常从 6~8 起试,不要直接开 50 个。

因为除了 Worker,还有:

  • 主线程
  • GC
  • 监控线程
  • 其他系统开销

2. 估任务平均耗时

假设:

  • 单个任务平均执行时间:200ms
  • Worker 数量:8

理论吞吐大概是:

8 / 0.2 = 40 tasks/s

如果高峰任务进入速率是 120 tasks/s,那就意味着队列一定会积压。

3. 再决定策略

可选策略:

  • 扩 Worker 数量
  • 水平扩更多实例
  • 限流
  • 拆小任务
  • 降级非关键任务

我个人经验是:先控制单机稳定,再谈极限吞吐。否则系统看上去很快,实际抖动会很严重。


实战代码(可运行)

下面给一个可运行示例,使用:

  • Node.js 原生 worker_threads
  • 内存队列模拟 MQ
  • 一个简单 Worker 池
  • 一个 CPU 密集任务:计算斐波那契

说明:示例重点是架构方式,不依赖外部 MQ,复制即可运行。

目录结构

project/
  ├─ main.js
  └─ worker.js

worker.js

const { parentPort } = require('worker_threads');

function fib(n) {
  if (n <= 1) return n;
  return fib(n - 1) + fib(n - 2);
}

parentPort.on('message', async (job) => {
  const start = Date.now();

  try {
    const result = fib(job.payload.n);

    parentPort.postMessage({
      jobId: job.id,
      status: 'success',
      result,
      duration: Date.now() - start,
    });
  } catch (error) {
    parentPort.postMessage({
      jobId: job.id,
      status: 'failed',
      error: error.message,
      duration: Date.now() - start,
    });
  }
});

main.js

const os = require('os');
const path = require('path');
const { Worker } = require('worker_threads');

class InMemoryQueue {
  constructor() {
    this.jobs = [];
  }

  push(job) {
    this.jobs.push(job);
  }

  pop() {
    return this.jobs.shift();
  }

  size() {
    return this.jobs.length;
  }
}

class WorkerPool {
  constructor(workerFile, size) {
    this.workerFile = workerFile;
    this.size = size;
    this.workers = [];
    this.idleWorkers = [];
    this.jobResolvers = new Map();
    this.pendingJobs = [];

    for (let i = 0; i < size; i++) {
      this.createWorker();
    }
  }

  createWorker() {
    const worker = new Worker(this.workerFile);

    worker.on('message', (message) => {
      const { jobId } = message;
      const resolver = this.jobResolvers.get(jobId);

      if (resolver) {
        resolver.resolve(message);
        this.jobResolvers.delete(jobId);
      }

      this.idleWorkers.push(worker);
      this.dispatch();
    });

    worker.on('error', (err) => {
      console.error('[worker error]', err);

      if (worker.currentJobId) {
        const resolver = this.jobResolvers.get(worker.currentJobId);
        if (resolver) {
          resolver.reject(err);
          this.jobResolvers.delete(worker.currentJobId);
        }
      }

      this.workers = this.workers.filter((w) => w !== worker);
      this.idleWorkers = this.idleWorkers.filter((w) => w !== worker);

      // 自动补一个新 worker
      this.createWorker();
      this.dispatch();
    });

    worker.on('exit', (code) => {
      if (code !== 0) {
        console.warn(`[worker exit] code=${code}`);
      }
    });

    this.workers.push(worker);
    this.idleWorkers.push(worker);
  }

  runJob(job) {
    return new Promise((resolve, reject) => {
      this.pendingJobs.push({ job, resolve, reject });
      this.dispatch();
    });
  }

  dispatch() {
    while (this.idleWorkers.length > 0 && this.pendingJobs.length > 0) {
      const worker = this.idleWorkers.shift();
      const task = this.pendingJobs.shift();
      const { job, resolve, reject } = task;

      worker.currentJobId = job.id;
      this.jobResolvers.set(job.id, { resolve, reject });
      worker.postMessage(job);
    }
  }

  async destroy() {
    await Promise.all(this.workers.map((worker) => worker.terminate()));
  }
}

class JobManager {
  constructor() {
    this.queue = new InMemoryQueue();
    this.results = new Map();
    const cpuCount = os.cpus().length;
    const poolSize = Math.max(1, Math.min(cpuCount - 1, 4));

    this.pool = new WorkerPool(path.resolve(__dirname, './worker.js'), poolSize);
    this.maxRetries = 2;
    this.running = false;

    console.log(`[init] cpu=${cpuCount}, poolSize=${poolSize}`);
  }

  submit(payload) {
    const id = `${Date.now()}-${Math.random().toString(16).slice(2)}`;
    const job = {
      id,
      payload,
      retryCount: 0,
      status: 'pending',
      createdAt: new Date().toISOString(),
    };

    this.results.set(id, {
      status: 'pending',
      payload,
      retryCount: 0,
    });

    this.queue.push(job);
    return id;
  }

  getResult(jobId) {
    return this.results.get(jobId);
  }

  async startConsumer() {
    if (this.running) return;
    this.running = true;

    while (this.running) {
      const job = this.queue.pop();

      if (!job) {
        await new Promise((r) => setTimeout(r, 50));
        continue;
      }

      this.results.set(job.id, {
        ...this.results.get(job.id),
        status: 'processing',
        startedAt: new Date().toISOString(),
      });

      this.pool
        .runJob(job)
        .then((res) => {
          this.results.set(job.id, {
            status: res.status,
            result: res.result,
            duration: res.duration,
            finishedAt: new Date().toISOString(),
          });
          console.log(`[success] job=${job.id}, duration=${res.duration}ms`);
        })
        .catch((err) => {
          const retryCount = (job.retryCount || 0) + 1;

          if (retryCount <= this.maxRetries) {
            console.warn(`[retry] job=${job.id}, retryCount=${retryCount}`);

            const retryJob = {
              ...job,
              retryCount,
              status: 'retrying',
            };

            this.results.set(job.id, {
              status: 'retrying',
              retryCount,
              error: err.message,
            });

            setTimeout(() => this.queue.push(retryJob), 300);
          } else {
            console.error(`[dead-letter] job=${job.id}, error=${err.message}`);
            this.results.set(job.id, {
              status: 'dead_letter',
              retryCount,
              error: err.message,
              finishedAt: new Date().toISOString(),
            });
          }
        });
    }
  }

  stop() {
    this.running = false;
  }
}

async function main() {
  const manager = new JobManager();
  manager.startConsumer();

  const jobIds = [];
  const taskCount = 10;

  for (let i = 0; i < taskCount; i++) {
    const id = manager.submit({ n: 35 });
    jobIds.push(id);
  }

  console.log(`[submit] total jobs=${taskCount}`);

  const timer = setInterval(() => {
    const snapshot = jobIds.map((id) => ({
      id,
      ...manager.getResult(id),
    }));

    const summary = snapshot.reduce(
      (acc, item) => {
        acc[item.status] = (acc[item.status] || 0) + 1;
        return acc;
      },
      {}
    );

    console.log('[status]', summary);

    const done = snapshot.every((item) =>
      ['success', 'dead_letter'].includes(item.status)
    );

    if (done) {
      console.log('\n[final results]');
      console.dir(snapshot, { depth: null });

      clearInterval(timer);
      manager.stop();

      setTimeout(async () => {
        await manager.pool.destroy();
        process.exit(0);
      }, 500);
    }
  }, 500);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

运行方式

node main.js

运行后你会看到:

  • 任务先进入 pending
  • 被消费者拉取后变成 processing
  • Worker 处理完成后变成 success
  • 如果失败则进入 retrying,超过次数进入 dead_letter

这个示例的关键设计点

1. 入队与执行解耦

submit() 只负责接单,不直接计算。

这很重要,因为 API 层要尽量快返回,否则请求超时会把上游也拖垮。


2. Worker 池而不是每任务一个 Worker

这是稳定性的关键。

坏写法:

const worker = new Worker('./worker.js');

每个任务都这样来一次,高并发下线程创建会变成新的瓶颈。

正确思路是:

  • 预创建固定数量 Worker
  • 维护空闲队列
  • 任务到来时分配执行

3. 重试与死信不能省

线上环境里,失败不是“会不会发生”,而是“什么时候发生”。

你至少要考虑:

  • Worker 崩溃
  • 数据格式错误
  • 依赖不可用
  • 内存不足
  • 业务异常

如果没有重试与死信,失败任务要么丢失,要么无限重放,都会出问题。


常见坑与排查

这一节我尽量讲点真实会踩的坑。

坑一:把 I/O 任务也全塞给 Worker

很多人一看到 Worker,就会想把所有异步操作都丢进去。

其实没必要。

例如:

  • 调数据库
  • 请求第三方接口
  • 读写文件(普通规模)

这类大多是 I/O 密集任务,本身 Node 主线程就能高效处理。硬塞给 Worker,反而增加线程切换和消息传递成本。

判断原则

如果瓶颈主要是:

  • CPU 飙高
  • 事件循环延迟增大
  • 单个任务计算时间长

那就适合 Worker。

如果瓶颈主要是:

  • 外部接口慢
  • DB 慢
  • 网络等待长

优先考虑异步化、连接池、限流和批处理。


坑二:消息传递对象过大

主线程和 Worker 之间通过结构化克隆传递数据。对象很大时,开销非常明显。

现象

  • CPU 异常升高
  • 明明计算不重,但整体吞吐不高
  • 内存涨得快

排查方法

  • 统计单个任务 payload 大小
  • 记录 postMessage 前后的时间
  • 比较“传大对象”和“传 ID 后自行拉取数据”的差异

建议

不要传整块超大数据,尽量传:

  • 数据 ID
  • 文件路径
  • 存储位置
  • 必要的最小参数

坑三:并发数开太大

Worker 不是越多越快。

典型现象

  • CPU 长时间 100%
  • 吞吐没提升,反而下降
  • 上下文切换增加
  • 主线程响应抖动

排查思路

  1. 看 CPU 核数
  2. 看任务平均耗时
  3. 看 event loop delay
  4. 看队列积压长度
  5. 看 GC 次数和停顿

如果 Worker 数量高于 CPU 实际承载能力,收益往往很差。


坑四:消费者无限拉取,导致内存堆积

如果消息队列消费太快,而 Worker 执行速度有限,就会出现:

  • 主进程 pendingJobs 越积越多
  • 内存不断增长
  • 最终 OOM

解决方法

要有 背压机制

  • 只在池子有空闲能力时继续拉取
  • 或限制本地待处理任务上限
  • 达到阈值时暂停消费

这个点在接真实 MQ 时尤其重要。


坑五:任务不是幂等的,却做了自动重试

这类问题很隐蔽。

例如任务逻辑是:

  • 扣库存
  • 发券
  • 发消息
  • 扣费

如果没有幂等保护,重试一次就可能执行两次。

建议

对有副作用的任务,必须做:

  • 幂等键
  • 去重表
  • 状态机校验
  • 最终一致性补偿

安全/性能最佳实践

这一节我把线上更常用的建议收拢一下。

1. 输入参数必须校验

任务入队前就要校验参数,不要等进 Worker 后才发现不合法。

例如:

  • 数值范围
  • 字段是否缺失
  • 数据类型是否正确
  • 是否超出资源限制

简单例子:

function validatePayload(payload) {
  if (!payload || typeof payload.n !== 'number') {
    throw new Error('payload.n 必须是数字');
  }

  if (payload.n < 1 || payload.n > 45) {
    throw new Error('payload.n 超出允许范围');
  }
}

这样可以避免恶意或异常输入导致计算资源被滥用。


2. 给任务设置超时

有些任务会卡死,或者因为逻辑 bug 长时间不返回。

建议在调度层做超时控制。

function withTimeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('job timeout')), ms)
    ),
  ]);
}

如果真实业务允许,超时后:

  • 标记失败
  • 进入重试
  • 必要时销毁 Worker 重建

3. Worker 崩溃后要自动拉起

Worker 进程/线程不是永远可靠的。比如:

  • 未捕获异常
  • 内存问题
  • 原生模块异常

因此池子必须能:

  • 感知 error
  • 感知 exit
  • 自动补新 Worker

这一点示例代码里已经演示了。


4. 区分“业务失败”和“系统失败”

这件事对重试策略很关键。

  • 业务失败:参数错误、数据不存在、状态不合法
    通常不应重试
  • 系统失败:网络抖动、线程异常、临时资源不足
    可以重试

如果不区分,最常见的问题就是:把无效任务不断重试,把系统拖死。


5. 做好监控指标

最低限度建议监控这些指标:

  • 队列长度
  • 任务吞吐量(TPS)
  • 平均处理时长
  • P95/P99 耗时
  • 成功率 / 失败率
  • 重试次数
  • 死信数量
  • Worker 活跃数
  • CPU / 内存 / event loop delay

如果这些没有观测,系统一慢你只能靠猜。


6. 控制单任务资源上限

对于可能消耗大量资源的任务,建议加边界:

  • 限制输入大小
  • 限制计算次数
  • 限制执行时间
  • 限制每用户并发任务数

这既是性能问题,也是安全问题。否则一个超大任务就可能拖垮整个服务。


7. 优先考虑批量化与任务拆分

不是所有问题都要靠增加线程解决。

如果任务可拆分,可以考虑:

  • 大任务拆成多个小任务
  • 同类任务批处理
  • 中间结果缓存
  • 热点结果复用

这通常比盲目增加 Worker 更有效。


接入真实消息队列时的落地建议

上面的示例用的是内存队列,真实项目一般会接 MQ。这里给一些简洁的落地建议。

如果用 Redis/BullMQ

适合:

  • 中小型任务系统
  • 开发成本低
  • 需要快速上线

关注点:

  • Redis 内存容量
  • 延迟队列与重试配置
  • 作业幂等处理

如果用 RabbitMQ

适合:

  • 任务确认机制要求清晰
  • 路由灵活
  • 死信队列成熟

关注点:

  • ack/nack 时机
  • prefetch 控制
  • 死信交换机配置

如果用 Kafka

适合:

  • 超大吞吐
  • 流式处理
  • 事件驱动平台

关注点:

  • 分区与消费者组设计
  • 消息顺序
  • 重复消费与幂等

一个更稳的工程化分层建议

实际项目里,我比较推荐按下面方式拆层:

classDiagram
    class ApiLayer {
      +submitTask()
      +queryStatus()
    }

    class QueueAdapter {
      +publish(job)
      +consume()
      +ack()
      +retry()
    }

    class Scheduler {
      +dispatch()
      +backpressure()
      +timeoutControl()
    }

    class WorkerPool {
      +runJob()
      +createWorker()
      +destroy()
    }

    class ResultStore {
      +saveStatus()
      +saveResult()
      +getResult()
    }

    ApiLayer --> QueueAdapter
    QueueAdapter --> Scheduler
    Scheduler --> WorkerPool
    Scheduler --> ResultStore

这样做的好处是:

  • API 层不关心线程细节
  • MQ 能替换
  • Worker 执行逻辑能独立测试
  • 状态存储和调度策略可以单独演进

总结

如果你在 Node.js 里处理的是“高并发 + 重任务”场景,一个很实用的思路是:

  • 消息队列负责削峰、缓冲、重试、解耦
  • Worker Threads负责 CPU 密集并行执行
  • 调度层负责控制并发、背压、超时和失败恢复

可以把它记成一句话:

主线程接单,队列缓冲,Worker 干活,状态可追踪,失败可重试。

最后给几点可执行建议,适合落地时直接参考:

  1. 先判断瓶颈是不是 CPU
    • 如果只是 I/O 慢,不要滥用 Worker
  2. Worker 数量从 CPU 核数附近开始压测
    • 不要凭感觉乱配
  3. 一定要做任务状态管理
    • pending / processing / success / retrying / dead_letter
  4. 重试要有边界
    • 次数、间隔、死信都要清楚
  5. 对有副作用任务做幂等
    • 这是线上事故高发点
  6. 加监控和背压
    • 没有观测,系统再“快”也不算稳

边界条件也要说清楚:

  • 如果任务非常轻,Worker 可能不划算
  • 如果任务极重、依赖复杂,可能更适合独立计算服务
  • 如果吞吐要求极高,单机优化不够,最终还是要走分布式扩容

但对大多数中型 Node.js 业务来说,Worker Threads + 消息队列 已经是一套足够实用、足够稳、也比较容易渐进演化的方案。只要线程数、重试策略、幂等和监控这几个点抓住,系统稳定性通常会比“主线程硬扛”好很多。


分享到:

下一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法实现接口自动化调用》