第8章 8.2 itertools 迭代器工具
上一章我们学会了用
collections管理「特殊抽屉」里的数据(队列、默认值、链表),这一章我要给你介绍一个更强大的工具——itertools,它能让你像指挥家一样,优雅地操控数据流的「播放进度」。
🎯 开场 3 分钟:为什么要学这个?
你有没有遇到过这些情况:
场景 1:你要处理一个上百万行的 CSV 文件,内存直接爆了
场景 2:你手上有两个列表,想像拉链一样把它们「咬合」在一起,但 for 循环写了 10 行还没搞定
场景 3:你需要生成一个序列的所有排列组合,手指敲键盘敲了 10 分钟还没写出来
这些都是我当年学编程时真实踩过的坑。后来我学会了 itertools,发现这些问题用 3 行代码 就能解决。
itertools 是 Python 内置的「迭代器工具箱」,它能帮你:
- 🐇 生成无穷序列(不用自己写 while True)
- 🔗 组合多个数据源(像搭积木一样)
- 🎰 排列组合(数学老师看了都说好)
- ⚡ 惰性计算(用到才算,省内存)
学完这一章,你处理大数据文件时不会再「蓝屏」,写数据处理代码会像写伪代码一样简洁。
🧱 基础 25 分钟:核心概念
8.2.1 什么是迭代器?先把这个「自来水厂」搞明白
在讲 itertools 之前,我们先理解一个概念:迭代器。
想象一下自来水厂:
- 普通列表 = 你一次买了一大桶水放家里,占地方但随时能用
- 迭代器 = 水龙头,你打开才来水,不用提前存着
# 普通列表:一次性把数据全部加载到内存
数据_桶 = [1, 2, 3, 4, 5]
# 迭代器:需要的时候才「算」出来
迭代器 = iter(数据_桶) # 这只是一个"水龙头",还没水呢
print(next(迭代器)) # 打开水龙头,才出来"1"
print(next(迭代器)) # 出来"2"
小明:老师,这有啥区别?我直接用列表不就行了?
好问题!区别在于:
- 处理 1000 万行数据时,列表会把内存撑爆
- 迭代器「用到才算」,内存几乎不变

8.2.2 count() - 无限计数器
生活类比:自动售货机的编号,从 001 开始一直往下排,永不重复。
import itertools
# 创建一个无限计数器
计数器 = itertools.count(start=1, step=1)
# 取前5个
for i in range(5):
print(next(计数器))
输出:
1
2
3
4
5
这行
itertools.count(start=1, step=1)就是在创建一个「永动机」一样的序列,按顺序从 1 开始,步长 1 无限生成数字。
为什么要用:生成 ID、编号、序列号的时候贼好用,不用手动 i += 1。
8.2.3 cycle() - 循环复用
生活类比:星期一到星期日,过完周日又从周一开始,无限循环。
import itertools
# 创建一个循环器,无限重复 [1, 2, 3]
循环器 = itertools.cycle([1, 2, 3])
# 取前10个
for i in range(10):
print(next(循环器))
输出:
1
2
3
1
2
3
1
2
3
1
itertools.cycle([1, 2, 3])就像一个「走马灯」,走完一圈自动从头再来。
8.2.4 chain() - 串联多个序列
生活类比:两条铁路拼成一条长铁路,火车可以无缝跑完全程。
import itertools
列表A = [1, 2, 3]
列表B = [4, 5, 6]
列表C = [7, 8, 9]
# 用 chain 把它们串起来
长列表 = itertools.chain(列表A, 列表B, 列表C)
print(list(长列表))
输出:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
itertools.chain()就是「粘合剂」,把多个列表/元组/任何可迭代对象,粘成一个长长的序列。
什么时候用:合并多个 CSV 文件、拼接用户数据表、多来源数据整合。
8.2.5 islice() - 切片迭代器
生活类比:自助餐排队,你只需要第 5-10 位的菜,不用看整个队伍。
import itertools
无限数字 = itertools.count(1)
# 只取第 5 到第 10 个(索引 5, 6, 7, 8, 9)
切片 = itertools.islice(无限数字, 5, 10)
print(list(切片))
输出:
[6, 7, 8, 9, 10]
itertools.islice(无限数字, 5, 10)意思是:从无限序列中取索引 5-9 的元素(左闭右开)。
为什么要用:处理大数据文件时,你不需要全部读完,只要读其中一段。
8.2.6 tee() - 一分为多
生活类比:一根水管分叉成三根,可以同时浇三块地。
import itertools
原始数据 = [1, 2, 3, 4, 5]
# 一个迭代器分成三个
份1, 份2, 份3 = itertools.tee(原始数据, 3)
print("份1:", list(份1))
print("份2:", list(份2))
print("份3:", list(份3))
输出:
份1: [1, 2, 3, 4, 5]
份2: [1, 2, 3, 4, 5]
份3: [1, 2, 3, 4, 5]
itertools.tee()会把一个迭代器「复制」成多个独立的迭代器,每个都可以单独遍历。
⚠️ 注意:tee 会占用内存,因为要保存「分叉点」之后的数据。

8.2.7 groupby() - 分组
生活类比:超市收银台按会员/非会员分组排队。
import itertools
数据 = [
("甲", "苹果"),
("甲", "香蕉"),
("乙", "橙子"),
("乙", "葡萄"),
("甲", "西瓜"),
]
# 按姓名分组
数据.sort(key=lambda x: x[0]) # groupby 要求同组元素相邻,先排序
for 姓名, 水果组 in itertools.groupby(数据, key=lambda x: x[0]):
水果列表 = list(水果组)
print(f"{姓名}买了: {[水果 for _, 水果 in 水果列表]}")
输出:
甲买了: ['苹果', '香蕉', '西瓜']
乙买了: ['橙子', '葡萄']
itertools.groupby()就像「自动分类机器」,把相同特征的物品归到一组。
坑来了:使用前记得排序!groupby 只识别相邻的相同元素。
8.2.8 combinations() 和 permutations() - 排列组合
生活类比:
- combinations = 从班级里选 3 个人去值日(顺序不重要,选出来就行)
- permutations = 从班级里选 3 个人坐前三把椅子(顺序重要,谁坐第一把很重要)
import itertools
colors = ["红", "黄", "蓝"]
# 组合:不讲顺序,C(3,2) = 3种
print("组合(选2个,不讲顺序):")
for 组 in itertools.combinations(colors, 2):
print(组)
输出:
('红', '黄')
('红', '蓝')
('黄', '蓝')
# 排列:讲顺序,P(3,2) = 6种
print("排列(选2个,讲顺序):")
for 组 in itertools.permutations(colors, 2):
print(组)
输出:
('红', '黄')
('红', '蓝')
('黄', '红')
('黄', '蓝')
('蓝', '红')
('蓝', '黄')
combinations是「不排队」的选法,permutations是「排好队」的选法。
什么时候用:
- 组合:考试选题、生成套餐
- 排列:密码破解(别问我怎么知道这个)、排名次
🔥 实战 35 分钟:3 个递进的小项目
📦 项目 1:自动生成测试数据(5 分钟)
场景:你正在开发一个电商系统,需要大量假订单数据测试。
import itertools
import random
import datetime
# 定义数据池
用户名池 = ["小明", "小红", "张三", "李四", "王五"]
商品池 = ["iPhone", "MacBook", "AirPods", "iPad", "Apple Watch"]
状态池 = ["已支付", "待发货", "已发货", "已完成"]
# 无限计数器生成订单号
订单计数器 = itertools.count(10001)
# 无线循环生成用户名和商品
用户名循环 = itertools.cycle(用户名池)
商品循环 = itertools.cycle(商品池)
状态循环 = itertools.cycle(状态池)
# 生成 10 条订单
print("=" * 50)
print("📋 自动生成订单数据")
print("=" * 50)
for i in range(10):
订单号 = next(订单计数器)
用户名 = next(用户名循环)
商品 = next(商品循环)
状态 = next(状态循环)
金额 = random.randint(100, 10000)
print(f"订单{订单号} | {用户名} | {商品} | {状态} | ¥{金额}")
输出:
==================================================
📋 自动生成订单数据
==================================================
订单10001 | 小明 | iPhone | 已支付 | ¥3821
订单10002 | 小红 | MacBook | 待发货 | ¥9234
订单10003 | 张三 | AirPods | 已发货 | ¥1245
订单10004 | 李四 | iPad | 已完成 | ¥5678
订单10005 | 王五 | Apple Watch | 已支付 | ¥2341
订单10006 | 小明 | iPhone | 待发货 | ¥8765
订单10007 | 小红 | MacBook | 已发货 | ¥4321
订单10008 | 张三 | AirPods | 已完成 | ¥1987
订单10009 | 李四 | iPad | 已支付 | ¥6543
订单10010 | 王五 | Apple Watch | 待发货 | ¥3214
这个例子展示了
count+cycle的组合威力:无限生成订单号 + 循环复用用户名/商品数据。
📦 项目 2:日志文件数据清洗(15 分钟)
场景:你有一个日志文件,里面记录了用户的访问记录,你只需要提取「2024 年以后」「错误级别」的日志。
import itertools
# 模拟日志数据(实际使用时从文件读取)
日志数据 = """
2023-01-15 10:23:45 [INFO] 用户登录成功
2023-03-20 14:30:00 [ERROR] 数据库连接失败
2024-01-10 09:15:30 [INFO] 用户下单成功
2024-02-14 16:45:00 [ERROR] 支付接口超时
2024-03-05 11:20:00 [INFO] 用户评价完成
2024-06-20 08:00:00 [ERROR] 短信发送失败
2024-12-25 22:30:00 [INFO] 管理员登录
""".strip().split('\n')
def 解析日志行(行):
"""解析单行日志,返回日期、级别、消息"""
parts = 行.split(']', 1)
时间级别 = parts[0].split('[')
日期 = 时间级别[0].strip()
级别 = 时间级别[1].strip()
消息 = parts[1].strip() if len(parts) > 1 else ""
return (日期, 级别, 消息)
def 年份大于2023(日志行):
"""判断日志是否是2024年及以后的"""
日期 = 解析日志行(日志行)[0]
return 日期.startswith("2024")
def 是错误级别(日志行):
"""判断日志是否是ERROR级别"""
return "[ERROR]" in 日志行
def 提取消息(日志行):
"""提取错误消息"""
_, _, 消息 = 解析日志行(日志行)
return 消息
# 第一步:过滤出 2024 年的日志
日志_2024 = itertools.filterfalse(lambda x: not 年份大于2023(x), 日志数据)
# 第二步:再过滤出 ERROR 级别
错误日志 = itertools.filterfalse(lambda x: not 是错误级别(x), 日志_2024)
# 第三步:只提取消息部分
错误消息 = map(提取消息, 错误日志)
print("=" * 50)
print("🐛 2024年错误日志汇总")
print("=" * 50)
for idx, msg in enumerate(错误消息, 1):
print(f"{idx}. {msg}")
输出:
==================================================
🐛 2024年错误日志汇总
==================================================
1. 支付接口超时
2. 短信发送失败
这个例子展示了
filterfalse(反向过滤)的用法:先按年份过滤,再按级别过滤,最后提取消息。三个操作链式调用,代码像「流水线」一样清晰。
📦 项目 3:组合套餐生成器(15 分钟)
场景:你是餐饮系统开发,要生成所有可能的「主食 + 配菜 + 饮料」组合,供用户选择。
import itertools
# 定义套餐选项
主食选项 = ["米饭", "面条", "馒头"]
配菜选项 = ["番茄炒蛋", "红烧肉", "清炒时蔬"]
饮料选项 = ["可乐", "橙汁", "矿泉水"]
# 生成所有排列组合
所有组合 = itertools.product(主食选项, 配菜选项, 饮料选项, repeat=1)
print("=" * 60)
print("🍱 套餐组合生成器 - 所有可能组合")
print("=" * 60)
# 转为列表方便操作
组合列表 = list(所有组合)
总数量 = len(组合列表)
# 计算总价(假设价格固定)
def 计算套餐价格(组合):
主食, 配菜, 饮料 = 组合
价格表 = {
"米饭": 3, "面条": 5, "馒头": 2,
"番茄炒蛋": 8, "红烧肉": 15, "清炒时蔬": 6,
"可乐": 3, "橙汁": 5, "矿泉水": 2
}
return sum(价格表.get(item, 0) for item in 组合)
# 只显示前 10 个组合
for idx, 组合 in enumerate(组合列表[:10], 1):
价格 = 计算套餐价格(组合)
print(f"{idx:2}. {组合[0]} + {组合[1]} + {组合[2]} = ¥{价格}")
print(f"\n... 共 {总数量} 种组合,显示前10个")
# 进阶:筛选低价组合(总价 < 15 元)
print("\n" + "=" * 60)
print("💰 省钱攻略:总价低于15元的组合")
print("=" * 60)
省钱组合 = filter(lambda c: 计算套餐价格(c) < 15, 组合列表)
for idx, 组合 in enumerate(list(省钱组合)[:5], 1):
价格 = 计算套餐价格(组合)
print(f"{idx}. {组合[0]} + {组合[1]} + {组合[2]} = ¥{价格}")
输出:
============================================================
🍱 套餐组合生成器 - 所有可能组合
============================================================
1. 米饭 + 番茄炒蛋 + 可乐 = ¥14
2. 米饭 + 番茄炒蛋 + 橙汁 = ¥16
3. 米饭 + 番茄炒蛋 + 矿泉水 = ¥13
4. 米饭 + 红烧肉 + 可乐 = ¥21
5. 米饭 + 红烧肉 + 橙汁 = ¥23
6. 米饭 + 红烧肉 + 矿泉水 = ¥20
7. 米饭 + 清炒时蔬 + 可乐 = ¥11
8. 米饭 + 清炒时蔬 + 橙汁 = ¥13
9. 米饭 + 清炒时蔬 + 矿泉水 = ¥10
10. 面条 + 番茄炒蛋 + 可乐 = ¥16
... 共 27 种组合,显示前10个
============================================================
💰 省钱攻略:总价低于15元的组合
============================================================
1. 米饭 + 番茄炒蛋 + 矿泉水 = ¥13
2. 米饭 + 清炒时蔬 + 可乐 = ¥11
3. 米饭 + 清炒时蔬 + 橙汁 = ¥13
4. 米饭 + 清炒时蔬 + 矿泉水 = ¥10
5. 馒头 + 番茄炒蛋 + 矿泉水 = ¥12
itertools.product是「笛卡尔积」,把三个列表两两配对,生成所有可能的组合。这比你写三层嵌套 for 循环优雅多了!
💪 进阶 20 分钟:常见坑 + 性能小贴士
❌ 坑 1:迭代器只能用一次
# ❌ 错误示例
计数器 = itertools.count(1)
print(list(计数器)) # [1, 2, 3, ...]
print(list(计数器)) # [] 空了!已经耗尽了!
# ✅ 正确示例
计数器 = itertools.count(1)
结果1 = list(itertools.islice(计数器, 5)) # 取5个
结果2 = list(itertools.islice(计数器, 5)) # 再取5个(接着上面)
print(结果1) # [1, 2, 3, 4, 5]
print(结果2) # [6, 7, 8, 9, 10]
迭代器就像「单向胶片」,看完就没了,不能回放。
❌ 坑 2:groupby 之前必须排序
# ❌ 错误示例
数据 = [("乙", 1), ("甲", 2), ("乙", 3)]
for 键, 组 in itertools.groupby(数据, key=lambda x: x[0]):
print(list(组)) # 输出: [('乙', 1)] [('甲', 2)] [('乙', 3)]
# 甲和乙分开了,没有正确分组!
# ✅ 正确示例
数据 = [("甲", 2), ("乙", 1), ("乙", 3)]
数据.sort() # 先排序!
for 键, 组 in itertools.groupby(数据, key=lambda x: x[0]):
print(list(组)) # 输出:[('甲', 2)] [('乙', 1), ('乙', 3)]
❌ 坑 3:无穷迭代器要有退出条件
# ❌ 危险示例:没有终止条件,会死循环
# for x in itertools.count(1):
# print(x) # 永远不会停!
# ✅ 正确示例:用 islice 限制数量
for x in itertools.islice(itertools.count(1), 100):
print(x) # 只打印前100个
小明:老师,我上次写了个
while True+count()组合,跑了一晚上电脑没关机……
❌ 坑 4:tee 会吃内存
# ❌ 危险示例
超大数据 = range(10000000) # 1000万个数据
份1, 份2, 份3 = itertools.tee(超大数据, 3)
# 如果你同时遍历这三个,内存会爆炸!
# ✅ 正确示例:用完一份再处理另一份,或者用 chain 合并
份1, 份2 = itertools.tee(range(100))
处理结果1 = list(份1) # 先处理完第一份
处理结果2 = list(份2) # 再处理第二份
❌ 坑 5:combinations 和 permutations 数量爆炸
# 10 个元素选 5 个排列:P(10,5) = 30240
# 20 个元素选 10 个排列:P(20,10) = 67044257280000 (67万亿!)
# ❌ 危险示例
# for 组合 in itertools.permutations(range(20), 10):
# print(组合) # 等到天荒地老
# ✅ 正确示例:先用 islice 限制数量
计数 = 0
for 组合 in itertools.permutations(range(20), 10):
计数 += 1
if 计数 >= 100:
break
print(f"显示了前100个组合")
⚡ 性能小贴士:链式操作比列表推导式更快
import itertools
import time
数据 = range(100000)
# ❌ 列表推导式:先算完所有结果,再过滤
start = time.time()
结果1 = [x*2 for x in 数据 if x % 2 == 0]
列表推导时间 = time.time() - start
# ✅ itertools 链式操作:惰性计算,用到才算
start = time.time()
结果2 = list(itertools.map(lambda x: x*2, itertools.filterfalse(lambda x: x % 2 != 0, 数据)))
itertools时间 = time.time() - start
print(f"列表推导式: {列表推导时间:.4f}秒")
print(f"itertools: {itertools时间:.4f}秒")
对于大数据量,
itertools的惰性计算可以显著减少内存占用。
🔍 调试技巧:用 enumerate 看迭代器走到哪了
import itertools
数据源 = itertools.cycle(["A", "B", "C", "D"])
print("调试迭代器状态:")
for idx, 值 in enumerate(数据源):
print(f"索引={idx}, 值={值}")
if idx >= 9: # 手动退出条件
print(f"当前走到第 {idx} 位,值为 {值}")
break
✏️ 练习题
练习 1(2 分钟):改改计数器
- 输入:把 itertools.count(start=10) 改成从 100 开始
- 预期输出:100, 101, 102, 103, 104
- 提示:count() 的 start 参数控制起始值
练习 2(3 分钟):加上 if 判断
- 输入:在项目 1 的订单生成代码中,只生成「已支付」状态的订单
- 预期输出:所有订单状态都是「已支付」
- 提示:用 itertools.repeat("已支付") 替代 状态循环
练习 3(5 分钟):处理新数据
- 输入:用项目 2 的方法,筛选出日志中所有「INFO」级别且包含「成功」二字的消息
- 预期输出:两条日志消息
- 提示:加一个 是成功级别 的过滤函数
练习 4(5 分钟):串联项目 2 和 3
- 输入:用项目 2 的数据过滤 + 项目 3 的组合思路,给「米饭」「面条」「馒头」分别搭配「番茄炒蛋」「红烧肉」
- 预期输出:6 种组合
- 提示:itertools.product 可以接受任意数量的可迭代对象
练习 5(5 分钟):分析报错
- 输入:下面的代码为什么会卡住?如何修改?
import itertools
for x in itertools.cycle([1,2,3]):
print(x)
- 预期输出:说明原因,并给出修复代码
- 提示:无穷迭代器需要手动退出
作业:做一个「数据抽样检查工具」
- 需求描述:假设你有一个超大数据集(模拟 10000 条用户数据),你需要抽样检查数据质量
- 功能点:
1. 生成 10000 条用户数据(用count+cycle模拟)
2. 每 100 条里抽 1 条,组成样本集(用islice实现)
3. 统计样本中每个「年龄段」的人数分布(用groupby) - 加分项:
1. 支持随机抽样(用random.sample从迭代器采样)
2. 生成抽样报告(文本格式) - 验收标准:
- 能跑起来
- 输出包含「总共 X 条数据,抽样 Y 条」
- 代码有注释
📚 总结 + 资源
本章 3 个核心要点
itertools是「惰性计算」的工具箱:用到才算,不占内存,特别适合处理大数据- 四大核心函数:生成(count/cycle)、组合(chain/product)、切片(islice/tee)、分组(groupby)
- 排列组合别傻算:
combinations和permutations帮你搞定数学题
延伸学习资源
- 官方文档 - itertools(英文)— 最权威的参考
- Python 3 模块参考 - itertools 章节(中文)— 官方中文翻译
- 《Python 编程:从入门到实践》第 9 章 — 有更多实战项目
互动钩子
你是做什么场景的?处理日志、数据清洗、还是生成测试数据?评论区聊聊你的需求,老粉优先回复!
下章预告:学会了 itertools 这个「迭代器工具箱」,下一章我们要介绍它的好搭档——functools,它专门处理「函数」的高级技巧。想象一下:给任意函数加装「缓存加速器」,让重复计算快 100 倍!敬请期待第 8 章 8.3 节。

评论(0)