第9章 9.3 异步编程:让程序同时做多件事

上章回顾:上一章我们用 Laravel 快速搭建了一个博客页面,看到了「请求→路由→控制器→视图」这个经典流程。现在你手里有了一个能处理网页的框架。

本章目标:今天我们要学的东西,能让你的程序一边下载文件一边回复用户请求,一边查询数据库一边处理其他任务——这就是异步编程的威力。


🎯 开场 3 分钟:为什么要学这个?

普通人点外卖的经历

想象你去一家餐厅:
- 同步方式:你点了一个菜,等它做完,再点第二个。等餐时你只能干坐着。
- 异步方式:你点了6个菜,服务员说「做好了叫您」,你可以玩手机、和朋友聊天,菜好了叫你。

这就是同步和异步的区别。同步是排队干活,异步是多任务并行。

程序里的真实场景

你一定遇到过这些痛苦:

  • 💔 「我调用了一个 API,页面卡了 3 秒才加载完」
  • 💔 「下载 10 个文件,等了一个小时」
  • 💔 「用户发了一条消息,我的程序就卡住了,后面的消息全在排队」

学完这章,你能让程序\n\nSimple tech illustration expla\n\nAI comic creation scene, creat\n\n同时做很多事,不用再傻等着。


🧱 基础 25 分钟:核心概念

1. 协程:可以暂停和恢复的函数

生活类比:想象你是个厨师,你在做红烧肉的时候,肉在炖着(等待),你可以去炒个青菜。青菜好了,再回来看看肉炖好了没。这就是协程——在等待的时候去做别的事。

为什么要用:避免浪费时间在 I/O 等待上。

最简单的代码

# Python 3.7+ 协程
import asyncio

async def cook_rice():
print("1. 开始煮饭")
await asyncio.sleep(3)  # 模拟煮饭需要3秒
print("4. 饭好了")

async def cook_dish():
print("2. 开始炒菜")
await asyncio.sleep(1)  # 模拟炒菜需要1秒
print("3. 菜好了")

async def main():
# 一起做:总耗时约3秒(不是4秒)
await asyncio.gather(cook_rice(), cook_dish())
print("5. 开饭!")

asyncio.run(main())

运行结果

1. 开始煮饭
2. 开始炒菜
3. 菜好了
4. 饭好了
5. 开饭!

这行在干嘛
- async def 定义了一个协程函数
- await 是暂停点,等着这件事完成
- asyncio.gather() 让多个任务同时跑

2. async/await:写异步代码的关键字

是什么
- async def 声明这是一个异步函数,调用它不会立即执行
- await 等待一个异步操作完成

为什么要用:用同步的写法,写异步的逻辑,不用嵌套回调。

最简代码

import asyncio

async def say_hello():
print("Hello")
await asyncio.sleep(1)  # 暂停1秒,但不会阻塞其他代码
print("World")

async def count_numbers():
for i in range(3):
    print(i)
    await asyncio.sleep(0.5)

async def main():
await asyncio.gather(
    say_hello(),
    count_numbers()
)

asyncio.run(main())

运行结果

Hello
0
1
2
World

注意asyncio.sleep() 是异步等待,不会真的阻塞程序。而 time.sleep() 会阻塞一切。

3. Task:异步任务的管理器

是什么asyncio.Task 用于单独管理一个协程的开始、取消、状态查询。

为什么要用:有时候你不想立刻 await,而是先「记下来」,等会儿再等它。

最简代码

import asyncio

async def download_file(filename):
print(f"开始下载 {filename}")
await asyncio.sleep(2)  # 模拟下载
print(f"{filename} 下载完成")
return f"{filename}.data"

async def main():
# 创建任务,但不立即执行
task1 = asyncio.create_task(download_file("电影.mp4"))
task2 = asyncio.create_task(download_file("歌曲.mp3"))

# 做别的事
print("我去泡杯茶...")
await asyncio.sleep(0.5)
print("茶泡好了")

# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(f"下载结果: {results}")

asyncio.run(main())

运行结果

开始下载 电影.mp4
开始下载 歌曲.mp3
我去泡杯茶...
茶泡好了
电影.mp4 下载完成
歌曲.mp3 下载完成
下载结果: ['电影.mp4.data', '歌曲.mp3.data']

4. 并发 vs 并行:重要区分

生活类比
- 并发:你一个人交替做多件事(边做饭边听音乐)
- 并行:多个人同时做多件事(两个人同时做饭)

Python 异步是并发,不是并行(单线程)。但对于 I/O 密集型任务,这已经很快了。

5. aiohttp:异步 HTTP 客户端

是什么:比 requests 更快的 HTTP 客户端,因为它不会阻塞。

为什么要用:同时请求多个 API,用同步方式要等很久,用异步可以同时请求。

最简代码

import asyncio
import aiohttp

async def fetch_url(session, url):
async with session.get(url) as response:
    return await response.text()

async def main():
urls = [
    "https://httpbin.org/delay/1",  # 故意延迟1秒
    "https://httpbin.org/delay/2",  # 故意延迟2秒
    "https://httpbin.org/delay/1",  # 故意延迟1秒
]

async with aiohttp.ClientSession() as session:
    # 同时发3个请求,总耗时约2秒(最长的那个)
    tasks = [fetch_url(session, url) for url in urls]
    results = await asyncio.gather(*tasks)

print(f"完成 {len(results)} 个请求")
print("响应状态:", [len(r) for r in results])

# 需要先安装:pip install aiohttp
asyncio.run(main())

这行在干嘛
- aiohttp.ClientSession() 创建会话
- session.get() 发起异步请求
- async with 确保请求结束后关闭连接


🔥 实战 35 分钟:3 个递进的小项目

项目 1:异步批量下载网页(5 分钟)

目标:同时下载 3 个网页,计算总耗时。

import asyncio
import aiohttp
import time

async def fetch_page(session, url, index):
"""下载单个页面"""
print(f"[{index}] 开始下载: {url}")
async with session.get(url) as response:
    content = await response.text()
    print(f"[{index}] 完成: {url} ({len(content)} 字符)")
    return len(content)

async def main():
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2", 
    "https://httpbin.org/delay/1",
]

start = time.time()

async with aiohttp.ClientSession() as session:
    tasks = [fetch_page(session, url, i) for i, url in enumerate(urls)]
    results = await asyncio.gather(*tasks)

elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.2f} 秒")
print(f"总字符数: {sum(results)}")

asyncio.run(main())

预期输出

[0] 开始下载: https://httpbin.org/delay/1
[1] 开始下载: https://httpbin.org/delay/2
[2] 开始下载: https://httpbin.org/delay/1
[0] 完成: https://httpbin.org/delay/1 (321 字符)
[2] 完成: https://httpbin.org/delay/1 (321 字符)
[1] 完成: https://httpbin.org/delay/2 (321 字符)

总耗时: 2.08 秒
总字符数: 963

一句话解释:3 个请求同时跑,总耗时约等于最慢那个(2秒),而不是 3 个相加(4秒)。


项目 2:异步读取多个 CSV 文件并合并(15 分钟)

目标:模拟读取多个数据文件,统计总行数。

准备:先创建 3 个测试文件

import asyncio
import os
import csv
import time

# 创建测试 CSV 文件
for i in range(1, 4):
with open(f"data_{i}.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["id", "name", "score"])
    for j in range(1, 51):  # 每个文件50行
        writer.writerow([j, f"学生{j}", 80 + j % 20])

print("测试文件创建完成")

异步读取合并

import asyncio
import csv
import time

async def read_csv_file(filepath):
"""异步读取单个CSV文件"""
print(f"开始读取: {filepath}")
await asyncio.sleep(0.1)  # 模拟I/O延迟

# 同步读取文件(asyncio不改变文件I/O本身)
with open(filepath, "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    rows = list(reader)

print(f"{filepath} 读取完成: {len(rows)} 行")
return rows

async def main():
files = ["data_1.csv", "data_2.csv", "data_3.csv"]

start = time.time()

# 并发读取所有文件
tasks = [read_csv_file(f) for f in files]
results = await asyncio.gather(*tasks)

# 合并所有行
all_rows = []
for rows in results:
    all_rows.extend(rows)

elapsed = time.time() - start

print(f"\n读取完成!共 {len(all_rows)} 行数据")
print(f"总耗时: {elapsed:.2f} 秒")

# 清理测试文件
for f in files:
    os.remove(f)

asyncio.run(main())

预期输出

开始读取: data_1.csv
开始读取: data_2.csv
开始读取: data_3.csv
data_1.csv 读取完成: 50 行
data_2.csv 读取完成: 50 行
data_3.csv 读取完成: 50 行

读取完成!共 150 行数据
总耗时: 0.11 秒

项目 3:异步待办清单管理器(15 分钟)

目标:做一个命令行待办清单,能添加、查看、删除任务,用异步方式模拟任务执行。

import asyncio
import json
import time
from datetime import datetime

class TodoManager:
def __init__(self):
    self.tasks = []

def add_task(self, name, duration=1):
    """添加任务"""
    task = {
        "id": len(self.tasks) + 1,
        "name": name,
        "duration": duration,
        "done": False,
        "created_at": datetime.now().strftime("%H:%M:%S")
    }
    self.tasks.append(task)
    return task["id"]

async def execute_task(self, task_id):
    """异步执行单个任务"""
    task = next((t for t in self.tasks if t["id"] == task_id), None)
    if not task:
        print(f"任务 {task_id} 不存在")
        return

    print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始执行: {task['name']}")
    await asyncio.sleep(task["duration"])  # 模拟任务执行
    task["done"] = True
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 完成: {task['name']}")

async def execute_all(self):
    """并发执行所有未完成的任务"""
    pending = [t for t in self.tasks if not t["done"]]
    if not pending:
        print("没有待执行的任务")
        return

    print(f"开始执行 {len(pending)} 个任务...\n")
    tasks = [self.execute_task(t["id"]) for t in pending]
    await asyncio.gather(*tasks)
    print("\n所有任务执行完成!")

def list_tasks(self):
    """列出所有任务"""
    print("\n=== 待办清单 ===")
    for t in self.tasks:
        status = "✓" if t["done"] else "○"
        print(f"{status} [{t['id']}] {t['name']} (预计{t['duration']}秒)")

def show_stats(self):
    """显示统计"""
    total = len(self.tasks)
    done = sum(1 for t in self.tasks if t["done"])
    print(f"\n统计: 共 {total} 个任务,已完成 {done} 个")

async def main():
manager = TodoManager()

# 添加一些任务
manager.add_task("发送邮件", 2)
manager.add_task("生成报表", 3)
manager.add_task("备份数据", 2)

manager.add_task("发送通知", 1)

manager.list_tasks()

print("\n--- 开始批量执行 ---\n")
start = time.time()

await manager.execute_all()

elapsed = time.time() - start
manager.list_tasks()
manager.show_stats()
print(f"\n总耗时: {elapsed:.2f} 秒(同步执行需要 8 秒)")

asyncio.run(main())

预期输出

=== 待办清单 ===
○ [1] 发送邮件 (预计2秒)
○ [2] 生成报表 (预计3秒)
○ [3] 备份数据 (预计2秒)
○ [4] 发送通知 (预计1秒)

--- 开始批量执行 ---

[14:30:01] 开始执行: 发送邮件
[14:30:01] 开始执行: 生成报表
[14:30:01] 开始执行: 备份数据
[14:30:01] 开始执行: 发送通知
[14:30:02] 完成: 发送通知
[14:30:03] 完成: 发送邮件
[14:30:03] 完成: 备份数据
[14:30:04] 完成: 生成报表

所有任务执行完成!

=== 待办清单 ===
✓ [1] 发送邮件 (预计2秒)
✓ [2] 生成报表 (预计3秒)
✓ [3] 备份数据 (预计2秒)
✓ [4] 发送通知 (预计1秒)

统计: 共 4 个任务,已完成 4 个

总耗时: 3.01 秒(同步执行需要 8 秒)

一句话解释:4 个任务并发执行,总耗时约等于最长的那个(3秒),而不是相加(8秒)。


💪 进阶 20 分钟:常见坑 + 性能小贴士

坑 1:忘了 await,程序不执行

# ❌ 错误:只是创建了协程对象,没有执行
async def main():
response = fetch_data()  # 忘记 await
print(response)  # 打印的是协程对象,不是结果

# ✅ 正确:一定要 await
async def main():
response = await fetch_data()
print(response)

坑 2:在同步函数里用 await

# ❌ 错误:普通函数不能用 await
def main():
await asyncio.sleep(1)  # SyntaxError

# ✅ 正确:普通函数调用异步函数
def main():
asyncio.run(async_main())  # 用 asyncio.run 启动

# ✅ 或者:把主函数也变成 async
async def main():
await async_task()

坑 3:用 time.sleep() 而不是 asyncio.sleep()

# ❌ 错误:time.sleep 会阻塞整个线程
async def main():
time.sleep(10)  # 程序卡住10秒,什么都做不了
print("done")

# ✅ 正确:用 asyncio.sleep
async def main():
await asyncio.sleep(10)  # 这10秒可以做别的事
print("done")

坑 4:Task 创建了但没保存

# ❌ 错误:任务可能不会被执行
async def main():
asyncio.create_task(background_task())  # 没有保存引用

# ✅ 正确:保存 Task 引用
async def main():
task = asyncio.create_task(background_task())
await task  # 或者用 gather 收集

坑 5:忘记了 async with 关闭资源

# ✅ 正确:使用 async with 确保资源释放
async def main():
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.text()
# session 自动关闭

性能小贴士:善用 gather 批量处理

# ❌ 低效:串行执行
for url in urls:
result = await fetch(url)  # 一个一个等

# ✅ 高效:并行执行
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)  # 一起等

调试技巧:用 print 配合时间戳

import asyncio
import time

async def debug_task(name, seconds):
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始")
await asyncio.sleep(seconds)
print(f"[{time.strftime('%H:%M:%S')}] {name} 结束")

async def main():
await asyncio.gather(
    debug_task("任务A", 2),
    debug_task("任务B", 1),
)

✏️ 练习题

练习 1(2 分钟):修改延迟时间

  • 输入:把项目 1 中的 3 个 URL 改成 delay/2delay/3delay/2
  • 预期输出:总耗时约 3 秒
  • 提示:哪个数字决定总耗时?

练习 2(2 分钟):添加 if 判断

  • 输入:在项目 2 中,添加一个判断,只统计分数大于 90 的学生
  • 预期输出:显示符合条件的学生数量
  • 提示int(row["score"]) > 90

练习 3(3 分钟):处理新数据

  • 输入:创建 5 个 CSV 文件,每个 100 行,统计总行数
  • 预期输出总共 500 行
  • 提示:修改 files 列表和循环范围

练习 4(3 分钟):串联项目 2 和 3

  • 输入:把项目 2 读取的 CSV 数据,用项目 3 的方式执行「统计」任务
  • 预期输出:显示统计完成
  • 提示:把 read_csv_file 改造成返回带 duration 的任务

练习 5(5 分钟):分析报错

  • 输入:运行以下代码,观察报错
import asyncio

async def main():
asyncio.sleep(1)
print("done")

main()
  • 预期输出:找到错误并修复
  • 提示:报错信息会提示缺少什么关键字

作业:做一个「异步数据采集器」

需求描述:做一个命令行工具,同时从多个「模拟 API」获取用户数据并汇总。

功能点
1. 模拟 5 个 API 端点,每个返回 10 条用户数据(用 asyncio.sleep 模拟 0.5-2 秒延迟)
2. 并发获取所有数据,计算总耗时
3. 汇总所有用户名,按字母排序后打印

加分项
1. 用 asyncio.create_task 单独管理每个任务
2. 显示进度(哪个 API 完成了)

验收标准
- 能跑起来(python main.py
- 输出包含总耗时和排序后的用户名列表
- 代码有中文注释


📚 总结

本文学到的 3 个核心点
1. async/await 让你用同步写法写异步逻辑
2. asyncio.gather() 让多个任务同时执行,大幅缩短总耗时
3. 异步适用于 I/O 密集型任务(网络请求、文件读写),不适合 CPU 密集型任务

延伸学习资源
- Python 官方文档 - asyncio
- aiohttp 官方文档
- 书籍《Python 高手之路》- 关于异步编程进阶内容

互动钩子:你在实际项目里遇到过「页面卡顿」或「下载太慢」的问题吗?当时是怎么解决的?评论区聊聊老粉优先回复!


下章预告:学会了异步编程,下一章我们要用它来做一个真实的东西——用 ThinkPHP 6 实战 CMS 系统,里面会用到异步来提升数据库查询效率。敬请期待!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。