第8章 8.1 child_process 子进程
🎯 开场:Node.js 也会「分身术」?
上一章我们搞定了大文件分片上传下载,手指一动,文件就飞到了服务器。但你有没有想过:当一个任务太重时,Node.js 怎么消化它?
举个例子:你写了一个 Node.js 网页服务器,但某个请求要做复杂的图片处理(比如给 100 张照片加水印),这个操作可能耗时 30 秒。如果你让它在主线程里跑,用户访问你的网站就会卡住 30 秒——网页转圈圈,用户体验极差。
这时候该怎么办?
你跟老板说「再加一台服务器」?行,但贵。你说「优化代码」?图片处理就那么慢,优化空间有限。
Node.js 的答案是:分身术——child_process 子进程。
简单说,就是 Node.js 可以「生出」另一个进程来帮你干活。就像餐厅老板发现厨房忙不过来时,叫来了钟点工——主进程是老板,子进程是钟点工。老板继续接待客人(处理其他请求),钟点工去后厨慢慢处理图片(复杂任务)。
学完这一章,你就能:
- 让 Node.js 同时执行多个任务,互不阻塞
- 调用系统命令(比如调用
ffmpeg压缩视频) - 父子进程之间互相通信,传递数据
🧱 基础:子进程三剑客
什么是子进程?
类比时间:想象你是早餐店老板(主进程),某天要同时做包子、煮粥、炸油条。三个任务同时进行——但你一个人忙不过来。于是你雇了三个钟点工,每人负责一个任务。你们都在同一个店里,但各干各的。这就是多进程。
Node.js 的 child_process 模块就是干这个的:在主进程之外,创建子进程去执行任务。
为什么需要子进程?
- 不阻塞主线程:Node.js 单线程跑,如果一个任务耗时很长,整个程序就卡住了
- 利用多核 CPU:现在的电脑都是多核,但 Node.js 默认只用一个核,太浪费
- 调用系统工具:有些活让系统命令干更方便(比如用
ffmpeg处理视频)
子进程三种创建方式
Node.js 给了我们三个「生孩子」的方式:spawn、exec、fork。我来一个个讲。
方式一:spawn —— spawn 是「孵化」,启动一个命令
spawn 是最底层的创建方式。它会启动一个指定的命令,然后流式返回数据。适合处理大数据量、长时间运行的任务。
import subprocess
# spawn 接收一个列表:命令 + 参数
# 相当于在终端执行:ls -la /tmp
process = subprocess.Popen(
['ls', '-la', '/tmp'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 读取输出
stdout, stderr = process.communicate()
print("标准输出:")
print(stdout.decode('utf-8'))
if stderr:
print("错误输出:")
print(stderr.decode('utf-8'))
这行在干嘛:subprocess.Popen 启动了一个新进程去执行 ls -la /tmp,相当于你在终端敲这个命令。
运行结果:
标准输出:
total 8
drwxr-xr-x 5 root wheel 160 Jun 26 10:30 /tmp
...
错误输出:
方式二:exec —— exec 是「执行」,执行一条完整命令
exec 更直接,你直接丢给它一整条命令字符串(像在终端敲的那样),它帮你执行并一次性返回所有输出。适合短小精悍的任务。
import subprocess
# exec 执行一条完整命令字符串
# 相当于在终端执行:echo "Hello from child"
result = subprocess.run(
'echo "Hello from child process"',
shell=True, # 注意:shell=True 才能执行字符串命令
capture_output=True,
text=True
)
print("命令退出码:", result.returncode)
print("标准输出:", result.stdout)
print("错误输出:", result.stderr)
这行在干嘛:subprocess.run 是 Python 3.7+ 引入的简化写法,比上面的 Popen 少写几行代码。shell=True 让系统用 shell 来解释命令。
运行结果:
命令退出码: 0
标准输出: Hello from child process
错误输出:
方式三:fork —— fork 是「分叉」,创建能通信的子进程
这个概念来自 Unix 的 fork() 系统调用。在 Node.js 语境里,fork 专门用来创建可以双向通信的子进程——父子进程之间可以发消息、传数据。
Python 里实现类似功能要用 multiprocessing 模块的 Process:
from multiprocessing import Process, Queue
def worker(shared_queue):
"""子进程要执行的函数"""
# 收到主进程的消息
msg = shared_queue.get()
print(f"[子进程] 收到了:{msg}")
# 往队列里放回数据
shared_queue.put(f"处理完毕:{msg.upper()}")
print("[子进程] 发送了回复")
if __name__ == "__main__":
# 创建一个队列,用于父子进程通信
queue = Queue()
# 往队列里放初始数据
queue.put("Hello Main")
# 创建子进程,target 是子进程要跑的函数
p = Process(target=worker, args=(queue,))
p.start() # 启动子进程
# 主进程等子进程处理完,拿结果
p.join() # 等待子进程结束
result = queue.get()
print(f"[主进程] 拿到最终结果:{result}")
这行在干嘛:通过 Queue,主进程和子进程可以安全地交换数据——主进程放数据,子进程取出来处理,再放回去。
运行结果:
[子进程] 收到了:Hello Main
[子进程] 发送了回复
[主进程] 拿到最终结果:处理完毕:HELLO MAIN

stdio:进程的「嘴巴和耳朵」
我们一直在说「子进程」,但子进程怎么和主进程说话呢?这就涉及到 stdio(标准输入/输出/错误)。
想象子进程是一个工厂:
- stdin(标准输入):工厂的「嘴巴」,用来接收数据
- stdout(标准输出):工厂的「正门」,用来输出正常结果
- stderr(标准错误):工厂的「后门」,用来输出错误信息
主进程通过「门」和子进程交流:
import subprocess
# 创建一个能接收 stdin 输入的子进程
# 相当于:cat(读取输入并打印)
process = subprocess.Popen(
'cat', # cat 命令:读取 stdin,原样输出到 stdout
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 向子进程的 stdin 写入数据
input_data = "这是要给子进程处理的数据\n第二行\n"
stdout, stderr = process.communicate(input=input_data.encode('utf-8'))
print("子进程输出:")
print(stdout.decode('utf-8'))
这行在干嘛:cat 命令会读取你给它的任何输入,然后原封不动地输出。我们通过 stdin 喂数据给它,它通过 stdout 吐出来。
运行结果:
子进程输出:
这是要给子进程处理的数据
第二行
🔥 实战:三个项目练练手
项目一:5 分钟 —— 用子进程执行系统命令
场景:你是一个运维,要经常检查服务器状态。每次都要敲 df -h(磁盘使用)、top(进程状态)等命令。好麻烦,写个脚本一键检查!
import subprocess
from datetime import datetime
def check_server_status():
"""一键检查服务器状态"""
print("=" * 40)
print(f"📊 服务器状态检查 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 40)
# 1. 检查磁盘使用
print("\n💾 磁盘使用情况:")
result = subprocess.run('df -h', shell=True, capture_output=True, text=True)
# 只打印第一行(标题)和最后一行(总计)
lines = result.stdout.strip().split('\n')
print('\n'.join([lines[0], lines[-1]])) # 标题 + 总计
# 2. 检查内存使用
print("\n🧠 内存使用情况:")
result = subprocess.run('free -h', shell=True, capture_output=True, text=True)
print(result.stdout)
# 3. 检查当前用户
print("👤 当前登录用户:")
result = subprocess.run('whoami', shell=True, capture_output=True, text=True)
print(result.stdout.strip())
print("\n✅ 检查完成!")
if __name__ == "__main__":
check_server_status()
预期输出:
========================================
📊 服务器状态检查 - 2024-01-15 14:30:00
========================================
💾 磁盘使用情况:
Filesystem Size Used Avail Use% Mounted on
/dev/disk1s1 234G 128G 106G 55% /
🧠 内存使用情况:
total used free shar buff/cache available
Mem: 16Gi 8.5Gi 2.1Gi 380Mi 5.4Gi 7.2Gi
Swap: 2Gi 256Mi 1.7Gi
👤 当前登录用户:
john
✅ 检查完成!
一句话解释:用 subprocess.run 执行系统命令,比你手动敲快多了,还不会敲错。
项目二:15 分钟 —— 并行处理多个文件
场景:你是一个数据分析师,有 10 个 CSV 文件,每个都要做同样的处理(计算总价、过滤无效数据)。一个一个处理太慢,要并行处理!
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
# 模拟一个 CSV 处理任务
def process_csv_file(file_path):
"""
子进程要执行的函数:模拟处理一个 CSV 文件
实际场景中这里会是你真正的数据处理逻辑
"""
# 用子进程执行 Python 脚本来处理文件
result = subprocess.run(
['python3', '-c', f'''
import random
import time
# 模拟处理 {file_path}
time.sleep(1) # 模拟耗时操作
# 随机生成处理结果
total = sum(random.randint(100, 999) for _ in range(5))
print(f"文件 {file_path} 处理完成,总额: {{total}}")
'''],
capture_output=True,
text=True
)
return result.stdout.strip()
def main():
# 要处理的 10 个文件
files = [f"data_{i}.csv" for i in range(1, 11)]
print(f"📁 共有 {len(files)} 个文件待处理")
print("-" * 40)
start_time = time.time()
# 使用进程池并行处理
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交所有任务
future_to_file = {
executor.submit(process_csv_file, f): f
for f in files
}
# 收集结果
completed = 0
for future in as_completed(future_to_file):
file_name = future_to_file[future]
completed += 1
try:
result = future.result()
print(f"[{completed}/{len(files)}] {result}")
except Exception as e:
print(f"[{completed}/{len(files)}] {file_name} 处理失败: {e}")
elapsed = time.time() - start_time
print("-" * 40)
print(f"✅ 全部处理完成!耗时: {elapsed:.2f} 秒")
print(f"📈 平均每个文件: {elapsed/len(files):.2f} 秒")
if __name__ == "__main__":
main()
预期输出:
📁 共有 10 个文件待处理
----------------------------------------
[1/10] 文件 data_1.csv 处理完成,总额: 3842
[2/10] 文件 data_2.csv 处理完成,总额: 4211
[3/10] 文件 data_3.csv 处理完成,总额: 3105
[4/10] 文件 data_4.csv 处理完成,总额: 5678
[5/10] 文件 data_5.csv 处理完成,总额: 2987
[6/10] 文件 data_6.csv 处理完成,总额: 4123
[7/10] 文件 data_7.csv 处理完成,总额: 3891
[8/10] 文件 data_8.csv 处理完成,总额: 4521
[9/10] 文件 data_9.csv 处理完成,总额: 3712
[10/10] 文件 data_10.csv 处理完成,总额: 4198
----------------------------------------
✅ 全部处理完成!耗时: 3.08 秒
📈 平均每个文件: 0.31 秒
注意:如果有 10 个文件,每个处理 1 秒,串行需要 10 秒。用了 4 个并行进程,理论上 3 秒左右(10/4=2.5,加上开销 3 秒多)。
一句话解释:ProcessPoolExecutor 就像包工头,你告诉它有 10 个任务、4 个工人,它自动分配、并行执行、收集结果。
项目三:15 分钟 —— 父子进程实时通信的爬虫小工具
场景:你想写一个爬虫,从网页抓数据。但 Node.js 主进程要和子进程实时通信——主进程告诉子进程要爬哪个 URL,子进程抓完告诉主进程结果。
这里用 Python 的 multiprocessing + Queue 实现类似功能:
from multiprocessing import Process, Queue
import time
import random
def web_crawler(worker_id, task_queue, result_queue):
"""
子进程:模拟爬虫工作
从任务队列拿 URL,爬取后把结果放到结果队列
"""
print(f"[爬虫-{worker_id}] 启动,等待任务...")
while True:
# 从任务队列取任务(阻塞等待)
task = task_queue.get()
# 收到 'STOP' 就退出
if task == 'STOP':
print(f"[爬虫-{worker_id}] 收到停止信号,退出")
break
url = task
print(f"[爬虫-{worker_id}] 正在爬取: {url}")
# 模拟爬取耗时
time.sleep(random.uniform(0.5, 1.5))
# 模拟爬取结果
result = {
'url': url,
'title': f"页面标题 - {url}",
'status': 'success',
'crawled_at': time.strftime('%H:%M:%S')
}
# 把结果放入结果队列
result_queue.put(result)
print(f"[爬虫-{worker_id}] {url} 爬取完成!")
def main():
# 要爬取的 URL 列表
urls = [
"https://example.com/article/1",
"https://example.com/article/2",
"https://example.com/article/3",
"https://example.com/article/4",
"https://example.com/article/5",
"https://example.com/article/6",
]
# 创建队列
task_queue = Queue() # 主进程 -> 子进程:放任务
result_queue = Queue() # 子进程 -> 主进程:放结果
# 启动 2 个爬虫进程
num_workers = 2
workers = []
for i in range(num_workers):
p = Process(target=web_crawler, args=(i+1, task_queue, result_queue))
p.start()
workers.append(p)
print(f"启动了爬虫进程-{i+1}")
print(f"\n📋 共 {len(urls)} 个 URL待爬取,{num_workers} 个爬虫并行\n")
# 向任务队列放入所有 URL
for url in urls:
task_queue.put(url)
# 停止信号(等所有 URL 爬完再发)
for _ in range(num_workers):
task_queue.put('STOP')
# 收集结果
results = []
for _ in range(len(urls)):
result = result_queue.get() # 阻塞等待结果
results.append(result)
print(f"[主进程] 收到结果: {result['url']} - {result['title']}")
# 等待所有爬虫进程结束
for p in workers:
p.join()
print("\n" + "=" * 50)
print(f"✅ 爬取完成!共 {len(results)} 个页面")
print("=" * 50)
# 打印汇总
print("\n📊 爬取汇总:")
for r in results:
print(f" {r['url']} | {r['status']} | {r['crawled_at']}")
if __name__ == "__main__":
main()
预期输出:
启动了爬虫进程-1
启动了爬虫进程-2
[爬虫-1] 启动,等待任务...
[爬虫-2] 启动,等待任务...
📋 共 6 个 URL待爬取,2 个爬虫并行
[爬虫-2] 正在爬取: https://example.com/article/2
[爬虫-1] 正在爬取: https://example.com/article/1
[爬虫-1] https://example.com/article/1 爬取完成!
[主进程] 收到结果: https://example.com/article/1 - 页面标题 - https://example.com/article/1
[爬虫-1] 正在爬取: https://example.com/article/3
[爬虫-2] https://example.com/article/2 爬取完成!
[主进程] 收到结果: https://example.com/article/2 - 页面标题 - https://example.com/article/2
[爬虫-2] 正在爬取: https://example.com/article/4
[爬虫-1] https://example.com/article/3 爬取完成!
[主进程] 收到结果: https://example.com/article/3 - 页面标题 - https://example.com/article/3
[爬虫-2] https://example.com/article/4 爬取完成!
[主进程] 收到结果: https://example.com/article/4 - 页面标题 - https://example.com/article/4
[爬虫-1] 正在爬取: https://example.com/article/5
[爬虫-2] 正在爬取: https://example.com/article/6
[爬虫-1] https://example.com/article/5 爬取完成!
[主进程] 收到结果: https://example.com/article/5 - 页面标题 - https://example.com/article/5
[爬虫-2] https://example.com/article/6 爬取完成!
[主进程] 收到结果: https://example.com/article/6 - 页面标题 - https://example.com/article/6
[爬虫-1] 收到停止信号,退出
[爬虫-2] 收到停止信号,退出
==================================================
✅ 爬取完成!共 6 个页面
==================================================
📊 爬取汇总:
https://example.com/article/1 | success | 14:30:01
https://example.com/article/2 | success | 14:30:02
https://example.com/article/3 | success | 14:30:02
https://example.com/article/4 | success | 14:30:03
https://example.com/article/5 | success | 14:30:03
https://example.com/article/6 | success | 14:30:04
一句话解释:两个爬虫进程「抢」任务队列里的 URL,谁有空谁拿,实现了真正的并行爬取。

💪 进阶:常见坑与调试技巧
坑一:shell=True 的安全性问题
❌ 错误示例:直接用用户输入执行命令
import subprocess
user_input = "hello; rm -rf /" # 恶意输入
subprocess.run(f'echo {user_input}', shell=True) # 危险!
✅ 正确示例:用列表形式,不经过 shell
import subprocess
# 用列表形式,参数被严格解析,不会被注入
subprocess.run(['echo', 'hello; rm -rf /']) # 安全!; 被当成普通字符串
注意:shell=True 存在命令注入风险,如果必须用 shell=True,一定要先校验输入。
坑二:子进程卡死(管道缓冲区满)
❌ 错误示例:子进程输出大量数据
import subprocess
# 如果 grep 输出很大,可能会卡死
process = subprocess.Popen(
['grep', '-r', 'pattern', '/'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate() # 可能卡死!
✅ 正确示例:用文件或流式处理
import subprocess
# 把输出重定向到文件,避免缓冲区问题
with open('output.txt', 'w') as f:
subprocess.run(
['grep', '-r', 'pattern', '/'],
stdout=f,
stderr=subprocess.PIPE
)
坑三:忘记 join() 等待子进程结束
❌ 错误示例:子进程还没跑完,主进程就退出了
from multiprocessing import Process
def worker():
print("开始处理...")
time.sleep(2)
print("处理完成")
p = Process(target=worker)
p.start()
print("主进程退出了") # 子进程还没执行完!
# 运行结果:只打印"主进程退出了",子进程的输出可能丢失
✅ 正确示例:记得 join() 等待
from multiprocessing import Process
def worker():
print("开始处理...")
time.sleep(2)
print("处理完成")
p = Process(target=worker)
p.start()
p.join() # 等待子进程结束
print("主进程退出了") # 子进程肯定执行完了
# 运行结果:正常打印所有输出
坑四:Windows 下 multiprocessing 的坑
❌ 错误示例:直接创建子进程(Windows 不支持 fork)
from multiprocessing import Process
def worker():
print("hello")
# 在 Windows 上,这段代码在 if __name__ == "__main__": 外面会报错
p = Process(target=worker)
p.start()
✅ 正确示例:Windows 必须包在 if __name__ == "__main__": 里
from multiprocessing import Process
def worker():
print("hello")
if __name__ == "__main__": # Windows 必须加这行!
p = Process(target=worker)
p.start()
p.join()
原因:Windows 没有 fork() 系统调用,只有 spawn。if __name__ == "__main__" 用来防止子进程重复执行启动代码。
坑五:Queue 不能放不可 pickle 的对象
❌ 错误示例:尝试通过 Queue 传递函数
from multiprocessing import Process, Queue
def worker(func): # 函数不能被 pickle
func()
q = Queue()
q.put(lambda: print("hello")) # 报错!lambda 不能被序列化
✅ 正确示例:只传数据,不传代码
from multiprocessing import Process, Queue
def worker(q):
msg = q.get()
print(f"收到:{msg}")
q = Queue()
q.put("hello") # 传数据,没问题
调试技巧:用 print 打日志
子进程调试最简单的方法就是 print——每个 print 都会带上进程前缀([MainProcess] 或 [SpawnProcess-1]):
from multiprocessing import Process, Queue
import time
def worker(q):
print(f"[子进程-{id(Process().pid)}] 开始工作") # 打印进程 ID
while True:
task = q.get()
print(f"[子进程] 收到任务: {task}")
if task == 'STOP':
break
time.sleep(0.5)
print(f"[子进程] 完成: {task}")
if __name__ == "__main__":
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
for i in range(3):
q.put(f"任务-{i}")
q.put('STOP')
p.join()
print("[主进程] 全部完成")
✏️ 练习题
练习一(2 分钟):改个命令
# 题目:把下面的命令从 `ls` 改成 `pwd`,让它打印当前目录
# 输入:直接运行
# 预期输出:当前工作目录路径
import subprocess
result = subprocess.run(['ls'], capture_output=True, text=True)
print(result.stdout)
练习二(2 分钟):加个判断
# 题目:在执行命令后,加一个 if 判断,如果退出码不是 0 就打印"命令失败"
# 输入:执行 `ls /nonexistent_folder`(不存在的文件夹)
# 预期输出:打印"命令失败"
import subprocess
result = subprocess.run(['ls', '/nonexistent_folder'], capture_output=True, text=True)
# 你的代码:加 if 判断在这里
练习三(3 分钟):换个数据源
# 题目:把项目二的并行处理改成处理 JSON 文件(而不是 CSV)
# 输入:5 个 JSON 文件
# 预期输出:每个文件处理完成的信息
from multiprocessing import Process, Queue
def process_json(file_path):
# 你的代码:模拟处理一个 JSON 文件
# 提示:把原来 process_csv_file 里的逻辑复制过来,改个名字
pass
# 你的代码:启动进程池处理 ['data_1.json', 'data_2.json', ...]
练习四(3 分钟):串起两个项目
# 题目:把项目一的"系统检查"和项目二的"并行处理"结合起来
# 需求:先检查服务器状态(内存/CPU),然后根据状态决定启动多少个并行进程
# 提示:内存 > 80% 时用 2 个进程,< 80% 时用 4 个进程
import subprocess
def get_memory_usage():
# 你的代码:解析 free 命令输出,返回内存使用百分比
pass
def decide_worker_count():
# 你的代码:根据内存使用率决定 worker 数量
pass
练习五(5 分钟):读图分析报错
# 题目:运行以下代码时,控制台报错了。分析原因并修复
from multiprocessing import Process, Queue
def worker(q):
data = q.get()
print(f"收到: {data}")
q.put(f"处理完成: {data}")
if __name__ == "__main__":
q = Queue()
q.put("hello")
p = Process(target=worker, args=(q,))
p.start()
p.join()
print(q.get())
提示:这个代码在 Windows 上会出问题,想想是哪个坑导致的?
作业:做一个「智能文件备份工具」
需求描述:写一个工具,把指定文件夹里的所有文件并行压缩备份到另一个目录。
功能点:
1. 读取源文件夹里所有文件
2. 用多进程并行压缩(每个文件一个子进程)
3. 把压缩包存到目标文件夹
4. 显示备份进度条
加分项:
1. 支持排除某些文件类型(如 .tmp)
2. 备份完成后生成一份报告(备份了哪些文件、总大小、耗时)
验收标准:
- 能跑起来(python backup.py /src /dst)
- 输出显示每个文件的备份状态
- 代码有中文注释
📚 总结
本文学了 3 件事:
subprocess:让 Python 调用系统命令,适合短小任务multiprocessing+Queue:创建能通信的子进程,适合复杂并行任务- 进程池
ProcessPoolExecutor:自动管理多个进程,省去手动调度
延伸资源:
- Python 官方文档 - subprocess:最权威的参考
- Python 官方文档 - multiprocessing:进程通信全解
- 《Python 性能优化》第三章:讲了很多进程池的实际应用场景
互动钩子:
下一章我们要聊一个更「重磅」的话题——cluster 集群。想象一下,如果
child_process是一个厨师在做菜,那cluster就是开了一家连锁餐厅,多个厨师共享同一道菜谱,接待更多客人。怎么做到的?剧透一点点:它能让 Node.js 真正利用多核 CPU……想了解更多?下一章见!
你在工作中有没有遇到过「任务太重,程序卡死」的情况?当时是怎么解决的?评论区聊聊,老粉优先回复!

评论(0)