第7章 7.2 Stream 流基础

📖 这是「Node.js 从入门到精通」系列第 32 / 45 章

⚠️ 重要说明:本章节虽属 Node.js 系列,但用 Python 授课。如果你正在找 Node.js 的 Stream 章节,请等后续更新。


🎯 开场 3 分钟:为什么要学这个?

上一章我们学会了用 Buffer 处理二进制数据,比如一张图片、一段音频,这些数据都是「一次性生成」的。

但真实场景往往更麻烦:

痛点 1:老板丢给你一个 5GB 的 CSV 日志文件,让你统计访问量。你一上来就读进内存?内存直接爆了。

痛点 2:你需要实时监控一个 API,每秒吐出一行 JSON。程序怎么「边收边处理」,而不是等全部收到再动手?

这俩问题的共同特点是:数据是「流」出来的,不是「堆」在那儿的

学完这一章,你就能用「流」的思维处理数据——来一点处理一点,不占内存,效率翻倍


🧱 基础 25 分钟:核心概念

什么是「流」?先别想代码

想象你用水管浇花:

  • 传统方式:把整个游泳池的水抽到水桶里,然后用水桶浇(对应:一次性把数据全读进内存)
  • 流的方式:直接接一根水管,边开水龙头边浇(对应:数据来一点处理一点)

一句话理解:流就是让数据「流动」起来,而不是「堆积」起来。

Python 里的「流」在哪里?

Python 其实到处都是流:

# 文件操作就是最常见的流
with open('data.csv', 'r') as f:
for line in f:  # 一行一行读,不是全读进来
    print(line.strip())

这里的 f 就是一个可迭代对象(像水管),数据从文件「流」出来。


生成器:流的基础

生成器(Generator) 是 Python 里「流」的核心实现。

普通函数一次性返回所有数据:

def get_numbers():
result = []
for i in range(5):
    result.append(i)
return result

print(get_numbers())  # [0, 1, 2, 3, 4]

生成器用 yield「吐」出数据,来一个算一个:

def generate_numbers():
for i in range(5):
    yield i  # 吐出一个,停一下,等下次调用

gen = generate_numbers()
print(next(gen))  # 0
print(next(gen))  # 1
print(next(gen))  # 2

yield 就是水龙头的开关,每次调用只流出一滴水。

配图1 - 配图1


为什么要用生成器?

省内存!

# 普通方式:一次性生成 100万个数字,占内存
def get_million():
return list(range(1_000_000))

# 生成器方式:每次只占1个数字的内存
def generate_million():
for i in range(1_000_000):
    yield i

假设每个数字占 28 字节:
- 普通方式:28MB
- 生成器方式:28 字节

差了 1000 倍


读取大文件的正确姿势

假设 large_file.txt 有 100 万行:

# ❌ 错误:一次性全读进来
with open('large_file.txt', 'r') as f:
lines = f.readlines()  # 内存爆炸警告

# ✅ 正确:一行一行读,流式处理
with open('large_file.txt', 'r') as f:
for line in f:  # f 本身就是一个生成器
    process(line)

一行一行读,就是最简单、最直觉的流处理。


链式处理:把多个流串起来

Python 没有 Node.js 的 pipe(),但我们可以用生成器链模拟:

def source():
"""数据源:无限生成数字"""
i = 0
while True:
    yield i
    i += 1

def filter_even(iterable):
"""过滤器:只留偶数"""
for item in iterable:
    if item % 2 == 0:
        yield item

def take(n, iterable):
"""限制器:只取前n个"""
for i, item in enumerate(iterable):
    if i >= n:
        break
    yield item

# 串起来!source -> filter_even -> take
result = list(take(5, filter_even(source())))
print(result)  # [0, 2, 4, 6, 8]

这就是「管道」的思路:数据从 A 流到 B 流到 C,每个环节只做一件事。

配图2 - 配图2


实战:用 itertools 做流式处理

Python 内置的 itertools 就是流处理神器:

import itertools

# 无限序列:1, 2, 3, 4, 5, 1, 2, 3...
cyclic = itertools.cycle([1, 2, 3, 4, 5])

# 累加:1, 3, 6, 10, 15...
accumulated = itertools.accumulate(cyclic)

# 只取前10个
result = list(itertools.islice(accumulated, 10))
print(result)  # [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]

记住这三个函数
- cycle() - 无限循环
- accumulate() - 累加
- islice() - 截取前N个


🔥 实战 35 分钟:3 个递进项目

项目 1:单词计数器(5 分钟)

需求:统计一段文字里,每个单词出现的次数。

def read_words(text):
"""把文字流式拆成单词"""
word = ''
for char in text:
    if char.isalnum():
        word += char.lower()
    else:
        if word:
            yield word
        word = ''
if word:
    yield word

def count_words(word_iter):
"""边读边统计"""
counts = {}
for word in word_iter:
    counts[word] = counts.get(word, 0) + 1
return counts

# 测试
text = "Hello world, hello Python! World of Python."
result = count_words(read_words(text))
print(result)
# {'hello': 2, 'world': 2, 'python': 2, 'of': 1}

一句话:生成器让「读」和「处理」同时发生,不用等全部读完。


项目 2:日志分析器(15 分钟)

需求:分析一个日志文件,找出 ERROR 最多的 5 个时间点。

假设 app.log 长这样:

2024-01-01 10:00:01 INFO User login
2024-01-01 10:00:02 ERROR Database connection failed
2024-01-01 10:00:03 INFO User logout
2024-01-01 10:00:04 ERROR Timeout
...
def read_log_lines(filename):
"""流式读取日志文件,一行一行吐出来"""
with open(filename, 'r', encoding='utf-8') as f:
    for line in f:
        yield line.strip()

def filter_errors(line_iter):
"""只保留 ERROR 行,并提取时间戳"""
for line in line_iter:
    if 'ERROR' in line:
        # 提取前19个字符作为时间戳
        timestamp = line[:19]
        yield timestamp

def count_errors(timestamp_iter):
"""统计每个时间戳的 ERROR 次数"""
counts = {}
for ts in timestamp_iter:
    counts[ts] = counts.get(ts, 0) + 1
return counts

def top_n(counts, n=5):
"""返回前n个最常见的"""
sorted_items = sorted(counts.items(), key=lambda x: x[1], reverse=True)
return sorted_items[:n]

# 主流程
log_lines = read_log_lines('app.log')
errors = filter_errors(log_lines)
error_counts = count_errors(errors)
top5 = top_n(error_counts, 5)

print("ERROR 最多的 5 个时间点:")
for ts, count in top5:
print(f"  {ts}: {count} 次")

预期输出

ERROR 最多的 5 个时间点:
2024-01-01 14:23:07: 12 次
2024-01-01 14:23:08: 8 次
2024-01-01 09:15:33: 5 次
...

一句话:不管日志文件多大(1MB 还是 5GB),内存占用始终稳定。


项目 3:实时数据清洗工具(15 分钟)

需求:从 CSV 文件读取销售数据,清洗后输出到新文件。

输入 sales_raw.csv

日期,产品,数量,单价
2024-01-01,iPhone,5,6999
2024-01-01,MacBook,  3 ,8999
2024-01-02,AirPods,10,1299
,Apple Watch,4,2999
2024-01-03,iPad, 2 ,4999

目标:清洗数据(去空格、转类型)并计算每行总价。

import csv

def read_csv_rows(filename):
"""流式读取 CSV,跳过表头"""
with open(filename, 'r', encoding='utf-8') as f:
    reader = csv.reader(f)
    next(reader)  # 跳过表头
    for row in reader:
        yield row

def clean_row(row):
"""清洗一行数据"""
if len(row) != 4:
    return None  # 过滤掉不完整的行

date, product, quantity, price = row
# 清洗:去空格,转整数
try:
    quantity = int(quantity.strip())
    price = int(price.strip())
    return (date.strip(), product.strip(), quantity, price)
except ValueError:
    return None  # 过滤掉无法转换的行

def add_total(row_iter):
"""给每行加上总价"""
for row in row_iter:
    if row is None:
        continue
    date, product, quantity, price = row
    total = quantity * price
    yield (date, product, quantity, price, total)

def write_csv(filename, row_iter):
"""写入 CSV"""
with open(filename, 'w', encoding='utf-8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['日期', '产品', '数量', '单价', '总价'])
    for row in row_iter:
        writer.writerow(row)

# 主流程:管道串起来
rows = read_csv_rows('sales_raw.csv')
cleaned = (clean_row(r) for r in rows)  # 生成器表达式
with_totals = add_total(cleaned)
write_csv('sales_cleaned.csv', with_totals)

print("清洗完成!结果已保存到 sales_cleaned.csv")

预期输出 sales_cleaned.csv

日期,产品,数量,单价,总价
2024-01-01,iPhone,5,6999,34995
2024-01-01,MacBook,3,8999,26997
2024-01-02,AirPods,10,1299,12990
2024-01-03,iPad,2,4999,9998

一句话:输入有脏数据(空格、缺失行),但管道自动「过」掉它们,干净的才流到输出。


💪 进阶 20 分钟:常见坑 + 性能小贴士

坑 1:生成器只能跑一遍

# ❌ 错误:以为能再读一次
gen = (x for x in range(5))
print(list(gen))  # [0, 1, 2, 3, 4]
print(list(gen))  # [] 空了!

# ✅ 正确:要多次用,转成 list 或重新创建
gen = (x for x in range(5))
result1 = list(gen)
gen = (x for x in range(5))  # 重新创建
result2 = list(gen)

类比:水管里的水,流过去就没了,想再要就得重新开水龙头。


坑 2:忘记 yield 是暂停不是退出

# ❌ 错误:在循环外 return,以为会停止
def bad_generator():
for i in range(10):
    if i == 5:
        return  # 这不是停止,是抛 StopIteration!
    yield i

# ✅ 正确:用 break 停止
def good_generator():
for i in range(10):
    if i == 5:
        break  # 这才叫停止
    yield i

坑 3:生成器链不看中间环节

# ❌ 错误:以为 chained 会立即执行
def step1():
print("step1 开始")
for i in range(3):
    yield i

def step2(iterable):
print("step2 开始")
for x in iterable:
    yield x * 2

chained = step2(step1())
# 这时什么都没打印!因为生成器是惰性的

# ✅ 正确:真正消耗时才会执行
for item in chained:
print(item)
# step1 开始
# step2 开始
# 0
# 2
# 4

类比:开电视但不按播放,屏幕还是黑的。按下播放键(迭代)才真正开始。


坑 4:大文件别用 readlines()

# ❌ 错误:readlines() 会一次性加载所有行到内存
with open('big_file.txt', 'r') as f:
lines = f.readlines()  # 危险!

# ✅ 正确:直接迭代文件对象
with open('big_file.txt', 'r') as f:
for line in f:  # 一行一行读,内存稳定
    process(line)

坑 5:CSV 写入要用 newline=''

# ❌ 错误:Windows 下会多空行
with open('output.csv', 'w') as f:
writer = csv.writer(f)
writer.writerow(['a', 'b'])  # 多一个空行

# ✅ 正确:加 newline=''
with open('output.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['a', 'b'])

性能小贴士:用 __slots__ 减少内存

如果你的流处理对象需要创建大量实例:

# 普通类:每个实例有 __dict__,占内存
class Row:
def __init__(self, date, product, quantity):
    self.date = date
    self.product = product
    self.quantity = quantity

# 用 __slots__:固定属性,不建 __dict__,省 40-50% 内存
class Row:
__slots__ = ('date', 'product', 'quantity')
def __init__(self, date, product, quantity):
    self.date = date
    self.product = product
    self.quantity = quantity

调试技巧:给生成器加日志

def logged_generator(iterable, label="Generator"):
"""包装一个生成器,打印每次 yield 的值"""
for item in iterable:
    print(f"[{label}] yielding: {item}")
    yield item

# 使用
gen = logged_generator(range(5), "数字")
for x in gen:
print(f"收到: {x}")

输出:

[数字] yielding: 0
收到: 0
[数字] yielding: 1
收到: 1
...

✏️ 练习题 + 作业题

练习 1(2 分钟):改个数字

把项目 1 的单词计数器,改成只统计「长度大于 3 的单词」。

  • 输入:"I love Python programming"
  • 预期输出:{'love': 1, 'python': 1, 'programming': 1}
  • 提示:在 count_words 函数里加个 if len(word) > 3 判断

练习 2(2 分钟):加个条件

在项目 2 的日志分析器里,加一个功能:同时统计 WARN 级别的数量。

  • 输入:包含 ERROR 和 WARN 的日志
  • 预期输出:分别打印 ERROR 和 WARN 的 top 5
  • 提示:复制 filter_errors 函数,改成 filter_warns

练习 3(3 分钟):新数据源

用项目 2 的方法,分析这个内嵌的日志字符串(不用读文件):

log_data = """2024-01-01 10:00:01 ERROR Database fail
2024-01-01 10:00:02 INFO User login
2024-01-01 10:00:03 ERROR Timeout
2024-01-01 10:00:04 ERROR Timeout
2024-01-01 10:00:05 ERROR Database fail"""
  • 预期输出:ERROR 最多的前 2 个时间点
  • 提示:把字符串转成生成器 for line in log_data.split('\n')

练习 4(5 分钟):串个项目

把练习 3 的结果,用项目 3 的 CSV 写入方式,保存成 error_report.csv

  • 预期输出:CSV 文件包含「时间戳」和「错误次数」两列
  • 提示:先生成 (timestamp, count) 元组的生成器,再传给 CSV 写入函数

练习 5(3 分钟):找 bug

下面代码运行后什么都没输出,是什么原因?

def get_odd():
for i in range(10):
    if i % 2 == 1:
        yield i

gen = get_odd()
print(list(gen))
print(list(gen))  # 这次想再拿一次,结果是空的
  • 预期输出:解释为什么第二次是空的
  • 提示:生成器对象迭代完就「 exhausted」了

作业:做一个「流式文本分析工具」

需求描述:做一个命令行工具,统计一个文本文件中:

  1. 总行数
  2. 总单词数(流式统计,不一次性加载)
  3. 出现频率最高的 3 个单词

功能点
- 用生成器流式读取文件
- 用链式处理统计单词
- 输出格式清晰

加分项
- 支持忽略常见停用词(the, a, is, are, ...)
- 支持 --top N 参数显示前 N 个高频词

验收标准
- 能处理任意大小的文本文件(测试用 100MB)
- 内存占用不超过 50MB
- 代码有注释

提交方式:评论区贴代码或 GitHub 链接


📚 总结 + 资源

本章 3 个核心点

  1. 生成器是流的基础yield 让数据「来一个吐一个」,内存占用极低
  2. 链式处理像流水线:把「读→洗→算→写」串起来,每步只做一件事
  3. 惰性执行:生成器不迭代就不执行,理解这点能少踩一半坑

延伸资源

  1. Python 官方文档 - itertools:流处理神器文档
  2. 《Python 进阶用法(一):生成器与迭代器》- 推荐搜索 B 站相关视频
  3. Fluent Python 书:第 14 章深入讲解迭代器和生成器

你在处理大文件时遇到过内存问题吗?用的什么方法解决的?评论区聊聊,老粉优先回复!

📌 下章预告:学会了流的基础,下一章我们来解决一个关键问题——数据来得太快,处理不过来怎么办? 这个问题在 Node.js 里叫「背压」,处理不好系统会崩。下一章教你用「暂停-恢复」机制让数据流稳如老狗。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。