第8章 8.1 child_process 子进程

🎯 开场:Node.js 也会「分身术」?

上一章我们搞定了大文件分片上传下载,手指一动,文件就飞到了服务器。但你有没有想过:当一个任务太重时,Node.js 怎么消化它?

举个例子:你写了一个 Node.js 网页服务器,但某个请求要做复杂的图片处理(比如给 100 张照片加水印),这个操作可能耗时 30 秒。如果你让它在主线程里跑,用户访问你的网站就会卡住 30 秒——网页转圈圈,用户体验极差。

这时候该怎么办?

你跟老板说「再加一台服务器」?行,但贵。你说「优化代码」?图片处理就那么慢,优化空间有限。

Node.js 的答案是:分身术——child_process 子进程。

简单说,就是 Node.js 可以「生出」另一个进程来帮你干活。就像餐厅老板发现厨房忙不过来时,叫来了钟点工——主进程是老板,子进程是钟点工。老板继续接待客人(处理其他请求),钟点工去后厨慢慢处理图片(复杂任务)。

学完这一章,你就能:

  • 让 Node.js 同时执行多个任务,互不阻塞
  • 调用系统命令(比如调用 ffmpeg 压缩视频)
  • 父子进程之间互相通信,传递数据

🧱 基础:子进程三剑客

什么是子进程?

类比时间:想象你是早餐店老板(主进程),某天要同时做包子、煮粥、炸油条。三个任务同时进行——但你一个人忙不过来。于是你雇了三个钟点工,每人负责一个任务。你们都在同一个店里,但各干各的。这就是多进程

Node.js 的 child_process 模块就是干这个的:在主进程之外,创建子进程去执行任务

为什么需要子进程?

  1. 不阻塞主线程:Node.js 单线程跑,如果一个任务耗时很长,整个程序就卡住了
  2. 利用多核 CPU:现在的电脑都是多核,但 Node.js 默认只用一个核,太浪费
  3. 调用系统工具:有些活让系统命令干更方便(比如用 ffmpeg 处理视频)

子进程三种创建方式

Node.js 给了我们三个「生孩子」的方式:spawnexecfork。我来一个个讲。


方式一:spawn —— spawn 是「孵化」,启动一个命令

spawn 是最底层的创建方式。它会启动一个指定的命令,然后流式返回数据。适合处理大数据量、长时间运行的任务。

import subprocess

# spawn 接收一个列表:命令 + 参数
# 相当于在终端执行:ls -la /tmp
process = subprocess.Popen(
['ls', '-la', '/tmp'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)

# 读取输出
stdout, stderr = process.communicate()

print("标准输出:")
print(stdout.decode('utf-8'))

if stderr:
print("错误输出:")
print(stderr.decode('utf-8'))

这行在干嘛subprocess.Popen 启动了一个新进程去执行 ls -la /tmp,相当于你在终端敲这个命令。

运行结果:

标准输出:
total 8
drwxr-xr-x  5 root wheel   160 Jun 26 10:30 /tmp
...

错误输出:

方式二:exec —— exec 是「执行」,执行一条完整命令

exec 更直接,你直接丢给它一整条命令字符串(像在终端敲的那样),它帮你执行并一次性返回所有输出。适合短小精悍的任务。

import subprocess

# exec 执行一条完整命令字符串
# 相当于在终端执行:echo "Hello from child"
result = subprocess.run(
'echo "Hello from child process"',
shell=True,  # 注意:shell=True 才能执行字符串命令
capture_output=True,
text=True
)

print("命令退出码:", result.returncode)
print("标准输出:", result.stdout)
print("错误输出:", result.stderr)

这行在干嘛subprocess.run 是 Python 3.7+ 引入的简化写法,比上面的 Popen 少写几行代码。shell=True 让系统用 shell 来解释命令。

运行结果:

命令退出码: 0
标准输出: Hello from child process

错误输出:

方式三:fork —— fork 是「分叉」,创建能通信的子进程

这个概念来自 Unix 的 fork() 系统调用。在 Node.js 语境里,fork 专门用来创建可以双向通信的子进程——父子进程之间可以发消息、传数据。

Python 里实现类似功能要用 multiprocessing 模块的 Process

from multiprocessing import Process, Queue

def worker(shared_queue):
"""子进程要执行的函数"""
# 收到主进程的消息
msg = shared_queue.get()
print(f"[子进程] 收到了:{msg}")

# 往队列里放回数据
shared_queue.put(f"处理完毕:{msg.upper()}")
print("[子进程] 发送了回复")

if __name__ == "__main__":
# 创建一个队列,用于父子进程通信
queue = Queue()

# 往队列里放初始数据
queue.put("Hello Main")

# 创建子进程,target 是子进程要跑的函数
p = Process(target=worker, args=(queue,))
p.start()  # 启动子进程

# 主进程等子进程处理完,拿结果
p.join()  # 等待子进程结束

result = queue.get()
print(f"[主进程] 拿到最终结果:{result}")

这行在干嘛:通过 Queue,主进程和子进程可以安全地交换数据——主进程放数据,子进程取出来处理,再放回去。

运行结果:

[子进程] 收到了:Hello Main
[子进程] 发送了回复
[主进程] 拿到最终结果:处理完毕:HELLO MAIN

配图1 - 配图1


stdio:进程的「嘴巴和耳朵」

我们一直在说「子进程」,但子进程怎么和主进程说话呢?这就涉及到 stdio(标准输入/输出/错误)

想象子进程是一个工厂:
- stdin(标准输入):工厂的「嘴巴」,用来接收数据
- stdout(标准输出):工厂的「正门」,用来输出正常结果
- stderr(标准错误):工厂的「后门」,用来输出错误信息

主进程通过「门」和子进程交流:

import subprocess

# 创建一个能接收 stdin 输入的子进程
# 相当于:cat(读取输入并打印)
process = subprocess.Popen(
'cat',  # cat 命令:读取 stdin,原样输出到 stdout
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)

# 向子进程的 stdin 写入数据
input_data = "这是要给子进程处理的数据\n第二行\n"
stdout, stderr = process.communicate(input=input_data.encode('utf-8'))

print("子进程输出:")
print(stdout.decode('utf-8'))

这行在干嘛cat 命令会读取你给它的任何输入,然后原封不动地输出。我们通过 stdin 喂数据给它,它通过 stdout 吐出来。

运行结果:

子进程输出:
这是要给子进程处理的数据
第二行

🔥 实战:三个项目练练手

项目一:5 分钟 —— 用子进程执行系统命令

场景:你是一个运维,要经常检查服务器状态。每次都要敲 df -h(磁盘使用)、top(进程状态)等命令。好麻烦,写个脚本一键检查!

import subprocess
from datetime import datetime

def check_server_status():
"""一键检查服务器状态"""
print("=" * 40)
print(f"📊 服务器状态检查 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 40)

# 1. 检查磁盘使用
print("\n💾 磁盘使用情况:")
result = subprocess.run('df -h', shell=True, capture_output=True, text=True)
# 只打印第一行(标题)和最后一行(总计)
lines = result.stdout.strip().split('\n')
print('\n'.join([lines[0], lines[-1]]))  # 标题 + 总计

# 2. 检查内存使用
print("\n🧠 内存使用情况:")
result = subprocess.run('free -h', shell=True, capture_output=True, text=True)
print(result.stdout)

# 3. 检查当前用户
print("👤 当前登录用户:")
result = subprocess.run('whoami', shell=True, capture_output=True, text=True)
print(result.stdout.strip())

print("\n✅ 检查完成!")

if __name__ == "__main__":
check_server_status()

预期输出

========================================
📊 服务器状态检查 - 2024-01-15 14:30:00
========================================

💾 磁盘使用情况:
Filesystem      Size  Used Avail Use% Mounted on
/dev/disk1s1    234G  128G  106G  55% /

🧠 内存使用情况:
          total        used        free      shar  buff/cache   available
Mem:           16Gi       8.5Gi       2.1Gi       380Mi       5.4Gi       7.2Gi
Swap:          2Gi       256Mi       1.7Gi

👤 当前登录用户:
john

✅ 检查完成!

一句话解释:用 subprocess.run 执行系统命令,比你手动敲快多了,还不会敲错。


项目二:15 分钟 —— 并行处理多个文件

场景:你是一个数据分析师,有 10 个 CSV 文件,每个都要做同样的处理(计算总价、过滤无效数据)。一个一个处理太慢,要并行处理

import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

# 模拟一个 CSV 处理任务
def process_csv_file(file_path):
"""
子进程要执行的函数:模拟处理一个 CSV 文件
实际场景中这里会是你真正的数据处理逻辑
"""
# 用子进程执行 Python 脚本来处理文件
result = subprocess.run(
    ['python3', '-c', f'''
import random
import time
# 模拟处理 {file_path}
time.sleep(1)  # 模拟耗时操作
# 随机生成处理结果
total = sum(random.randint(100, 999) for _ in range(5))
print(f"文件 {file_path} 处理完成,总额: {{total}}")
'''],
    capture_output=True,
    text=True
)
return result.stdout.strip()

def main():
# 要处理的 10 个文件
files = [f"data_{i}.csv" for i in range(1, 11)]

print(f"📁 共有 {len(files)} 个文件待处理")
print("-" * 40)

start_time = time.time()

# 使用进程池并行处理
with ProcessPoolExecutor(max_workers=4) as executor:
    # 提交所有任务
    future_to_file = {
        executor.submit(process_csv_file, f): f 
        for f in files
    }

    # 收集结果
    completed = 0
    for future in as_completed(future_to_file):
        file_name = future_to_file[future]
        completed += 1
        try:
            result = future.result()
            print(f"[{completed}/{len(files)}] {result}")
        except Exception as e:
            print(f"[{completed}/{len(files)}] {file_name} 处理失败: {e}")

elapsed = time.time() - start_time
print("-" * 40)
print(f"✅ 全部处理完成!耗时: {elapsed:.2f} 秒")
print(f"📈 平均每个文件: {elapsed/len(files):.2f} 秒")

if __name__ == "__main__":
main()

预期输出

📁 共有 10 个文件待处理
----------------------------------------
[1/10] 文件 data_1.csv 处理完成,总额: 3842
[2/10] 文件 data_2.csv 处理完成,总额: 4211
[3/10] 文件 data_3.csv 处理完成,总额: 3105
[4/10] 文件 data_4.csv 处理完成,总额: 5678
[5/10] 文件 data_5.csv 处理完成,总额: 2987
[6/10] 文件 data_6.csv 处理完成,总额: 4123
[7/10] 文件 data_7.csv 处理完成,总额: 3891
[8/10] 文件 data_8.csv 处理完成,总额: 4521
[9/10] 文件 data_9.csv 处理完成,总额: 3712
[10/10] 文件 data_10.csv 处理完成,总额: 4198
----------------------------------------
✅ 全部处理完成!耗时: 3.08 秒
📈 平均每个文件: 0.31 秒

注意:如果有 10 个文件,每个处理 1 秒,串行需要 10 秒。用了 4 个并行进程,理论上 3 秒左右(10/4=2.5,加上开销 3 秒多)。

一句话解释ProcessPoolExecutor 就像包工头,你告诉它有 10 个任务、4 个工人,它自动分配、并行执行、收集结果。


项目三:15 分钟 —— 父子进程实时通信的爬虫小工具

场景:你想写一个爬虫,从网页抓数据。但 Node.js 主进程要和子进程实时通信——主进程告诉子进程要爬哪个 URL,子进程抓完告诉主进程结果。

这里用 Python 的 multiprocessing + Queue 实现类似功能:

from multiprocessing import Process, Queue
import time
import random

def web_crawler(worker_id, task_queue, result_queue):
"""
子进程:模拟爬虫工作
从任务队列拿 URL,爬取后把结果放到结果队列
"""
print(f"[爬虫-{worker_id}] 启动,等待任务...")

while True:
    # 从任务队列取任务(阻塞等待)
    task = task_queue.get()

    # 收到 'STOP' 就退出
    if task == 'STOP':
        print(f"[爬虫-{worker_id}] 收到停止信号,退出")
        break

    url = task
    print(f"[爬虫-{worker_id}] 正在爬取: {url}")

    # 模拟爬取耗时
    time.sleep(random.uniform(0.5, 1.5))

    # 模拟爬取结果
    result = {
        'url': url,
        'title': f"页面标题 - {url}",
        'status': 'success',
        'crawled_at': time.strftime('%H:%M:%S')
    }

    # 把结果放入结果队列
    result_queue.put(result)
    print(f"[爬虫-{worker_id}] {url} 爬取完成!")

def main():
# 要爬取的 URL 列表
urls = [
    "https://example.com/article/1",
    "https://example.com/article/2",
    "https://example.com/article/3",
    "https://example.com/article/4",
    "https://example.com/article/5",
    "https://example.com/article/6",
]

# 创建队列
task_queue = Queue()  # 主进程 -> 子进程:放任务
result_queue = Queue()  # 子进程 -> 主进程:放结果

# 启动 2 个爬虫进程
num_workers = 2
workers = []
for i in range(num_workers):
    p = Process(target=web_crawler, args=(i+1, task_queue, result_queue))
    p.start()
    workers.append(p)
    print(f"启动了爬虫进程-{i+1}")

print(f"\n📋 共 {len(urls)} 个 URL待爬取,{num_workers} 个爬虫并行\n")

# 向任务队列放入所有 URL
for url in urls:
    task_queue.put(url)

# 停止信号(等所有 URL 爬完再发)
for _ in range(num_workers):
    task_queue.put('STOP')

# 收集结果
results = []
for _ in range(len(urls)):
    result = result_queue.get()  # 阻塞等待结果
    results.append(result)
    print(f"[主进程] 收到结果: {result['url']} - {result['title']}")

# 等待所有爬虫进程结束
for p in workers:
    p.join()

print("\n" + "=" * 50)
print(f"✅ 爬取完成!共 {len(results)} 个页面")
print("=" * 50)

# 打印汇总
print("\n📊 爬取汇总:")
for r in results:
    print(f"  {r['url']} | {r['status']} | {r['crawled_at']}")

if __name__ == "__main__":
main()

预期输出

启动了爬虫进程-1
启动了爬虫进程-2
[爬虫-1] 启动,等待任务...
[爬虫-2] 启动,等待任务...

📋 共 6 个 URL待爬取,2 个爬虫并行

[爬虫-2] 正在爬取: https://example.com/article/2
[爬虫-1] 正在爬取: https://example.com/article/1
[爬虫-1] https://example.com/article/1 爬取完成!
[主进程] 收到结果: https://example.com/article/1 - 页面标题 - https://example.com/article/1
[爬虫-1] 正在爬取: https://example.com/article/3
[爬虫-2] https://example.com/article/2 爬取完成!
[主进程] 收到结果: https://example.com/article/2 - 页面标题 - https://example.com/article/2
[爬虫-2] 正在爬取: https://example.com/article/4
[爬虫-1] https://example.com/article/3 爬取完成!
[主进程] 收到结果: https://example.com/article/3 - 页面标题 - https://example.com/article/3
[爬虫-2] https://example.com/article/4 爬取完成!
[主进程] 收到结果: https://example.com/article/4 - 页面标题 - https://example.com/article/4
[爬虫-1] 正在爬取: https://example.com/article/5
[爬虫-2] 正在爬取: https://example.com/article/6
[爬虫-1] https://example.com/article/5 爬取完成!
[主进程] 收到结果: https://example.com/article/5 - 页面标题 - https://example.com/article/5
[爬虫-2] https://example.com/article/6 爬取完成!
[主进程] 收到结果: https://example.com/article/6 - 页面标题 - https://example.com/article/6
[爬虫-1] 收到停止信号,退出
[爬虫-2] 收到停止信号,退出

==================================================
✅ 爬取完成!共 6 个页面
==================================================

📊 爬取汇总:
https://example.com/article/1 | success | 14:30:01
https://example.com/article/2 | success | 14:30:02
https://example.com/article/3 | success | 14:30:02
https://example.com/article/4 | success | 14:30:03
https://example.com/article/5 | success | 14:30:03
https://example.com/article/6 | success | 14:30:04

一句话解释:两个爬虫进程「抢」任务队列里的 URL,谁有空谁拿,实现了真正的并行爬取。

配图2 - 配图2


💪 进阶:常见坑与调试技巧

坑一:shell=True 的安全性问题

❌ 错误示例:直接用用户输入执行命令

import subprocess

user_input = "hello; rm -rf /"  # 恶意输入
subprocess.run(f'echo {user_input}', shell=True)  # 危险!

✅ 正确示例:用列表形式,不经过 shell

import subprocess

# 用列表形式,参数被严格解析,不会被注入
subprocess.run(['echo', 'hello; rm -rf /'])  # 安全!; 被当成普通字符串

注意shell=True 存在命令注入风险,如果必须用 shell=True,一定要先校验输入


坑二:子进程卡死(管道缓冲区满)

❌ 错误示例:子进程输出大量数据

import subprocess

# 如果 grep 输出很大,可能会卡死
process = subprocess.Popen(
['grep', '-r', 'pattern', '/'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()  # 可能卡死!

✅ 正确示例:用文件或流式处理

import subprocess

# 把输出重定向到文件,避免缓冲区问题
with open('output.txt', 'w') as f:
subprocess.run(
    ['grep', '-r', 'pattern', '/'],
    stdout=f,
    stderr=subprocess.PIPE
)

坑三:忘记 join() 等待子进程结束

❌ 错误示例:子进程还没跑完,主进程就退出了

from multiprocessing import Process

def worker():
print("开始处理...")
time.sleep(2)
print("处理完成")

p = Process(target=worker)
p.start()
print("主进程退出了")  # 子进程还没执行完!
# 运行结果:只打印"主进程退出了",子进程的输出可能丢失

✅ 正确示例:记得 join() 等待

from multiprocessing import Process

def worker():
print("开始处理...")
time.sleep(2)
print("处理完成")

p = Process(target=worker)
p.start()
p.join()  # 等待子进程结束
print("主进程退出了")  # 子进程肯定执行完了
# 运行结果:正常打印所有输出

坑四:Windows 下 multiprocessing 的坑

❌ 错误示例:直接创建子进程(Windows 不支持 fork

from multiprocessing import Process

def worker():
print("hello")

# 在 Windows 上,这段代码在 if __name__ == "__main__": 外面会报错
p = Process(target=worker)
p.start()

✅ 正确示例:Windows 必须包在 if __name__ == "__main__":

from multiprocessing import Process

def worker():
print("hello")

if __name__ == "__main__":  # Windows 必须加这行!
p = Process(target=worker)
p.start()
p.join()

原因:Windows 没有 fork() 系统调用,只有 spawnif __name__ == "__main__" 用来防止子进程重复执行启动代码。


坑五:Queue 不能放不可 pickle 的对象

❌ 错误示例:尝试通过 Queue 传递函数

from multiprocessing import Process, Queue

def worker(func):  # 函数不能被 pickle
func()

q = Queue()
q.put(lambda: print("hello"))  # 报错!lambda 不能被序列化

✅ 正确示例:只传数据,不传代码

from multiprocessing import Process, Queue

def worker(q):
msg = q.get()
print(f"收到:{msg}")

q = Queue()
q.put("hello")  # 传数据,没问题

调试技巧:用 print 打日志

子进程调试最简单的方法就是 print——每个 print 都会带上进程前缀([MainProcess][SpawnProcess-1]):

from multiprocessing import Process, Queue
import time

def worker(q):
print(f"[子进程-{id(Process().pid)}] 开始工作")  # 打印进程 ID
while True:
    task = q.get()
    print(f"[子进程] 收到任务: {task}")
    if task == 'STOP':
        break
    time.sleep(0.5)
    print(f"[子进程] 完成: {task}")

if __name__ == "__main__":
q = Queue()
p = Process(target=worker, args=(q,))
p.start()

for i in range(3):
    q.put(f"任务-{i}")
q.put('STOP')

p.join()
print("[主进程] 全部完成")

✏️ 练习题

练习一(2 分钟):改个命令

# 题目:把下面的命令从 `ls` 改成 `pwd`,让它打印当前目录
# 输入:直接运行
# 预期输出:当前工作目录路径
import subprocess

result = subprocess.run(['ls'], capture_output=True, text=True)
print(result.stdout)

练习二(2 分钟):加个判断

# 题目:在执行命令后,加一个 if 判断,如果退出码不是 0 就打印"命令失败"
# 输入:执行 `ls /nonexistent_folder`(不存在的文件夹)
# 预期输出:打印"命令失败"
import subprocess

result = subprocess.run(['ls', '/nonexistent_folder'], capture_output=True, text=True)
# 你的代码:加 if 判断在这里

练习三(3 分钟):换个数据源

# 题目:把项目二的并行处理改成处理 JSON 文件(而不是 CSV)
# 输入:5 个 JSON 文件
# 预期输出:每个文件处理完成的信息
from multiprocessing import Process, Queue

def process_json(file_path):
# 你的代码:模拟处理一个 JSON 文件
# 提示:把原来 process_csv_file 里的逻辑复制过来,改个名字
pass

# 你的代码:启动进程池处理 ['data_1.json', 'data_2.json', ...]

练习四(3 分钟):串起两个项目

# 题目:把项目一的"系统检查"和项目二的"并行处理"结合起来
# 需求:先检查服务器状态(内存/CPU),然后根据状态决定启动多少个并行进程
# 提示:内存 > 80% 时用 2 个进程,< 80% 时用 4 个进程
import subprocess

def get_memory_usage():
# 你的代码:解析 free 命令输出,返回内存使用百分比
pass

def decide_worker_count():
# 你的代码:根据内存使用率决定 worker 数量
pass

练习五(5 分钟):读图分析报错

# 题目:运行以下代码时,控制台报错了。分析原因并修复

from multiprocessing import Process, Queue

def worker(q):
data = q.get()
print(f"收到: {data}")
q.put(f"处理完成: {data}")

if __name__ == "__main__":
q = Queue()
q.put("hello")
p = Process(target=worker, args=(q,))
p.start()
p.join()
print(q.get())

提示:这个代码在 Windows 上会出问题,想想是哪个坑导致的?


作业:做一个「智能文件备份工具」

需求描述:写一个工具,把指定文件夹里的所有文件并行压缩备份到另一个目录。

功能点
1. 读取源文件夹里所有文件
2. 用多进程并行压缩(每个文件一个子进程)
3. 把压缩包存到目标文件夹
4. 显示备份进度条

加分项
1. 支持排除某些文件类型(如 .tmp
2. 备份完成后生成一份报告(备份了哪些文件、总大小、耗时)

验收标准
- 能跑起来(python backup.py /src /dst
- 输出显示每个文件的备份状态
- 代码有中文注释


📚 总结

本文学了 3 件事

  1. subprocess:让 Python 调用系统命令,适合短小任务
  2. multiprocessing + Queue:创建能通信的子进程,适合复杂并行任务
  3. 进程池 ProcessPoolExecutor:自动管理多个进程,省去手动调度

延伸资源

互动钩子

下一章我们要聊一个更「重磅」的话题——cluster 集群。想象一下,如果 child_process 是一个厨师在做菜,那 cluster 就是开了一家连锁餐厅,多个厨师共享同一道菜谱,接待更多客人。怎么做到的?剧透一点点:它能让 Node.js 真正利用多核 CPU……想了解更多?下一章见!


你在工作中有没有遇到过「任务太重,程序卡死」的情况?当时是怎么解决的?评论区聊聊,老粉优先回复!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。