第8章 8.3 Worker Threads 多线程

🎯 开场 3 分钟:为什么你的代码总是卡在"干等"上?

上一章我们学了 cluster 集群,用多进程的方式让 Node.js 像个餐饮帝国一样开了多家分店,把请求分配给不同的进程处理。

但你有没有遇到过这种情况:一家店里的厨师不止一个,但大家都在等同一块砧板

举个例子,你要做一个「给 10 万张图片加滤镜」的程序。用 cluster 可以开多个进程,但每个进程里还是只有一个厨师在吭哧吭哧干。如果这个厨师还要做一些复杂的计算(不是等 I/O,是真正吃 CPU 的计算),那他还是会累得满头大汗。

痛点来了

  • Node.js 天然是单线程的(就像一个厨师一次只能颠一个勺)
  • 但你的电脑有 8 个 CPU 核心,老板只让你雇一个厨师,剩下 7 个核心在旁边晒太阳
  • 有些任务天生就是「重体力活」,比如视频转码、加密解密、数据分析——这些不是等网络返回,是真的在算

学完这章你能解决

  • 让一个 Node.js 进程里同时跑多个「小厨师」,把 CPU 多核用起来
  • 在不阻塞主线程的情况下做大量计算
  • 理解 Worker Threads 和 cluster 的区别,什么时候该用哪个

简单说:cluster 是开多家分店,Worker Threads 是让每家店里都有多个厨师同时干活。


🧱 基础 25 分钟:核心概念(小白视角)

8.3.1 什么是 Worker Thread?先别急,听我讲个故事

类比时间:想象你是奶茶店老板。

  • 单线程:店里只有 1 个店员,进货、点单、做奶茶、收银全是他。顾客一多就排长队。
  • cluster 多进程:你开了 5 家分店,每家 1 个店员。顾客被分配到不同店。
  • Worker Threads 多线程:你把店员分成了「前台」和「后厨」——前台只管点单收银,后厨有 3 个员工同时做奶茶。前台收到 100 杯订单,后厨 3 人分工做,比 1 个人做 100 杯快 3 倍。

Worker Thread 就是「同一家店里多个员工同时干活」,而 cluster 是「多家店各自干活」。

8.3.2 什么时候用 Worker Thread?

适合场景(CPU 密集型任务):

  • 图片/视频处理(加滤镜、转码、压缩)
  • 数据加密解密
  • 大数据计算、排序、搜索
  • 机器学习推理

不适合场景(V8 引擎限制):

  • 大量 I/O 操作(网络请求、文件读写)—— 这个用 async/await 或 cluster 更香
  • 任务之间需要频繁通信——线程间通信有开销,太频繁反而更慢

8.3.3 最简单的例子:先让代码跑起来

// worker-basic.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// 判断是主线程还是工作线程
if (isMainThread) {
console.log('我是主线程,准备派任务...');

// 创建一个新线程,让它执行这个文件
const worker = new Worker(__filename, {
workerData: { num: 42 }  // 给工作线程传数据
});

// 监听工作线程发回来的消息
worker.on('message', (result) => {
console.log('收到工作线程的返回:', result);
});

worker.on('error', (err) => {
console.error('工作线程出错了:', err);
});

worker.on('exit', (code) => {
console.log('工作线程退出了,退出码:', code);
});

} else {
// 这部分代码会在工作线程里执行
console.log('工作线程开始干活了,收到数据:', workerData);

// 模拟一个耗时计算(累加 1 到 10000000)
let sum = 0;
for (let i = 1; i <= 10000000; i++) {
sum += i;
}

// 把结果发回主线程
parentPort.postMessage({ 计算结果: sum, 原始数据: workerData.num });
}

运行结果:

我是主线程,准备派任务...
工作线程开始干活了,收到数据: { num: 42 }
收到工作线程的返回: { 计算结果: 500000050000000, 原始数据: 42 }
工作线程退出了,退出码: 0

代码解释

  • isMainThread:判断当前是不是主线程,主线程才创建 Worker
  • workerData:主线程传给工作线程的数据
  • parentPort.postMessage():工作线程把结果发回主线程

8.3.4 线程间数据传递:MessageChannel

有时候你想让两个工作线程直接对话,不经过主线程。可以用 MessageChannel

// worker-channel.js
const { Worker, MessageChannel } = require('worker_threads');

const worker1 = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
console.log('线程1收到:', data);
parentPort.postMessage('线程1处理完毕');
});
`, { eval: true });

const worker2 = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
console.log('线程2收到:', data);
});
`, { eval: true });

// 创建一个通道,让线程1和线程2直接通信
const { port1, port2 } = new MessageChannel();

worker1.postMessage({ port: port1 }, [port1]);
worker2.postMessage({ port: port2, msg: '你好线程2' }, [port2]);

8.3.5 SharedArrayBuffer:共享内存(高效但危险)

MessageChannel 每传一次数据都要「复制」,大数据量时很慢。SharedArrayBuffer 允许多个线程共享同一块内存,直接读写,不用复制。

// worker-shared.js
const { Worker, isMainThread, SharedArrayBuffer } = require('worker_threads');

if (isMainThread) {
// 创建一个共享数组,5个元素,初始值都是0
const sharedBuffer = new SharedArrayBuffer(5 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);

console.log('主线程初始值:', Array.from(sharedArray));

const worker = new Worker(__filename, {
workerData: sharedBuffer
});

worker.on('message', () => {
console.log('主线程读取最终值:', Array.from(sharedArray));
});

} else {
const sharedArray = new Int32Array(workerData);

// 工作线程修改共享数据
for (let i = 0; i < 5; i++) {
sharedArray[i] = (i + 1) * 100;
}

parentPort.postMessage('工作线程已修改共享数据');
}

配图1 - 配图1

注意:SharedArrayBuffer 很快,但有「竞态条件」风险——两个线程同时改同一个数据,谁先改就不一定了。新手慎用!

8.3.6 workerData 和 parentPort 的关系

API 方向 用途
workerData 主线程 → 工作线程 只传一次的开局数据
parentPort.postMessage() 工作线程 → 主线程 随时发送结果
worker.on('message') 工作线程 → 主线程 主线程接收消息

🔥 实战 35 分钟:3 个递进的小项目

📦 项目 1(5 分钟):累加计算器

需求:计算 1 到 1 亿的累加和,用工作线程来做,不阻塞主线程。

// project-1-calculator.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
const start = Date.now();
console.log('🎬 主线程开始派发任务...');

const worker = new Worker(__filename, {
workerData: { from: 1, to: 100000000 }
});

worker.on('message', (result) => {
console.log('✅ 计算完成!结果是:', result.sum);
console.log('⏱️ 耗时:', Date.now() - start, '毫秒');
process.exit(0);
});

console.log('🔄 主线程可以继续干别的事(比如接受用户输入)...');

} else {
const { from, to } = workerData;
let sum = 0n;  // 用 BigInt,避免大数溢出

for (let i = from; i <= to; i++) {
sum += BigInt(i);
}

parentPort.postMessage({ sum: sum.toString() });
}

预期输出

🎬 主线程开始派发任务...
🔄 主线程可以继续干别的事(比如接受用户输入)...
✅ 计算完成!结果是: 5000000050000000
⏱️ 耗时: 892 毫秒

一句话解释:主线程创建工作线程后不等待,继续做自己的事,等工作线程算完再回来收结果。


📦 项目 2(15 分钟):批量文件重命名工具

需求:有一个文件夹里有几十张图片,名字杂乱。要按「序号_原名.jpg」的格式批量重命名。

为了模拟,我们先创建测试文件:

// project-2-rename.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const fs = require('fs');
const path = require('path');

// 先创建测试文件(只有主线程才执行)
if (isMainThread) {
const testDir = path.join(__dirname, 'test_images');

// 创建测试目录和文件
if (!fs.existsSync(testDir)) {
fs.mkdirSync(testDir);
for (let i = 0; i < 20; i++) {
  const oldName = `照片${i + 1}.jpg`;
  fs.writeFileSync(path.join(testDir, oldName), `模拟图片内容 ${i}`);
}
console.log('✅ 测试文件已创建');
}

console.log('📂 待处理文件:', fs.readdirSync(testDir));

// 读取所有文件
const files = fs.readdirSync(testDir)
.filter(f => f.endsWith('.jpg'))
.map(f => ({
  oldPath: path.join(testDir, f),
  oldName: f
}));

console.log(`\n🎬 找到 ${files.length} 个文件,准备派发给工作线程...\n`);

// 每个工作线程处理一批文件
const threadCount = 3;
const chunkSize = Math.ceil(files.length / threadCount);

let completedThreads = 0;

for (let i = 0; i < threadCount; i++) {
const chunk = files.slice(i * chunkSize, (i + 1) * chunkSize);

if (chunk.length === 0) continue;

const worker = new Worker(__filename, {
  workerData: { files: chunk, threadIndex: i + 1 }
});

worker.on('message', (result) => {
  console.log(`\n🧵 线程${result.threadIndex} 完成:`);
  result.renamed.forEach(f => console.log(`  ${f.oldName} → ${f.newName}`));

  completedThreads++;
  if (completedThreads === threadCount) {
    console.log('\n✨ 全部重命名完成!');
    console.log('📂 最终文件:', fs.readdirSync(testDir));
  }
});
}

} else {
// 工作线程负责具体的重命名操作
const { files, threadIndex } = workerData;
const renamed = [];

files.forEach((file, idx) => {
const newName = `${String(threadIndex).padStart(2, '0')}_${String(idx + 1).padStart(3, '0')}_${file.oldName}`;
const newPath = path.join(path.dirname(file.oldPath), newName);

try {
  fs.renameSync(file.oldPath, newPath);
  renamed.push({ oldName: file.oldName, newName });
} catch (err) {
  console.error(`线程${threadIndex} 重命名失败:`, file.oldName, err.message);
}
});

parentPort.postMessage({ threadIndex, renamed });
}

预期输出(部分):

✅ 测试文件已创建
📂 待处理文件: [ '照片1.jpg', '照片2.jpg', ..., '照片20.jpg' ]

🎬 找到 20 个文件,准备派发给工作线程...

🧵 线程1 完成:
照片1.jpg → 01_001_照片1.jpg
照片2.jpg → 01_002_照片2.jpg
照片3.jpg → 01_003_照片3.jpg
照片4.jpg → 01_004_照片4.jpg
照片5.jpg → 01_005_照片5.jpg
照片6.jpg → 01_006_照片6.jpg
照片7.jpg → 01_007_照片7.jpg

✨ 全部重命名完成!

一句话解释:把大量文件分成多份,每个工作线程处理一份,实现「并行重命名」。


📦 项目 3(15 分钟):CPU 密集型数据处理小工具

需求:读取一份 JSON 格式的销售数据(模拟1000条),计算每个月的销售额、最畅销的商品,并用工作线程做排序和统计。

// project-3-dashboard.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// 模拟的销售数据
const generateSalesData = () => {
const products = ['iPhone', 'iPad', 'MacBook', 'AirPods', 'Apple Watch'];
const data = [];

for (let i = 0; i < 1000; i++) {
data.push({
  id: i + 1,
  product: products[Math.floor(Math.random() * products.length)],
  month: Math.floor(Math.random() * 12) + 1,
  quantity: Math.floor(Math.random() * 50) + 1,
  price: Math.floor(Math.random() * 5000) + 500
});
}
return data;
};

if (isMainThread) {
console.log('📊 销售数据分析工具\n');
console.log('=' .repeat(40));

const salesData = generateSalesData();
console.log(`📦 共 ${salesData.length} 条销售记录\n`);

// 启动两个工作线程分别做不同分析
let completed = 0;
const results = {};

// 线程1:按月份统计销售额
const worker1 = new Worker(__filename, {
workerData: { task: 'monthlySales', data: salesData }
});

worker1.on('message', (result) => {
console.log('📅 月度销售统计:');
Object.entries(result).forEach(([month, total]) => {
  console.log(`  ${month}月: ¥${total.toLocaleString()}`);
});
completed++;
checkComplete();
});

// 线程2:找出最畅销商品
const worker2 = new Worker(__filename, {
workerData: { task: 'topProducts', data: salesData }
});

worker2.on('message', (result) => {
console.log('\n🏆 商品销量排行:');
result.forEach((item, idx) => {
  console.log(`  ${idx + 1}. ${item.name}: ${item.quantity} 件`);
});
completed++;
checkComplete();
});

function checkComplete() {
if (completed === 2) {
  console.log('\n' + '='.repeat(40));
  console.log('✨ 分析完成!');
}
}

} else {
const { task, data } = workerData;
let result;

if (task === 'monthlySales') {
// 按月份汇总销售额
const monthlyTotals = {};
for (let i = 1; i <= 12; i++) monthlyTotals[i] = 0;

data.forEach(record => {
  monthlyTotals[record.month] += record.quantity * record.price;
});

result = monthlyTotals;

} else if (task === 'topProducts') {
// 按商品汇总销量并排序
const productStats = {};

data.forEach(record => {
  if (!productStats[record.product]) {
    productStats[record.product] = 0;
  }
  productStats[record.product] += record.quantity;
});

// 排序并取前5名
result = Object.entries(productStats)
  .map(([name, quantity]) => ({ name, quantity }))
  .sort((a, b) => b.quantity - a.quantity)
  .slice(0, 5);
}

parentPort.postMessage(result);
}

预期输出

📊 销售数据分析工具

========================================
📦 共 1000 条销售记录

📅 月度销售统计:
1月: ¥1,234,567
2月: ¥1,456,789
3月: ¥1,123,456
...(省略其他月份)

🏆 商品销量排行:
1. iPhone: 8,432 件
2. AirPods: 7,891 件
3. Apple Watch: 6,543 件
4. iPad: 5,678 件
5. MacBook: 4,321 件

========================================
✨ 分析完成!

一句话解释:主线程同时派发两个任务给不同工作线程,一个算月度统计,一个算商品排行,两个线程「各干各的」,最后主线程汇总结果。

配图2 - 配图2


💪 进阶 20 分钟:常见坑 + 性能小贴士

❌ 坑1:以为 worker_threads 会自动共享变量

// ❌ 错误写法
let counter = 0;
const worker = new Worker(__filename);

worker.on('message', () => {
console.log(counter);  // 永远是 0!
counter++;  // 主线程的 counter 和工作线程的 counter 是两回事
});
// ✅ 正确写法
const { Worker } = require('worker_threads');
const worker = new Worker(`
const { parentPort } = require('worker_threads');
let counter = 0;
parentPort.on('message', () => {
counter++;
parentPort.postMessage(counter);
});
`, { eval: true });

worker.postMessage('inc');

解释:每个 Worker 有自己的 V8 实例,变量不共享。要共享必须用 SharedArrayBuffer,而且要自己处理同步。

❌ 坑2:传递大数据时不用 SharedArrayBuffer

// ❌ 错误写法:传10万条数据,每传一次都复制一遍,内存爆炸
const hugeData = generateArray(100000);
worker.postMessage(hugeData);  // 这会复制整份数据!
// ✅ 正确写法:用 SharedArrayBuffer 共享内存
const sharedBuffer = new SharedArrayBuffer(size);
worker.postMessage({ buffer: sharedBuffer }, [sharedBuffer]);

❌ 坑3:在循环里不断创建 Worker

// ❌ 错误写法:每处理一条数据就创建一个线程,创建/销毁开销巨大
for (const item of items) {
const worker = new Worker(__filename);
worker.postMessage(item);
}
// ✅ 正确写法:创建固定数量的 Worker,用消息队列分发任务
const workerPool = Array.from({ length: 4 }, () => new Worker(__filename));
let currentIndex = 0;

for (const item of items) {
workerPool[currentIndex % workerPool.length].postMessage(item);
currentIndex++;
}

❌ 坑4:不监听 error 事件导致静默失败

// ❌ 错误写法:工作线程报错,主线程完全不知道
const worker = new Worker(__filename);
worker.postMessage('start');
// 如果工作线程里 throw new Error,主线程啥也收不到
// ✅ 正确写法:一定要监听 error 和 exit 事件
worker.on('error', (err) => {
console.error('工作线程崩溃:', err);
});

worker.on('exit', (code) => {
if (code !== 0) {
console.error(`工作线程异常退出,退出码: ${code}`);
}
});

❌ 坑5:误用 Worker Threads 处理 I/O 密集型任务

// ❌ 错误写法:下载100个文件,用Worker Threads?没必要,还占线程
const workers = urls.map(url => {
const worker = new Worker(`fetch(url)`);  // 大材小用
});
// ✅ 正确写法:I/O任务用 async/await 或 cluster
const fetchAll = async (urls) => {
return Promise.all(urls.map(url => fetch(url)));
};

🚀 性能小贴士:Worker 池化

创建和销毁 Worker 有开销。如果任务很多,用完不销毁,放到池子里复用。

class WorkerPool {
constructor(filename, size = 4) {
this.pool = [];
this.tasks = [];

for (let i = 0; i < size; i++) {
  this.pool.push(new Worker(filename));
}
}

run(data) {
return new Promise((resolve) => {
  // 找空闲的 Worker
  const worker = this.pool.find(w => !w.busy);

  if (worker) {
    this._assign(worker, data, resolve);
  } else {
    // 没空闲的,排队
    this.tasks.push({ data, resolve });
  }
});
}

_assign(worker, data, resolve) {
worker.busy = true;
worker.once('message', (result) => {
  worker.busy = false;
  resolve(result);

  // 处理排队的任务
  if (this.tasks.length > 0) {
    const next = this.tasks.shift();
    this._assign(worker, next.data, next.resolve);
  }
});
worker.postMessage(data);
}
}

🔧 调试技巧:用 console.log 在 Worker 里输出

工作线程里的 console.log 会自动把输出传到主线程:

// worker-debug.js
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
const worker = new Worker(__filename);

// 工作线程的 console.log 会显示在这里
worker.stdout.on('data', (data) => {
console.log('[Worker stdout]:', data.toString());
});

worker.stderr.on('data', (data) => {
console.error('[Worker stderr]:', data.toString());
});

worker.postMessage('start');

} else {
// 工作线程里正常用 console.log
console.log('工作线程启动');
console.log('做一些计算...');
console.log('计算完成');

parentPort.postMessage('done');
}

✏️ 练习题 + 作业题

练习题(10 分钟)

练习 1(2 分钟):改改数字
- 输入:在项目 1 中,把计算范围从 1~100000000 改成 1~1000
- 预期输出:显示 500500
- 提示:只改 workerData 里的 to

练习 2(2 分钟):加个判断
- 输入:在项目 1 中,如果累加结果是偶数,输出 结果是偶数,否则输出 结果是奇数
- 预期输出:结果是偶数(因为 5000000050000000 是偶数)
- 提示:在工作线程里用 sum % 2n === 0n 判断

练习 3(2 分钟):新数据源
- 输入:用项目 2 的方法,把测试目录换成 test_docs,文件换成 .txt,重命名格式改成 doc_序号
- 预期输出:文件被正确重命名
- 提示:把 .jpg 换成 .txt,格式字符串改一下

练习 4(2 分钟):串起来
- 输入:用项目 3 的结构,再加一个线程,统计「每个季度的总销售额」
- 预期输出:显示 Q1、Q2、Q3、Q4 的销售额
- 提示:1-3月=Q1, 4-6月=Q2... 在工作线程里把月份转成季度

练习 5(2 分钟):找错
- 输入:以下代码运行后报错 worker.on is not a function,找出错误并修复

const { Worker } = require('worker_threads');
const worker = new Worker('console.log("hi")');
worker.on('message', console.log);  // 这里报错
  • 预期输出:能正常运行,输出 hi
  • 提示:检查 Worker 构造函数的第二个参数怎么写

作业题(30 分钟 - 2 小时)

作业:做一个「 Worker Threads 图片批量处理器」

  • 需求描述:做一个命令行工具,能批量处理图片(模拟:读取文件列表,计算每个文件的「哈希值」用于去重)

  • 功能点
    1. 读取指定目录下的所有文件
    2. 用工作线程并行计算每个文件的哈希值(用 crypto 模块的 hash
    3. 找出重复文件(哈希值相同的)
    4. 输出报告:共多少文件、发现多少重复、每组重复文件列表

  • 加分项
    1. 用 Worker 池而不是每次新建 Worker
    2. 显示进度条(处理到第 X 个/共 Y 个)
    3. 支持按扩展名过滤(如只处理 .jpg

  • 验收标准

  • 能跑起来(node main.js ./test_folder
  • 输出正确的哈希值和重复报告
  • 代码有适当注释

  • 提交方式:评论区贴代码或 GitHub 链接


📚 总结 + 资源

本文学了 3 件事

  1. Worker Threads 是啥:让 Node.js 在单进程内实现多线程,把 CPU 多核用起来
  2. 怎么用:用 new Worker(__filename) 创建,workerData 传数据,parentPort.postMessage 收结果
  3. 什么时候用:CPU 密集型任务(计算、加密、排序)用 Worker,I/O 密集型任务用 async/await

延伸学习资源


互动钩子:你在工作中遇到过「CPU 跑满了但 Node.js 还是很慢」的情况吗?当时是怎么解决的?评论区聊聊,老粉优先回复!

下一章我们要解决一个更实际的问题:怎么知道我的 Worker Threads 到底有没有让代码变快? 我们会学习 clinic.jsv8-profiler,用工具来「给代码照 CT」,看看瓶颈到底在哪。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。