第9章 9.3 异步编程:让程序同时做多件事
上章回顾:上一章我们用 Laravel 快速搭建了一个博客页面,看到了「请求→路由→控制器→视图」这个经典流程。现在你手里有了一个能处理网页的框架。
本章目标:今天我们要学的东西,能让你的程序一边下载文件一边回复用户请求,一边查询数据库一边处理其他任务——这就是异步编程的威力。
🎯 开场 3 分钟:为什么要学这个?
普通人点外卖的经历
想象你去一家餐厅:
- 同步方式:你点了一个菜,等它做完,再点第二个。等餐时你只能干坐着。
- 异步方式:你点了6个菜,服务员说「做好了叫您」,你可以玩手机、和朋友聊天,菜好了叫你。
这就是同步和异步的区别。同步是排队干活,异步是多任务并行。
程序里的真实场景
你一定遇到过这些痛苦:
- 💔 「我调用了一个 API,页面卡了 3 秒才加载完」
- 💔 「下载 10 个文件,等了一个小时」
- 💔 「用户发了一条消息,我的程序就卡住了,后面的消息全在排队」
学完这章,你能让程序\n\n
\n\n
\n\n同时做很多事,不用再傻等着。
🧱 基础 25 分钟:核心概念
1. 协程:可以暂停和恢复的函数
生活类比:想象你是个厨师,你在做红烧肉的时候,肉在炖着(等待),你可以去炒个青菜。青菜好了,再回来看看肉炖好了没。这就是协程——在等待的时候去做别的事。
为什么要用:避免浪费时间在 I/O 等待上。
最简单的代码:
# Python 3.7+ 协程
import asyncio
async def cook_rice():
print("1. 开始煮饭")
await asyncio.sleep(3) # 模拟煮饭需要3秒
print("4. 饭好了")
async def cook_dish():
print("2. 开始炒菜")
await asyncio.sleep(1) # 模拟炒菜需要1秒
print("3. 菜好了")
async def main():
# 一起做:总耗时约3秒(不是4秒)
await asyncio.gather(cook_rice(), cook_dish())
print("5. 开饭!")
asyncio.run(main())
运行结果:
1. 开始煮饭
2. 开始炒菜
3. 菜好了
4. 饭好了
5. 开饭!
这行在干嘛:
- async def 定义了一个协程函数
- await 是暂停点,等着这件事完成
- asyncio.gather() 让多个任务同时跑
2. async/await:写异步代码的关键字
是什么:
- async def 声明这是一个异步函数,调用它不会立即执行
- await 等待一个异步操作完成
为什么要用:用同步的写法,写异步的逻辑,不用嵌套回调。
最简代码:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 暂停1秒,但不会阻塞其他代码
print("World")
async def count_numbers():
for i in range(3):
print(i)
await asyncio.sleep(0.5)
async def main():
await asyncio.gather(
say_hello(),
count_numbers()
)
asyncio.run(main())
运行结果:
Hello
0
1
2
World
注意:asyncio.sleep() 是异步等待,不会真的阻塞程序。而 time.sleep() 会阻塞一切。
3. Task:异步任务的管理器
是什么:asyncio.Task 用于单独管理一个协程的开始、取消、状态查询。
为什么要用:有时候你不想立刻 await,而是先「记下来」,等会儿再等它。
最简代码:
import asyncio
async def download_file(filename):
print(f"开始下载 {filename}")
await asyncio.sleep(2) # 模拟下载
print(f"{filename} 下载完成")
return f"{filename}.data"
async def main():
# 创建任务,但不立即执行
task1 = asyncio.create_task(download_file("电影.mp4"))
task2 = asyncio.create_task(download_file("歌曲.mp3"))
# 做别的事
print("我去泡杯茶...")
await asyncio.sleep(0.5)
print("茶泡好了")
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(f"下载结果: {results}")
asyncio.run(main())
运行结果:
开始下载 电影.mp4
开始下载 歌曲.mp3
我去泡杯茶...
茶泡好了
电影.mp4 下载完成
歌曲.mp3 下载完成
下载结果: ['电影.mp4.data', '歌曲.mp3.data']
4. 并发 vs 并行:重要区分
生活类比:
- 并发:你一个人交替做多件事(边做饭边听音乐)
- 并行:多个人同时做多件事(两个人同时做饭)
Python 异步是并发,不是并行(单线程)。但对于 I/O 密集型任务,这已经很快了。
5. aiohttp:异步 HTTP 客户端
是什么:比 requests 更快的 HTTP 客户端,因为它不会阻塞。
为什么要用:同时请求多个 API,用同步方式要等很久,用异步可以同时请求。
最简代码:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/delay/1", # 故意延迟1秒
"https://httpbin.org/delay/2", # 故意延迟2秒
"https://httpbin.org/delay/1", # 故意延迟1秒
]
async with aiohttp.ClientSession() as session:
# 同时发3个请求,总耗时约2秒(最长的那个)
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
print("响应状态:", [len(r) for r in results])
# 需要先安装:pip install aiohttp
asyncio.run(main())
这行在干嘛:
- aiohttp.ClientSession() 创建会话
- session.get() 发起异步请求
- async with 确保请求结束后关闭连接
🔥 实战 35 分钟:3 个递进的小项目
项目 1:异步批量下载网页(5 分钟)
目标:同时下载 3 个网页,计算总耗时。
import asyncio
import aiohttp
import time
async def fetch_page(session, url, index):
"""下载单个页面"""
print(f"[{index}] 开始下载: {url}")
async with session.get(url) as response:
content = await response.text()
print(f"[{index}] 完成: {url} ({len(content)} 字符)")
return len(content)
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, url, i) for i, url in enumerate(urls)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.2f} 秒")
print(f"总字符数: {sum(results)}")
asyncio.run(main())
预期输出:
[0] 开始下载: https://httpbin.org/delay/1
[1] 开始下载: https://httpbin.org/delay/2
[2] 开始下载: https://httpbin.org/delay/1
[0] 完成: https://httpbin.org/delay/1 (321 字符)
[2] 完成: https://httpbin.org/delay/1 (321 字符)
[1] 完成: https://httpbin.org/delay/2 (321 字符)
总耗时: 2.08 秒
总字符数: 963
一句话解释:3 个请求同时跑,总耗时约等于最慢那个(2秒),而不是 3 个相加(4秒)。
项目 2:异步读取多个 CSV 文件并合并(15 分钟)
目标:模拟读取多个数据文件,统计总行数。
准备:先创建 3 个测试文件
import asyncio
import os
import csv
import time
# 创建测试 CSV 文件
for i in range(1, 4):
with open(f"data_{i}.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["id", "name", "score"])
for j in range(1, 51): # 每个文件50行
writer.writerow([j, f"学生{j}", 80 + j % 20])
print("测试文件创建完成")
异步读取合并:
import asyncio
import csv
import time
async def read_csv_file(filepath):
"""异步读取单个CSV文件"""
print(f"开始读取: {filepath}")
await asyncio.sleep(0.1) # 模拟I/O延迟
# 同步读取文件(asyncio不改变文件I/O本身)
with open(filepath, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
rows = list(reader)
print(f"{filepath} 读取完成: {len(rows)} 行")
return rows
async def main():
files = ["data_1.csv", "data_2.csv", "data_3.csv"]
start = time.time()
# 并发读取所有文件
tasks = [read_csv_file(f) for f in files]
results = await asyncio.gather(*tasks)
# 合并所有行
all_rows = []
for rows in results:
all_rows.extend(rows)
elapsed = time.time() - start
print(f"\n读取完成!共 {len(all_rows)} 行数据")
print(f"总耗时: {elapsed:.2f} 秒")
# 清理测试文件
for f in files:
os.remove(f)
asyncio.run(main())
预期输出:
开始读取: data_1.csv
开始读取: data_2.csv
开始读取: data_3.csv
data_1.csv 读取完成: 50 行
data_2.csv 读取完成: 50 行
data_3.csv 读取完成: 50 行
读取完成!共 150 行数据
总耗时: 0.11 秒
项目 3:异步待办清单管理器(15 分钟)
目标:做一个命令行待办清单,能添加、查看、删除任务,用异步方式模拟任务执行。
import asyncio
import json
import time
from datetime import datetime
class TodoManager:
def __init__(self):
self.tasks = []
def add_task(self, name, duration=1):
"""添加任务"""
task = {
"id": len(self.tasks) + 1,
"name": name,
"duration": duration,
"done": False,
"created_at": datetime.now().strftime("%H:%M:%S")
}
self.tasks.append(task)
return task["id"]
async def execute_task(self, task_id):
"""异步执行单个任务"""
task = next((t for t in self.tasks if t["id"] == task_id), None)
if not task:
print(f"任务 {task_id} 不存在")
return
print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始执行: {task['name']}")
await asyncio.sleep(task["duration"]) # 模拟任务执行
task["done"] = True
print(f"[{datetime.now().strftime('%H:%M:%S')}] 完成: {task['name']}")
async def execute_all(self):
"""并发执行所有未完成的任务"""
pending = [t for t in self.tasks if not t["done"]]
if not pending:
print("没有待执行的任务")
return
print(f"开始执行 {len(pending)} 个任务...\n")
tasks = [self.execute_task(t["id"]) for t in pending]
await asyncio.gather(*tasks)
print("\n所有任务执行完成!")
def list_tasks(self):
"""列出所有任务"""
print("\n=== 待办清单 ===")
for t in self.tasks:
status = "✓" if t["done"] else "○"
print(f"{status} [{t['id']}] {t['name']} (预计{t['duration']}秒)")
def show_stats(self):
"""显示统计"""
total = len(self.tasks)
done = sum(1 for t in self.tasks if t["done"])
print(f"\n统计: 共 {total} 个任务,已完成 {done} 个")
async def main():
manager = TodoManager()
# 添加一些任务
manager.add_task("发送邮件", 2)
manager.add_task("生成报表", 3)
manager.add_task("备份数据", 2)
manager.add_task("发送通知", 1)
manager.list_tasks()
print("\n--- 开始批量执行 ---\n")
start = time.time()
await manager.execute_all()
elapsed = time.time() - start
manager.list_tasks()
manager.show_stats()
print(f"\n总耗时: {elapsed:.2f} 秒(同步执行需要 8 秒)")
asyncio.run(main())
预期输出:
=== 待办清单 ===
○ [1] 发送邮件 (预计2秒)
○ [2] 生成报表 (预计3秒)
○ [3] 备份数据 (预计2秒)
○ [4] 发送通知 (预计1秒)
--- 开始批量执行 ---
[14:30:01] 开始执行: 发送邮件
[14:30:01] 开始执行: 生成报表
[14:30:01] 开始执行: 备份数据
[14:30:01] 开始执行: 发送通知
[14:30:02] 完成: 发送通知
[14:30:03] 完成: 发送邮件
[14:30:03] 完成: 备份数据
[14:30:04] 完成: 生成报表
所有任务执行完成!
=== 待办清单 ===
✓ [1] 发送邮件 (预计2秒)
✓ [2] 生成报表 (预计3秒)
✓ [3] 备份数据 (预计2秒)
✓ [4] 发送通知 (预计1秒)
统计: 共 4 个任务,已完成 4 个
总耗时: 3.01 秒(同步执行需要 8 秒)
一句话解释:4 个任务并发执行,总耗时约等于最长的那个(3秒),而不是相加(8秒)。
💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:忘了 await,程序不执行
# ❌ 错误:只是创建了协程对象,没有执行
async def main():
response = fetch_data() # 忘记 await
print(response) # 打印的是协程对象,不是结果
# ✅ 正确:一定要 await
async def main():
response = await fetch_data()
print(response)
坑 2:在同步函数里用 await
# ❌ 错误:普通函数不能用 await
def main():
await asyncio.sleep(1) # SyntaxError
# ✅ 正确:普通函数调用异步函数
def main():
asyncio.run(async_main()) # 用 asyncio.run 启动
# ✅ 或者:把主函数也变成 async
async def main():
await async_task()
坑 3:用 time.sleep() 而不是 asyncio.sleep()
# ❌ 错误:time.sleep 会阻塞整个线程
async def main():
time.sleep(10) # 程序卡住10秒,什么都做不了
print("done")
# ✅ 正确:用 asyncio.sleep
async def main():
await asyncio.sleep(10) # 这10秒可以做别的事
print("done")
坑 4:Task 创建了但没保存
# ❌ 错误:任务可能不会被执行
async def main():
asyncio.create_task(background_task()) # 没有保存引用
# ✅ 正确:保存 Task 引用
async def main():
task = asyncio.create_task(background_task())
await task # 或者用 gather 收集
坑 5:忘记了 async with 关闭资源
# ✅ 正确:使用 async with 确保资源释放
async def main():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.text()
# session 自动关闭
性能小贴士:善用 gather 批量处理
# ❌ 低效:串行执行
for url in urls:
result = await fetch(url) # 一个一个等
# ✅ 高效:并行执行
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks) # 一起等
调试技巧:用 print 配合时间戳
import asyncio
import time
async def debug_task(name, seconds):
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始")
await asyncio.sleep(seconds)
print(f"[{time.strftime('%H:%M:%S')}] {name} 结束")
async def main():
await asyncio.gather(
debug_task("任务A", 2),
debug_task("任务B", 1),
)
✏️ 练习题
练习 1(2 分钟):修改延迟时间
- 输入:把项目 1 中的 3 个 URL 改成
delay/2、delay/3、delay/2 - 预期输出:总耗时约 3 秒
- 提示:哪个数字决定总耗时?
练习 2(2 分钟):添加 if 判断
- 输入:在项目 2 中,添加一个判断,只统计分数大于 90 的学生
- 预期输出:显示符合条件的学生数量
- 提示:
int(row["score"]) > 90
练习 3(3 分钟):处理新数据
- 输入:创建 5 个 CSV 文件,每个 100 行,统计总行数
- 预期输出:
总共 500 行 - 提示:修改 files 列表和循环范围
练习 4(3 分钟):串联项目 2 和 3
- 输入:把项目 2 读取的 CSV 数据,用项目 3 的方式执行「统计」任务
- 预期输出:显示统计完成
- 提示:把
read_csv_file改造成返回带 duration 的任务
练习 5(5 分钟):分析报错
- 输入:运行以下代码,观察报错
import asyncio
async def main():
asyncio.sleep(1)
print("done")
main()
- 预期输出:找到错误并修复
- 提示:报错信息会提示缺少什么关键字
作业:做一个「异步数据采集器」
需求描述:做一个命令行工具,同时从多个「模拟 API」获取用户数据并汇总。
功能点:
1. 模拟 5 个 API 端点,每个返回 10 条用户数据(用 asyncio.sleep 模拟 0.5-2 秒延迟)
2. 并发获取所有数据,计算总耗时
3. 汇总所有用户名,按字母排序后打印
加分项:
1. 用 asyncio.create_task 单独管理每个任务
2. 显示进度(哪个 API 完成了)
验收标准:
- 能跑起来(python main.py)
- 输出包含总耗时和排序后的用户名列表
- 代码有中文注释
📚 总结
本文学到的 3 个核心点:
1. async/await 让你用同步写法写异步逻辑
2. asyncio.gather() 让多个任务同时执行,大幅缩短总耗时
3. 异步适用于 I/O 密集型任务(网络请求、文件读写),不适合 CPU 密集型任务
延伸学习资源:
- Python 官方文档 - asyncio
- aiohttp 官方文档
- 书籍《Python 高手之路》- 关于异步编程进阶内容
互动钩子:你在实际项目里遇到过「页面卡顿」或「下载太慢」的问题吗?当时是怎么解决的?评论区聊聊老粉优先回复!
下章预告:学会了异步编程,下一章我们要用它来做一个真实的东西——用 ThinkPHP 6 实战 CMS 系统,里面会用到异步来提升数据库查询效率。敬请期待!

评论(0)