第8章 8.3 Worker Threads 多线程
🎯 开场 3 分钟:为什么你的代码总是卡在"干等"上?
上一章我们学了 cluster 集群,用多进程的方式让 Node.js 像个餐饮帝国一样开了多家分店,把请求分配给不同的进程处理。
但你有没有遇到过这种情况:一家店里的厨师不止一个,但大家都在等同一块砧板?
举个例子,你要做一个「给 10 万张图片加滤镜」的程序。用 cluster 可以开多个进程,但每个进程里还是只有一个厨师在吭哧吭哧干。如果这个厨师还要做一些复杂的计算(不是等 I/O,是真正吃 CPU 的计算),那他还是会累得满头大汗。
痛点来了:
- Node.js 天然是单线程的(就像一个厨师一次只能颠一个勺)
- 但你的电脑有 8 个 CPU 核心,老板只让你雇一个厨师,剩下 7 个核心在旁边晒太阳
- 有些任务天生就是「重体力活」,比如视频转码、加密解密、数据分析——这些不是等网络返回,是真的在算
学完这章你能解决:
- 让一个 Node.js 进程里同时跑多个「小厨师」,把 CPU 多核用起来
- 在不阻塞主线程的情况下做大量计算
- 理解 Worker Threads 和 cluster 的区别,什么时候该用哪个
简单说:cluster 是开多家分店,Worker Threads 是让每家店里都有多个厨师同时干活。
🧱 基础 25 分钟:核心概念(小白视角)
8.3.1 什么是 Worker Thread?先别急,听我讲个故事
类比时间:想象你是奶茶店老板。
- 单线程:店里只有 1 个店员,进货、点单、做奶茶、收银全是他。顾客一多就排长队。
- cluster 多进程:你开了 5 家分店,每家 1 个店员。顾客被分配到不同店。
- Worker Threads 多线程:你把店员分成了「前台」和「后厨」——前台只管点单收银,后厨有 3 个员工同时做奶茶。前台收到 100 杯订单,后厨 3 人分工做,比 1 个人做 100 杯快 3 倍。
Worker Thread 就是「同一家店里多个员工同时干活」,而 cluster 是「多家店各自干活」。
8.3.2 什么时候用 Worker Thread?
适合场景(CPU 密集型任务):
- 图片/视频处理(加滤镜、转码、压缩)
- 数据加密解密
- 大数据计算、排序、搜索
- 机器学习推理
不适合场景(V8 引擎限制):
- 大量 I/O 操作(网络请求、文件读写)—— 这个用 async/await 或 cluster 更香
- 任务之间需要频繁通信——线程间通信有开销,太频繁反而更慢
8.3.3 最简单的例子:先让代码跑起来
// worker-basic.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
// 判断是主线程还是工作线程
if (isMainThread) {
console.log('我是主线程,准备派任务...');
// 创建一个新线程,让它执行这个文件
const worker = new Worker(__filename, {
workerData: { num: 42 } // 给工作线程传数据
});
// 监听工作线程发回来的消息
worker.on('message', (result) => {
console.log('收到工作线程的返回:', result);
});
worker.on('error', (err) => {
console.error('工作线程出错了:', err);
});
worker.on('exit', (code) => {
console.log('工作线程退出了,退出码:', code);
});
} else {
// 这部分代码会在工作线程里执行
console.log('工作线程开始干活了,收到数据:', workerData);
// 模拟一个耗时计算(累加 1 到 10000000)
let sum = 0;
for (let i = 1; i <= 10000000; i++) {
sum += i;
}
// 把结果发回主线程
parentPort.postMessage({ 计算结果: sum, 原始数据: workerData.num });
}
运行结果:
我是主线程,准备派任务...
工作线程开始干活了,收到数据: { num: 42 }
收到工作线程的返回: { 计算结果: 500000050000000, 原始数据: 42 }
工作线程退出了,退出码: 0
代码解释:
isMainThread:判断当前是不是主线程,主线程才创建 WorkerworkerData:主线程传给工作线程的数据parentPort.postMessage():工作线程把结果发回主线程
8.3.4 线程间数据传递:MessageChannel
有时候你想让两个工作线程直接对话,不经过主线程。可以用 MessageChannel。
// worker-channel.js
const { Worker, MessageChannel } = require('worker_threads');
const worker1 = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
console.log('线程1收到:', data);
parentPort.postMessage('线程1处理完毕');
});
`, { eval: true });
const worker2 = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
console.log('线程2收到:', data);
});
`, { eval: true });
// 创建一个通道,让线程1和线程2直接通信
const { port1, port2 } = new MessageChannel();
worker1.postMessage({ port: port1 }, [port1]);
worker2.postMessage({ port: port2, msg: '你好线程2' }, [port2]);
8.3.5 SharedArrayBuffer:共享内存(高效但危险)
MessageChannel 每传一次数据都要「复制」,大数据量时很慢。SharedArrayBuffer 允许多个线程共享同一块内存,直接读写,不用复制。
// worker-shared.js
const { Worker, isMainThread, SharedArrayBuffer } = require('worker_threads');
if (isMainThread) {
// 创建一个共享数组,5个元素,初始值都是0
const sharedBuffer = new SharedArrayBuffer(5 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);
console.log('主线程初始值:', Array.from(sharedArray));
const worker = new Worker(__filename, {
workerData: sharedBuffer
});
worker.on('message', () => {
console.log('主线程读取最终值:', Array.from(sharedArray));
});
} else {
const sharedArray = new Int32Array(workerData);
// 工作线程修改共享数据
for (let i = 0; i < 5; i++) {
sharedArray[i] = (i + 1) * 100;
}
parentPort.postMessage('工作线程已修改共享数据');
}

注意:SharedArrayBuffer 很快,但有「竞态条件」风险——两个线程同时改同一个数据,谁先改就不一定了。新手慎用!
8.3.6 workerData 和 parentPort 的关系
| API | 方向 | 用途 |
|---|---|---|
workerData |
主线程 → 工作线程 | 只传一次的开局数据 |
parentPort.postMessage() |
工作线程 → 主线程 | 随时发送结果 |
worker.on('message') |
工作线程 → 主线程 | 主线程接收消息 |
🔥 实战 35 分钟:3 个递进的小项目
📦 项目 1(5 分钟):累加计算器
需求:计算 1 到 1 亿的累加和,用工作线程来做,不阻塞主线程。
// project-1-calculator.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
const start = Date.now();
console.log('🎬 主线程开始派发任务...');
const worker = new Worker(__filename, {
workerData: { from: 1, to: 100000000 }
});
worker.on('message', (result) => {
console.log('✅ 计算完成!结果是:', result.sum);
console.log('⏱️ 耗时:', Date.now() - start, '毫秒');
process.exit(0);
});
console.log('🔄 主线程可以继续干别的事(比如接受用户输入)...');
} else {
const { from, to } = workerData;
let sum = 0n; // 用 BigInt,避免大数溢出
for (let i = from; i <= to; i++) {
sum += BigInt(i);
}
parentPort.postMessage({ sum: sum.toString() });
}
预期输出:
🎬 主线程开始派发任务...
🔄 主线程可以继续干别的事(比如接受用户输入)...
✅ 计算完成!结果是: 5000000050000000
⏱️ 耗时: 892 毫秒
一句话解释:主线程创建工作线程后不等待,继续做自己的事,等工作线程算完再回来收结果。
📦 项目 2(15 分钟):批量文件重命名工具
需求:有一个文件夹里有几十张图片,名字杂乱。要按「序号_原名.jpg」的格式批量重命名。
为了模拟,我们先创建测试文件:
// project-2-rename.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const fs = require('fs');
const path = require('path');
// 先创建测试文件(只有主线程才执行)
if (isMainThread) {
const testDir = path.join(__dirname, 'test_images');
// 创建测试目录和文件
if (!fs.existsSync(testDir)) {
fs.mkdirSync(testDir);
for (let i = 0; i < 20; i++) {
const oldName = `照片${i + 1}.jpg`;
fs.writeFileSync(path.join(testDir, oldName), `模拟图片内容 ${i}`);
}
console.log('✅ 测试文件已创建');
}
console.log('📂 待处理文件:', fs.readdirSync(testDir));
// 读取所有文件
const files = fs.readdirSync(testDir)
.filter(f => f.endsWith('.jpg'))
.map(f => ({
oldPath: path.join(testDir, f),
oldName: f
}));
console.log(`\n🎬 找到 ${files.length} 个文件,准备派发给工作线程...\n`);
// 每个工作线程处理一批文件
const threadCount = 3;
const chunkSize = Math.ceil(files.length / threadCount);
let completedThreads = 0;
for (let i = 0; i < threadCount; i++) {
const chunk = files.slice(i * chunkSize, (i + 1) * chunkSize);
if (chunk.length === 0) continue;
const worker = new Worker(__filename, {
workerData: { files: chunk, threadIndex: i + 1 }
});
worker.on('message', (result) => {
console.log(`\n🧵 线程${result.threadIndex} 完成:`);
result.renamed.forEach(f => console.log(` ${f.oldName} → ${f.newName}`));
completedThreads++;
if (completedThreads === threadCount) {
console.log('\n✨ 全部重命名完成!');
console.log('📂 最终文件:', fs.readdirSync(testDir));
}
});
}
} else {
// 工作线程负责具体的重命名操作
const { files, threadIndex } = workerData;
const renamed = [];
files.forEach((file, idx) => {
const newName = `${String(threadIndex).padStart(2, '0')}_${String(idx + 1).padStart(3, '0')}_${file.oldName}`;
const newPath = path.join(path.dirname(file.oldPath), newName);
try {
fs.renameSync(file.oldPath, newPath);
renamed.push({ oldName: file.oldName, newName });
} catch (err) {
console.error(`线程${threadIndex} 重命名失败:`, file.oldName, err.message);
}
});
parentPort.postMessage({ threadIndex, renamed });
}
预期输出(部分):
✅ 测试文件已创建
📂 待处理文件: [ '照片1.jpg', '照片2.jpg', ..., '照片20.jpg' ]
🎬 找到 20 个文件,准备派发给工作线程...
🧵 线程1 完成:
照片1.jpg → 01_001_照片1.jpg
照片2.jpg → 01_002_照片2.jpg
照片3.jpg → 01_003_照片3.jpg
照片4.jpg → 01_004_照片4.jpg
照片5.jpg → 01_005_照片5.jpg
照片6.jpg → 01_006_照片6.jpg
照片7.jpg → 01_007_照片7.jpg
✨ 全部重命名完成!
一句话解释:把大量文件分成多份,每个工作线程处理一份,实现「并行重命名」。
📦 项目 3(15 分钟):CPU 密集型数据处理小工具
需求:读取一份 JSON 格式的销售数据(模拟1000条),计算每个月的销售额、最畅销的商品,并用工作线程做排序和统计。
// project-3-dashboard.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
// 模拟的销售数据
const generateSalesData = () => {
const products = ['iPhone', 'iPad', 'MacBook', 'AirPods', 'Apple Watch'];
const data = [];
for (let i = 0; i < 1000; i++) {
data.push({
id: i + 1,
product: products[Math.floor(Math.random() * products.length)],
month: Math.floor(Math.random() * 12) + 1,
quantity: Math.floor(Math.random() * 50) + 1,
price: Math.floor(Math.random() * 5000) + 500
});
}
return data;
};
if (isMainThread) {
console.log('📊 销售数据分析工具\n');
console.log('=' .repeat(40));
const salesData = generateSalesData();
console.log(`📦 共 ${salesData.length} 条销售记录\n`);
// 启动两个工作线程分别做不同分析
let completed = 0;
const results = {};
// 线程1:按月份统计销售额
const worker1 = new Worker(__filename, {
workerData: { task: 'monthlySales', data: salesData }
});
worker1.on('message', (result) => {
console.log('📅 月度销售统计:');
Object.entries(result).forEach(([month, total]) => {
console.log(` ${month}月: ¥${total.toLocaleString()}`);
});
completed++;
checkComplete();
});
// 线程2:找出最畅销商品
const worker2 = new Worker(__filename, {
workerData: { task: 'topProducts', data: salesData }
});
worker2.on('message', (result) => {
console.log('\n🏆 商品销量排行:');
result.forEach((item, idx) => {
console.log(` ${idx + 1}. ${item.name}: ${item.quantity} 件`);
});
completed++;
checkComplete();
});
function checkComplete() {
if (completed === 2) {
console.log('\n' + '='.repeat(40));
console.log('✨ 分析完成!');
}
}
} else {
const { task, data } = workerData;
let result;
if (task === 'monthlySales') {
// 按月份汇总销售额
const monthlyTotals = {};
for (let i = 1; i <= 12; i++) monthlyTotals[i] = 0;
data.forEach(record => {
monthlyTotals[record.month] += record.quantity * record.price;
});
result = monthlyTotals;
} else if (task === 'topProducts') {
// 按商品汇总销量并排序
const productStats = {};
data.forEach(record => {
if (!productStats[record.product]) {
productStats[record.product] = 0;
}
productStats[record.product] += record.quantity;
});
// 排序并取前5名
result = Object.entries(productStats)
.map(([name, quantity]) => ({ name, quantity }))
.sort((a, b) => b.quantity - a.quantity)
.slice(0, 5);
}
parentPort.postMessage(result);
}
预期输出:
📊 销售数据分析工具
========================================
📦 共 1000 条销售记录
📅 月度销售统计:
1月: ¥1,234,567
2月: ¥1,456,789
3月: ¥1,123,456
...(省略其他月份)
🏆 商品销量排行:
1. iPhone: 8,432 件
2. AirPods: 7,891 件
3. Apple Watch: 6,543 件
4. iPad: 5,678 件
5. MacBook: 4,321 件
========================================
✨ 分析完成!
一句话解释:主线程同时派发两个任务给不同工作线程,一个算月度统计,一个算商品排行,两个线程「各干各的」,最后主线程汇总结果。

💪 进阶 20 分钟:常见坑 + 性能小贴士
❌ 坑1:以为 worker_threads 会自动共享变量
// ❌ 错误写法
let counter = 0;
const worker = new Worker(__filename);
worker.on('message', () => {
console.log(counter); // 永远是 0!
counter++; // 主线程的 counter 和工作线程的 counter 是两回事
});
// ✅ 正确写法
const { Worker } = require('worker_threads');
const worker = new Worker(`
const { parentPort } = require('worker_threads');
let counter = 0;
parentPort.on('message', () => {
counter++;
parentPort.postMessage(counter);
});
`, { eval: true });
worker.postMessage('inc');
解释:每个 Worker 有自己的 V8 实例,变量不共享。要共享必须用 SharedArrayBuffer,而且要自己处理同步。
❌ 坑2:传递大数据时不用 SharedArrayBuffer
// ❌ 错误写法:传10万条数据,每传一次都复制一遍,内存爆炸
const hugeData = generateArray(100000);
worker.postMessage(hugeData); // 这会复制整份数据!
// ✅ 正确写法:用 SharedArrayBuffer 共享内存
const sharedBuffer = new SharedArrayBuffer(size);
worker.postMessage({ buffer: sharedBuffer }, [sharedBuffer]);
❌ 坑3:在循环里不断创建 Worker
// ❌ 错误写法:每处理一条数据就创建一个线程,创建/销毁开销巨大
for (const item of items) {
const worker = new Worker(__filename);
worker.postMessage(item);
}
// ✅ 正确写法:创建固定数量的 Worker,用消息队列分发任务
const workerPool = Array.from({ length: 4 }, () => new Worker(__filename));
let currentIndex = 0;
for (const item of items) {
workerPool[currentIndex % workerPool.length].postMessage(item);
currentIndex++;
}
❌ 坑4:不监听 error 事件导致静默失败
// ❌ 错误写法:工作线程报错,主线程完全不知道
const worker = new Worker(__filename);
worker.postMessage('start');
// 如果工作线程里 throw new Error,主线程啥也收不到
// ✅ 正确写法:一定要监听 error 和 exit 事件
worker.on('error', (err) => {
console.error('工作线程崩溃:', err);
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`工作线程异常退出,退出码: ${code}`);
}
});
❌ 坑5:误用 Worker Threads 处理 I/O 密集型任务
// ❌ 错误写法:下载100个文件,用Worker Threads?没必要,还占线程
const workers = urls.map(url => {
const worker = new Worker(`fetch(url)`); // 大材小用
});
// ✅ 正确写法:I/O任务用 async/await 或 cluster
const fetchAll = async (urls) => {
return Promise.all(urls.map(url => fetch(url)));
};
🚀 性能小贴士:Worker 池化
创建和销毁 Worker 有开销。如果任务很多,用完不销毁,放到池子里复用。
class WorkerPool {
constructor(filename, size = 4) {
this.pool = [];
this.tasks = [];
for (let i = 0; i < size; i++) {
this.pool.push(new Worker(filename));
}
}
run(data) {
return new Promise((resolve) => {
// 找空闲的 Worker
const worker = this.pool.find(w => !w.busy);
if (worker) {
this._assign(worker, data, resolve);
} else {
// 没空闲的,排队
this.tasks.push({ data, resolve });
}
});
}
_assign(worker, data, resolve) {
worker.busy = true;
worker.once('message', (result) => {
worker.busy = false;
resolve(result);
// 处理排队的任务
if (this.tasks.length > 0) {
const next = this.tasks.shift();
this._assign(worker, next.data, next.resolve);
}
});
worker.postMessage(data);
}
}
🔧 调试技巧:用 console.log 在 Worker 里输出
工作线程里的 console.log 会自动把输出传到主线程:
// worker-debug.js
const { Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
// 工作线程的 console.log 会显示在这里
worker.stdout.on('data', (data) => {
console.log('[Worker stdout]:', data.toString());
});
worker.stderr.on('data', (data) => {
console.error('[Worker stderr]:', data.toString());
});
worker.postMessage('start');
} else {
// 工作线程里正常用 console.log
console.log('工作线程启动');
console.log('做一些计算...');
console.log('计算完成');
parentPort.postMessage('done');
}
✏️ 练习题 + 作业题
练习题(10 分钟)
练习 1(2 分钟):改改数字
- 输入:在项目 1 中,把计算范围从 1~100000000 改成 1~1000
- 预期输出:显示 500500
- 提示:只改 workerData 里的 to 值
练习 2(2 分钟):加个判断
- 输入:在项目 1 中,如果累加结果是偶数,输出 结果是偶数,否则输出 结果是奇数
- 预期输出:结果是偶数(因为 5000000050000000 是偶数)
- 提示:在工作线程里用 sum % 2n === 0n 判断
练习 3(2 分钟):新数据源
- 输入:用项目 2 的方法,把测试目录换成 test_docs,文件换成 .txt,重命名格式改成 doc_序号
- 预期输出:文件被正确重命名
- 提示:把 .jpg 换成 .txt,格式字符串改一下
练习 4(2 分钟):串起来
- 输入:用项目 3 的结构,再加一个线程,统计「每个季度的总销售额」
- 预期输出:显示 Q1、Q2、Q3、Q4 的销售额
- 提示:1-3月=Q1, 4-6月=Q2... 在工作线程里把月份转成季度
练习 5(2 分钟):找错
- 输入:以下代码运行后报错 worker.on is not a function,找出错误并修复
const { Worker } = require('worker_threads');
const worker = new Worker('console.log("hi")');
worker.on('message', console.log); // 这里报错
- 预期输出:能正常运行,输出
hi - 提示:检查 Worker 构造函数的第二个参数怎么写
作业题(30 分钟 - 2 小时)
作业:做一个「 Worker Threads 图片批量处理器」
-
需求描述:做一个命令行工具,能批量处理图片(模拟:读取文件列表,计算每个文件的「哈希值」用于去重)
-
功能点:
1. 读取指定目录下的所有文件
2. 用工作线程并行计算每个文件的哈希值(用crypto模块的hash)
3. 找出重复文件(哈希值相同的)
4. 输出报告:共多少文件、发现多少重复、每组重复文件列表 -
加分项:
1. 用 Worker 池而不是每次新建 Worker
2. 显示进度条(处理到第 X 个/共 Y 个)
3. 支持按扩展名过滤(如只处理.jpg) -
验收标准:
- 能跑起来(
node main.js ./test_folder) - 输出正确的哈希值和重复报告
-
代码有适当注释
-
提交方式:评论区贴代码或 GitHub 链接
📚 总结 + 资源
本文学了 3 件事:
- Worker Threads 是啥:让 Node.js 在单进程内实现多线程,把 CPU 多核用起来
- 怎么用:用
new Worker(__filename)创建,workerData传数据,parentPort.postMessage收结果 - 什么时候用:CPU 密集型任务(计算、加密、排序)用 Worker,I/O 密集型任务用 async/await
延伸学习资源:
- Node.js 官方文档 - Worker Threads
- 深入理解 Node.js 事件循环和 Worker Threads
- 《Node.js 设计模式》 第 6 章 - Concurrency Models
互动钩子:你在工作中遇到过「CPU 跑满了但 Node.js 还是很慢」的情况吗?当时是怎么解决的?评论区聊聊,老粉优先回复!
下一章我们要解决一个更实际的问题:怎么知道我的 Worker Threads 到底有没有让代码变快? 我们会学习 clinic.js 和 v8-profiler,用工具来「给代码照 CT」,看看瓶颈到底在哪。

评论(0)