第7章 7.2 Stream 流基础
📖 这是「Node.js 从入门到精通」系列第 32 / 45 章
⚠️ 重要说明:本章节虽属 Node.js 系列,但用 Python 授课。如果你正在找 Node.js 的 Stream 章节,请等后续更新。
🎯 开场 3 分钟:为什么要学这个?
上一章我们学会了用 Buffer 处理二进制数据,比如一张图片、一段音频,这些数据都是「一次性生成」的。
但真实场景往往更麻烦:
痛点 1:老板丢给你一个 5GB 的 CSV 日志文件,让你统计访问量。你一上来就读进内存?内存直接爆了。
痛点 2:你需要实时监控一个 API,每秒吐出一行 JSON。程序怎么「边收边处理」,而不是等全部收到再动手?
这俩问题的共同特点是:数据是「流」出来的,不是「堆」在那儿的。
学完这一章,你就能用「流」的思维处理数据——来一点处理一点,不占内存,效率翻倍。
🧱 基础 25 分钟:核心概念
什么是「流」?先别想代码
想象你用水管浇花:
- 传统方式:把整个游泳池的水抽到水桶里,然后用水桶浇(对应:一次性把数据全读进内存)
- 流的方式:直接接一根水管,边开水龙头边浇(对应:数据来一点处理一点)
一句话理解:流就是让数据「流动」起来,而不是「堆积」起来。
Python 里的「流」在哪里?
Python 其实到处都是流:
# 文件操作就是最常见的流
with open('data.csv', 'r') as f:
for line in f: # 一行一行读,不是全读进来
print(line.strip())
这里的 f 就是一个可迭代对象(像水管),数据从文件「流」出来。
生成器:流的基础
生成器(Generator) 是 Python 里「流」的核心实现。
普通函数一次性返回所有数据:
def get_numbers():
result = []
for i in range(5):
result.append(i)
return result
print(get_numbers()) # [0, 1, 2, 3, 4]
生成器用 yield「吐」出数据,来一个算一个:
def generate_numbers():
for i in range(5):
yield i # 吐出一个,停一下,等下次调用
gen = generate_numbers()
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 2
yield 就是水龙头的开关,每次调用只流出一滴水。

为什么要用生成器?
省内存!
# 普通方式:一次性生成 100万个数字,占内存
def get_million():
return list(range(1_000_000))
# 生成器方式:每次只占1个数字的内存
def generate_million():
for i in range(1_000_000):
yield i
假设每个数字占 28 字节:
- 普通方式:28MB
- 生成器方式:28 字节
差了 1000 倍!
读取大文件的正确姿势
假设 large_file.txt 有 100 万行:
# ❌ 错误:一次性全读进来
with open('large_file.txt', 'r') as f:
lines = f.readlines() # 内存爆炸警告
# ✅ 正确:一行一行读,流式处理
with open('large_file.txt', 'r') as f:
for line in f: # f 本身就是一个生成器
process(line)
一行一行读,就是最简单、最直觉的流处理。
链式处理:把多个流串起来
Python 没有 Node.js 的 pipe(),但我们可以用生成器链模拟:
def source():
"""数据源:无限生成数字"""
i = 0
while True:
yield i
i += 1
def filter_even(iterable):
"""过滤器:只留偶数"""
for item in iterable:
if item % 2 == 0:
yield item
def take(n, iterable):
"""限制器:只取前n个"""
for i, item in enumerate(iterable):
if i >= n:
break
yield item
# 串起来!source -> filter_even -> take
result = list(take(5, filter_even(source())))
print(result) # [0, 2, 4, 6, 8]
这就是「管道」的思路:数据从 A 流到 B 流到 C,每个环节只做一件事。

实战:用 itertools 做流式处理
Python 内置的 itertools 就是流处理神器:
import itertools
# 无限序列:1, 2, 3, 4, 5, 1, 2, 3...
cyclic = itertools.cycle([1, 2, 3, 4, 5])
# 累加:1, 3, 6, 10, 15...
accumulated = itertools.accumulate(cyclic)
# 只取前10个
result = list(itertools.islice(accumulated, 10))
print(result) # [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
记住这三个函数:
- cycle() - 无限循环
- accumulate() - 累加
- islice() - 截取前N个
🔥 实战 35 分钟:3 个递进项目
项目 1:单词计数器(5 分钟)
需求:统计一段文字里,每个单词出现的次数。
def read_words(text):
"""把文字流式拆成单词"""
word = ''
for char in text:
if char.isalnum():
word += char.lower()
else:
if word:
yield word
word = ''
if word:
yield word
def count_words(word_iter):
"""边读边统计"""
counts = {}
for word in word_iter:
counts[word] = counts.get(word, 0) + 1
return counts
# 测试
text = "Hello world, hello Python! World of Python."
result = count_words(read_words(text))
print(result)
# {'hello': 2, 'world': 2, 'python': 2, 'of': 1}
一句话:生成器让「读」和「处理」同时发生,不用等全部读完。
项目 2:日志分析器(15 分钟)
需求:分析一个日志文件,找出 ERROR 最多的 5 个时间点。
假设 app.log 长这样:
2024-01-01 10:00:01 INFO User login
2024-01-01 10:00:02 ERROR Database connection failed
2024-01-01 10:00:03 INFO User logout
2024-01-01 10:00:04 ERROR Timeout
...
def read_log_lines(filename):
"""流式读取日志文件,一行一行吐出来"""
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()
def filter_errors(line_iter):
"""只保留 ERROR 行,并提取时间戳"""
for line in line_iter:
if 'ERROR' in line:
# 提取前19个字符作为时间戳
timestamp = line[:19]
yield timestamp
def count_errors(timestamp_iter):
"""统计每个时间戳的 ERROR 次数"""
counts = {}
for ts in timestamp_iter:
counts[ts] = counts.get(ts, 0) + 1
return counts
def top_n(counts, n=5):
"""返回前n个最常见的"""
sorted_items = sorted(counts.items(), key=lambda x: x[1], reverse=True)
return sorted_items[:n]
# 主流程
log_lines = read_log_lines('app.log')
errors = filter_errors(log_lines)
error_counts = count_errors(errors)
top5 = top_n(error_counts, 5)
print("ERROR 最多的 5 个时间点:")
for ts, count in top5:
print(f" {ts}: {count} 次")
预期输出:
ERROR 最多的 5 个时间点:
2024-01-01 14:23:07: 12 次
2024-01-01 14:23:08: 8 次
2024-01-01 09:15:33: 5 次
...
一句话:不管日志文件多大(1MB 还是 5GB),内存占用始终稳定。
项目 3:实时数据清洗工具(15 分钟)
需求:从 CSV 文件读取销售数据,清洗后输出到新文件。
输入 sales_raw.csv:
日期,产品,数量,单价
2024-01-01,iPhone,5,6999
2024-01-01,MacBook, 3 ,8999
2024-01-02,AirPods,10,1299
,Apple Watch,4,2999
2024-01-03,iPad, 2 ,4999
目标:清洗数据(去空格、转类型)并计算每行总价。
import csv
def read_csv_rows(filename):
"""流式读取 CSV,跳过表头"""
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过表头
for row in reader:
yield row
def clean_row(row):
"""清洗一行数据"""
if len(row) != 4:
return None # 过滤掉不完整的行
date, product, quantity, price = row
# 清洗:去空格,转整数
try:
quantity = int(quantity.strip())
price = int(price.strip())
return (date.strip(), product.strip(), quantity, price)
except ValueError:
return None # 过滤掉无法转换的行
def add_total(row_iter):
"""给每行加上总价"""
for row in row_iter:
if row is None:
continue
date, product, quantity, price = row
total = quantity * price
yield (date, product, quantity, price, total)
def write_csv(filename, row_iter):
"""写入 CSV"""
with open(filename, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['日期', '产品', '数量', '单价', '总价'])
for row in row_iter:
writer.writerow(row)
# 主流程:管道串起来
rows = read_csv_rows('sales_raw.csv')
cleaned = (clean_row(r) for r in rows) # 生成器表达式
with_totals = add_total(cleaned)
write_csv('sales_cleaned.csv', with_totals)
print("清洗完成!结果已保存到 sales_cleaned.csv")
预期输出 sales_cleaned.csv:
日期,产品,数量,单价,总价
2024-01-01,iPhone,5,6999,34995
2024-01-01,MacBook,3,8999,26997
2024-01-02,AirPods,10,1299,12990
2024-01-03,iPad,2,4999,9998
一句话:输入有脏数据(空格、缺失行),但管道自动「过」掉它们,干净的才流到输出。
💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:生成器只能跑一遍
# ❌ 错误:以为能再读一次
gen = (x for x in range(5))
print(list(gen)) # [0, 1, 2, 3, 4]
print(list(gen)) # [] 空了!
# ✅ 正确:要多次用,转成 list 或重新创建
gen = (x for x in range(5))
result1 = list(gen)
gen = (x for x in range(5)) # 重新创建
result2 = list(gen)
类比:水管里的水,流过去就没了,想再要就得重新开水龙头。
坑 2:忘记 yield 是暂停不是退出
# ❌ 错误:在循环外 return,以为会停止
def bad_generator():
for i in range(10):
if i == 5:
return # 这不是停止,是抛 StopIteration!
yield i
# ✅ 正确:用 break 停止
def good_generator():
for i in range(10):
if i == 5:
break # 这才叫停止
yield i
坑 3:生成器链不看中间环节
# ❌ 错误:以为 chained 会立即执行
def step1():
print("step1 开始")
for i in range(3):
yield i
def step2(iterable):
print("step2 开始")
for x in iterable:
yield x * 2
chained = step2(step1())
# 这时什么都没打印!因为生成器是惰性的
# ✅ 正确:真正消耗时才会执行
for item in chained:
print(item)
# step1 开始
# step2 开始
# 0
# 2
# 4
类比:开电视但不按播放,屏幕还是黑的。按下播放键(迭代)才真正开始。
坑 4:大文件别用 readlines()
# ❌ 错误:readlines() 会一次性加载所有行到内存
with open('big_file.txt', 'r') as f:
lines = f.readlines() # 危险!
# ✅ 正确:直接迭代文件对象
with open('big_file.txt', 'r') as f:
for line in f: # 一行一行读,内存稳定
process(line)
坑 5:CSV 写入要用 newline=''
# ❌ 错误:Windows 下会多空行
with open('output.csv', 'w') as f:
writer = csv.writer(f)
writer.writerow(['a', 'b']) # 多一个空行
# ✅ 正确:加 newline=''
with open('output.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['a', 'b'])
性能小贴士:用 __slots__ 减少内存
如果你的流处理对象需要创建大量实例:
# 普通类:每个实例有 __dict__,占内存
class Row:
def __init__(self, date, product, quantity):
self.date = date
self.product = product
self.quantity = quantity
# 用 __slots__:固定属性,不建 __dict__,省 40-50% 内存
class Row:
__slots__ = ('date', 'product', 'quantity')
def __init__(self, date, product, quantity):
self.date = date
self.product = product
self.quantity = quantity
调试技巧:给生成器加日志
def logged_generator(iterable, label="Generator"):
"""包装一个生成器,打印每次 yield 的值"""
for item in iterable:
print(f"[{label}] yielding: {item}")
yield item
# 使用
gen = logged_generator(range(5), "数字")
for x in gen:
print(f"收到: {x}")
输出:
[数字] yielding: 0
收到: 0
[数字] yielding: 1
收到: 1
...
✏️ 练习题 + 作业题
练习 1(2 分钟):改个数字
把项目 1 的单词计数器,改成只统计「长度大于 3 的单词」。
- 输入:
"I love Python programming" - 预期输出:
{'love': 1, 'python': 1, 'programming': 1} - 提示:在
count_words函数里加个if len(word) > 3判断
练习 2(2 分钟):加个条件
在项目 2 的日志分析器里,加一个功能:同时统计 WARN 级别的数量。
- 输入:包含 ERROR 和 WARN 的日志
- 预期输出:分别打印 ERROR 和 WARN 的 top 5
- 提示:复制
filter_errors函数,改成filter_warns
练习 3(3 分钟):新数据源
用项目 2 的方法,分析这个内嵌的日志字符串(不用读文件):
log_data = """2024-01-01 10:00:01 ERROR Database fail
2024-01-01 10:00:02 INFO User login
2024-01-01 10:00:03 ERROR Timeout
2024-01-01 10:00:04 ERROR Timeout
2024-01-01 10:00:05 ERROR Database fail"""
- 预期输出:ERROR 最多的前 2 个时间点
- 提示:把字符串转成生成器
for line in log_data.split('\n')
练习 4(5 分钟):串个项目
把练习 3 的结果,用项目 3 的 CSV 写入方式,保存成 error_report.csv。
- 预期输出:CSV 文件包含「时间戳」和「错误次数」两列
- 提示:先生成
(timestamp, count)元组的生成器,再传给 CSV 写入函数
练习 5(3 分钟):找 bug
下面代码运行后什么都没输出,是什么原因?
def get_odd():
for i in range(10):
if i % 2 == 1:
yield i
gen = get_odd()
print(list(gen))
print(list(gen)) # 这次想再拿一次,结果是空的
- 预期输出:解释为什么第二次是空的
- 提示:生成器对象迭代完就「 exhausted」了
作业:做一个「流式文本分析工具」
需求描述:做一个命令行工具,统计一个文本文件中:
- 总行数
- 总单词数(流式统计,不一次性加载)
- 出现频率最高的 3 个单词
功能点:
- 用生成器流式读取文件
- 用链式处理统计单词
- 输出格式清晰
加分项:
- 支持忽略常见停用词(the, a, is, are, ...)
- 支持 --top N 参数显示前 N 个高频词
验收标准:
- 能处理任意大小的文本文件(测试用 100MB)
- 内存占用不超过 50MB
- 代码有注释
提交方式:评论区贴代码或 GitHub 链接
📚 总结 + 资源
本章 3 个核心点
- 生成器是流的基础:
yield让数据「来一个吐一个」,内存占用极低 - 链式处理像流水线:把「读→洗→算→写」串起来,每步只做一件事
- 惰性执行:生成器不迭代就不执行,理解这点能少踩一半坑
延伸资源
- Python 官方文档 - itertools:流处理神器文档
- 《Python 进阶用法(一):生成器与迭代器》- 推荐搜索 B 站相关视频
- Fluent Python 书:第 14 章深入讲解迭代器和生成器
你在处理大文件时遇到过内存问题吗?用的什么方法解决的?评论区聊聊,老粉优先回复!
📌 下章预告:学会了流的基础,下一章我们来解决一个关键问题——数据来得太快,处理不过来怎么办? 这个问题在 Node.js 里叫「背压」,处理不好系统会崩。下一章教你用「暂停-恢复」机制让数据流稳如老狗。

评论(0)