第4章 4.4 事件循环 Event Loop
「上章我们学会了 async/await,写异步代码终于不用被 Promise.then().then() 绕晕了。但你有没有想过——这些异步任务到底是谁在排队?谁在决定先执行哪个?这一章我们就来揭开这个调度员的面纱。」
🎯 开场 3 分钟:为什么要学这个?
你有没有遇到过这些情况?
- 明明代码顺序没问题,但结果总是乱套
- 写了个爬虫,requests 发出去后后面的代码提前跑了
- 「同步」和「异步」分不清,一调试就懵
我当年学 Python 异步的时候,卡在事件循环上整整 3 天。就是搞不懂:await 到底在等谁?为什么有时候代码不按顺序执行?
这一章结束之后,你会彻底搞清楚:
Python 的事件循环是怎么调度你的异步任务的,以及为什么 await 能「暂停」一个函数但不阻塞整个程序。
🧱 基础 25 分钟:核心概念(小白视角)
4.4.1 什么是事件循环?
想象你是一家外卖店的老板。
你有 10 个\n\n
\n\n
\n\n订单,但只有 1 个厨师。厨师一次只能做一道菜,但他不会傻等着菜做完——他会在等菜炖的时候去准备下一道菜的食材。
事件循环就是这个逻辑:
一个永远不停止的「循环」,不断检查「有没有事情要做」,有就处理,没有就继续等。
在 Python 里,这个「老板」就是 asyncio 的事件循环。它负责:
1. 看看有哪些任务在等待
2. 调度它们交替执行
3. 保证等待的时候不浪费时间
4.4.2 生活中的例子:等电梯
想象你在公司等电梯:
- 你按了按钮(发起异步请求)
- 电梯正在从 5 楼下来(执行中)
- 你不会一直盯着电梯看(不阻塞)
- 你可以刷手机、做别的事(事件循环去处理其他任务)
- 电梯到了,通知你(回调/继续执行)
事件循环就是这个「通知系统」——它帮你管理等待,让程序在你等待的时候去干别的事。
4.4.3 代码初体验:亲眼看事件循环
import asyncio
async def 煮水():
print("1️⃣ 开始烧水...")
await asyncio.sleep(3) # 模拟耗时操作
print("1️⃣ 水烧开了!")
async def 切菜():
print("2️⃣ 开始切菜...")
await asyncio.sleep(2)
print("2️⃣ 菜切好了!")
async def main():
# 创建两个任务(但不立即执行)
task1 = asyncio.create_task(煮水())
task2 = asyncio.create_task(切菜())
print("🚀 主程序开始")
await task1
await task2
print("✅ 全部完成")
asyncio.run(main())
预期输出:
🚀 主程序开始
1️⃣ 开始烧水...
2️⃣ 开始切菜...
1️⃣ 水烧开了!
2️⃣ 菜切好了!
✅ 全部完成
注意看:切菜只用了 2 秒,水要 3 秒,但因为它们「同时」开始,所以总时间只用了 3 秒而不是 5 秒。这就是事件循环的威力——任务在等待时自动切换。
4.4.4 宏任务 vs 微任务(重点!)
这是最容易搞混的地方。
宏任务(Macrotask):可以理解成「大任务」,每次事件循环只执行一个
- setTimeout 的回调
- I/O 操作
- asyncio.sleep()
微任务(Microtask):可以理解成「小任务」,在当前宏任务结束后立即执行,全部执行完才换下一个宏任务
- Promise.then() 的回调(JavaScript 概念)
- await 后面的代码
Python 版本:
在 Python 的 asyncio 里:
- 宏任务 = asyncio.create_task() 创建的任务
- 微任务 = await 表达式后面紧跟的代码
import asyncio
async def demo():
print("A")
await asyncio.sleep(0) # 这是一个「让出」操作
print("B")
await asyncio.sleep(0)
print("C")
asyncio.run(demo())
输出: A → B → C(按顺序)
但如果是这样:
import asyncio
async def 任务1():
print("任务1: 开始")
await asyncio.sleep(2)
print("任务1: 结束")
async def 任务2():
print("任务2: 开始")
await asyncio.sleep(1)
print("任务2: 结束")
async def main():
task1 = asyncio.create_task(任务1())
task2 = asyncio.create_task(任务2())
await asyncio.sleep(0.5) # 模拟主程序的中途检查
print("主程序: 我在中间插了一脚!")
asyncio.run(main())
预期输出:
任务1: 开始
任务2: 开始
主程序: 我在中间插了一脚!
任务2: 结束
任务1: 结束
看到了吗?事件循环在 sleep 的时候切换任务,而不是等一个完全做完再做另一个。
4.4.5 手动控制事件循环
有时候你想更精细地控制事件循环,比如在一个已有的循环里添加任务:
import asyncio
async def 任务(name, 耗时):
print(f"{name} 开始(需要 {耗时} 秒)")
await asyncio.sleep(耗时)
print(f"{name} 结束")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建任务但先不执行
task1 = loop.create_task(任务("任务A", 2))
task2 = loop.create_task(任务("任务B", 1))
print("两个任务已创建,等待完成...")
# 等待所有任务完成
await asyncio.gather(task1, task2)
print("全部完成!")
asyncio.run(main())
预期输出:
两个任务已创建,等待完成...
任务A 开始(需要 2 秒)
任务B 开始(需要 1 秒)
任务B 结束
任务A 结束
全部完成!
🔥 实战 35 分钟:3 个递进的小项目
📦 项目 1:并发下载模拟器(5 分钟)
场景: 你要做竞品分析,需要同时抓取多个网页的数据。
import asyncio
import time
async def 下载网页(网址, 延迟秒数):
"""模拟下载一个网页"""
print(f"📥 开始下载: {网址}")
await asyncio.sleep(延迟秒数) # 模拟网络延迟
print(f"✅ 下载完成: {网址}")
return f"{网址} 的内容"
async def main():
网页列表 = [
("淘宝", 2),
("京东", 1.5),
("拼多多", 1),
]
开始时间 = time.time()
# 创建任务列表
任务们 = [下载网页(网址, 延迟) for 网址, 延迟 in 网页列表]
# 并发执行所有任务
结果 = await asyncio.gather(*任务们)
耗时 = time.time() - 开始时间
print(f"\n📊 总耗时: {耗时:.2f} 秒")
print(f"📦 下载了 {len(结果)} 个网页")
asyncio.run(main())
预期输出:
📥 开始下载: 淘宝
📥 开始下载: 京东
📥 开始下载: 拼多多
✅ 下载完成: 拼多多
✅ 下载完成: 京东
✅ 下载完成: 淘宝
📊 总耗时: 2.01 秒
📦 下载了 3 个网页
一句话解释:
asyncio.gather()让所有任务「同时」执行,总耗时等于最慢的那个(2 秒),而不是三个加起来(4.5 秒)。
📦 项目 2:带优先级的任务调度器(15 分钟)
场景: 你在做一个爬虫,需要按优先级抓取数据。紧急的任务要先执行。
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class 爬虫任务:
名称: str
优先级: int # 数字越大越优先
预估耗时: float
async def 执行爬虫任务(任务: 爬虫任务):
"""模拟执行一个爬虫任务"""
print(f"🔄 开始 [{任务.名称}] (优先级:{任务.优先级})")
await asyncio.sleep(任务.预估耗时)
print(f"✅ 完成 [{任务.名称}]")
return 任务.名称
async def 优先调度器(任务列表: list[爬虫任务]):
"""按优先级顺序执行任务"""
# 按优先级排序(降序)
排序后 = sorted(任务列表, key=lambda x: x.优先级, reverse=True)
print("📋 任务执行顺序:", " → ".join(t.名称 for t in 排序后))
# 按顺序执行(因为是IO密集型,实际可以并发,但这里演示顺序)
结果 = []
for 任务 in 排序后:
结果.append(await 执行爬虫任务(任务))
return 结果
async def main():
任务们 = [
爬虫任务("首页", 优先级=1, 预估耗时=2),
爬虫任务("商品详情", 优先级=5, 预估耗时=1),
爬虫任务("用户评价", 优先级=3, 预估耗时=1.5),
爬虫任务("推荐商品", 优先级=4, 预估耗时=0.5),
]
print("=" * 40)
print("🕷️ 爬虫任务调度演示")
print("=" * 40)
结果 = await 优先调度器(任务们)
print("\n📊 执行结果:", 结果)
asyncio.run(main())
预期输出:
========================================
🕷️ 爬虫任务调度演示
========================================
📋 任务执行顺序:商品详情 → 推荐商品 → 用户评价 → 首页
🔄 开始 [商品详情] (优先级:5)
✅ 完成 [商品详情]
🔄 开始 [推荐商品] (优先级:4)
✅ 完成 [推荐商品]
🔄 开始 [用户评价] (优先级:3)
✅ 完成 [用户评价]
🔄 开始 [首页] (优先级:1)
✅ 完成 [首页]
📊 执行结果:['商品详情', '推荐商品', '用户评价', '首页']
一句话解释:事件循环按优先级顺序「排队」,高优先级的任务先执行。
📦 项目 3:实时数据监控面板(15 分钟)
场景: 做一个简单的监控系统,并发检查多个服务的状态。
import asyncio
import time
from dataclasses import dataclass
@dataclass
class 服务状态:
名称: str
状态: str
延迟毫秒: int
async def 检查单个服务(服务名: str) -> 服务状态:
"""模拟检查一个服务的健康状态"""
# 模拟不同服务的响应时间
延迟表 = {
"用户服务": 0.1,
"订单服务": 0.2,
"支付服务": 0.3,
"库存服务": 0.15,
"通知服务": 0.25,
}
延迟 = 延迟表.get(服务名, 0.2)
await asyncio.sleep(延迟)
# 模拟:90% 概率正常
import random
是否正常 = random.random() > 0.1
状态 = "🟢 正常" if 是否正常 else "🔴 故障"
return 服务状态(
名称=服务名,
状态=状态,
延迟毫秒=int(延迟 * 1000)
)
async def 监控面板():
"""并发检查所有服务"""
服务列表 = ["用户服务", "订单服务", "支付服务", "库存服务", "通知服务"]
print("🔍 正在检查所有服务状态...\n")
开始时间 = time.time()
# 并发检查所有服务
任务们 = [检查单个服务(s) for s in 服务列表]
结果列表 = await asyncio.gather(*任务们)
总耗时 = time.time() - 开始时间
# 打印结果表格
print("-" * 50)
print(f"{'服务名称':<12} {'状态':<10} {'延迟':<10}")
print("-" * 50)
for 状态 in 结果列表:
print(f"{状态.名称:<12} {状态.状态:<10} {状态.延迟毫秒}ms")
print("-" * 50)
print(f"✅ 检查完成,总耗时: {总耗时:.2f}秒")
# 统计
正常数 = sum(1 for s in 结果列表 if "正常" in s.状态)
print(f"📊 存活率: {正常数}/{len(结果列表)}")
asyncio.run(监控面板())
预期输出:
🔍 正在检查所有服务状态...
--------------------------------------------------
服务名称 状态 延迟
--------------------------------------------------
用户服务 🟢 正常 100ms
订单服务 🟢 正常 200ms
支付服务 🔴 故障 300ms
库存服务 🟢 正常 150ms
通知服务 🟢 正常 250ms
--------------------------------------------------
✅ 检查完成,总耗时: 0.31秒
📊 存活率: 4/5
一句话解释:用
asyncio.gather()并发检查 5 个服务,总耗时只有 0.31 秒(最慢的 0.3 秒),而不是串行的 0.95 秒。
💪 进阶 20 分钟:常见坑 + 性能小贴士
❌ 坑 1:忘记 await,任务根本不执行
# ❌ 错误写法
async def main():
task = asyncio.create_task(某个异步函数())
# 这里task根本没执行!因为忘了await
# ✅ 正确写法
async def main():
task = asyncio.create_task(某个异步函数())
await task # 一定要await!
❌ 坑 2:在同步函数里用 await
# ❌ 错误写法
def 同步函数():
await asyncio.sleep(1) # SyntaxError! 同步函数不能await
# ✅ 正确写法
async def 异步函数():
await asyncio.sleep(1)
❌ 坑 3:混淆 sleep 和 sleep(0)
# asyncio.sleep(1) - 真正等待1秒,期间可以切换到其他任务
await asyncio.sleep(1)
# asyncio.sleep(0) - 几乎不等待,但会强制让出执行权给其他任务
await asyncio.sleep(0) # 常用于「让出」控制权
❌ 坑 4:事件循环嵌套(新手禁区)
# ❌ 错误写法 - 嵌套事件循环
async def 子函数():
await asyncio.sleep(1)
async def 父函数():
await 子函数() # 这个可以
# 但如果你在已有事件循环里用 asyncio.run(),会报错
# RuntimeError: asyncio.run() cannot be called from a running event loop
# ✅ 正确做法:用 await 或 get_running_loop()
❌ 坑 5:任务创建后立即丢弃
# ❌ 错误写法 - 任务创建了但没保存
async def main():
asyncio.create_task(某个异步函数()) # 任务可能永远不会执行!
print("主函数结束了") # 程序直接退出,任务被取消了
# ✅ 正确写法 - 保存任务引用并等待
async def main():
task = asyncio.create_task(某个异步函数())
await task
💡 性能小贴士:IO 密集型用并发,CPU 密集型别硬撑
# 如果你的任务是 CPU 密集型(计算、加密等),asyncio 帮不了太多
# 这种时候用 multiprocessing 而不是 asyncio
# 但如果你的任务是 IO 密集型(网络请求、文件读写),asyncio 效果明显
async def 爬取1000个网页():
任务们 = [爬取单个(url) for url in urls]
await asyncio.gather(*任务们) # 并发,效率高
🔧 调试技巧:用 asyncio.current_task()
import asyncio
async def 任务A():
当前任务 = asyncio.current_task()
print(f"当前任务: {当前任务.get_name()}")
async def main():
await asyncio.gather(
任务A(),
任务A(),
)
asyncio.run(main())
✏️ 练习题 + 作业题
练习题(10 分钟)
练习 1(2 分钟):并发任务基础
- 输入:创建一个异步函数 打招呼(名字, 延迟),打印 "你好,{名字}!"
- 预期输出:同时启动 3 个任务,总耗时约 1 秒
- 提示:asyncio.gather() 可以并发执行多个任务
练习 2(2 分钟):加个判断
- 输入:在项目 1 基础上,如果延迟大于 1.5 秒,打印 "超时警告"
- 预期输出:根据延迟显示不同信息
- 提示:在 await 之后加 if 判断
练习 3(2 分钟):新数据处理
- 输入:用项目 2 的优先级调度器处理新的任务列表:[("爬虫A", 3, 2), ("爬虫B", 1, 1), ("爬虫C", 5, 0.5)]
- 预期输出:按优先级 5→3→1 的顺序执行
- 提示:直接改任务列表的参数即可
练习 4(3 分钟):串两个项目
- 输入:把项目 2 的优先级排序 + 项目 3 的并发执行结合起来
- 预期输出:先按优先级排序,再用 gather() 并发执行
- 提示:排序后用 create_task() 创建任务,然后 gather()
练习 5(1 分钟):找错
- 输入:以下代码为什么只打印 "Done" 而不打印 "Task finished"?
async def long_task():
await asyncio.sleep(3)
print("Task finished")
async def main():
asyncio.create_task(long_task())
print("Done")
asyncio.run(main())
- 预期输出:需要修改才能让 Task finished 打印出来
- 提示:任务创建后需要
await它
📝 作业:做一个「批量文件下载器」
需求描述:
做一个简单的批量文件下载模拟器,可以并发下载多个「文件」,并显示总耗时。
功能点:
1. 支持配置 5-10 个文件的下载任务
2. 每个任务模拟不同的下载延迟
3. 显示最终统计:总耗时、下载成功的数量
加分项:
1. 支持中断功能(某个任务失败后不影响其他任务)
2. 显示每个任务的完成顺序
验收标准:
- 能跑起来
- 输出显示总耗时(应该远小于串行执行的时间)
- 代码有注释
📚 总结 + 资源
本文学到的 3 个核心点:
1. 事件循环是调度员——它决定哪个任务先执行,哪个后执行
2. await 让出控制权——等待时不阻塞,可以切换到其他任务
3. asyncio.gather() 实现并发——多个任务同时执行,总耗时取最长的那个
延伸学习资源:
- Python 官方文档 - asyncio(英文)
- 《流畅的 Python》第 17 章 - 异步编程
- 视频:Corey Schafer 的 Asyncio 教程(B站有搬运)
互动钩子:
「你有没有被事件循环坑过?比如代码明明在前面,结果却后面才执行?在评论区说说你的经历,老粉优先回复!」
「下一章我们要学的是:综合实战——并发请求 + 限流。想象一下,你要同时发 100 个请求,但服务器限制每秒只能发 10 个,这时候怎么破?下一章揭晓答案。」

评论(0)