第3章 3.3 async/await 语法糖

🎯 开场:为什么你需要一个"等外卖神器"?

上完第三章 3.2,你已经学会了 Promise——一种"我先挂起,回头再来拿结果"的编程方式。

但用 Promise 写代码,你会发现一个问题:嵌套太深了

举个例子,你点外卖需要:下单 → 等出餐 → 等骑手取餐 → 等送到。这用 Promise 写出来是这样:

# Promise 版本 - 嵌套炼狱
def order_food():
place_order().then(lambda: wait_cooking()).then(lambda: wait_pickup()).then(lambda: wait_delivery())

一串 .then() 下来,你自己都看不清先执行谁了。

痛点来了:你是不是也写过这种代码?等号对齐了,但脑子对齐不了?

这一章我们要学的 async/await,就是来解决这个问题的。说白了,它就是 Promise 的"普通话"——让你用同步的写法,执行异步的代码。

学完本文,你能:
- 写出看起来像同步、实际上是异步的代码
- 用 try/catch 统一处理成功和失败
- 理解并行和串行的区别,知道什么时候该用哪个


🧱 基础:async/await 核心概念

什么是 async 函数?

想象你去奶茶店点单。店员说"稍等",给你一个叫号器(就是 async 函数),然后去忙别的了。你拿着叫号器可以干别的事,等奶茶好了,叫号器会震。

import asyncio

# async def 就是"这是一个异步函数"的声明
# 跟普通函数比,它会返回一个"叫号器"(协程对象)
async def make_coffee():
print("正在煮咖啡...")
await asyncio.sleep(2)  # 模拟耗时操作
return "☕ 咖啡好了!"

# 调用 async 函数不会立即执行,只是创建了一个协程对象
coro = make_coffee()
print(type(coro))  # <class 'coroutine'>

运行结果:

<class 'coroutine'>

一句话解释async def 声明的函数叫"协程函数",调用它会返回一个"协程对象"——一个还没开始执行的异步任务。

配图1 - 配图1


什么是 await?为什么要用它?

继续上面的例子。你拿到叫号器后,需要等奶茶真的做好才能走。await 就是"等待这个异步操作完成"的意思。

import asyncio

async def make_coffee():
print("开始煮咖啡...")
await asyncio.sleep(2)  # await = 等待这个操作完成
return "☕ 咖啡好了!"

# 必须在 async 函数里用 await
async def main():
result = await make_coffee()  # 在这儿卡住,等咖啡煮好
print(result)

# asyncio.run 是 Python 异步的入口
asyncio.run(main())

运行结果:

开始煮咖啡...
(等2秒)
☕ 咖啡好了!

一句话解释await 关键字表示"暂停在这里,等我等的东西完成了再往下走"。


为什么需要 async/await?解决什么痛点?

痛点1:Promise 嵌套太深

# Promise 版本 - 看3秒能看懂算我输
place_order().then(wait_cooking).then(wait_pickup).then(wait_delivery)
# async/await 版本 - 跟读普通文章一样顺
async def order_food():
await place_order()
await wait_cooking()
await wait_pickup()
await wait_delivery()

痛点2:错误处理太麻烦

# Promise 的错误处理 - try/catch 散落在各处
place_order()
.then(do_something)
.catch(error => console.log("出错了"))
# async/await 的错误处理 - 跟同步代码一模一样
async def main():
try:
    result = await place_order()
    await do_something(result)
except Error as e:
    print("出错了", e)

一句话总结async/await 就是让你用同步的思维写异步的代码,再也不用跟 .then().then().then() 较劲了。


并行 vs 串行:什么时候该用 await?

想象你要做两件事:煮咖啡(5分钟)和烤面包(3分钟)。

串行做法(一个个来):先煮咖啡5分钟,再烤面包3分钟,总共8分钟。

import asyncio
import time

async def make_coffee():
await asyncio.sleep(5)  # 煮咖啡
return "☕"

async def make_toast():
await asyncio.sleep(3)  # 烤面包
return "🍞"

async def serial_cooking():
start = time.time()
coffee = await make_coffee()  # 先等咖啡
toast = await make_toast()    # 再等面包
print(f"串行耗时: {time.time()-start:.1f}秒")
return coffee, toast

asyncio.run(serial_cooking())

并行做法(一起做):咖啡和面包同时开始,总共5分钟(取较长的那个)。

async def parallel_cooking():
start = time.time()
# asyncio.gather 同时启动两个任务
coffee, toast = await asyncio.gather(
    make_coffee(),
    make_toast()
)
print(f"并行耗时: {time.time()-start:.1f}秒")
return coffee, toast

asyncio.run(parallel_cooking())

运行结果对比:

串行耗时: 8.0秒
并行耗时: 5.0秒

什么时候用哪个
- 任务有依赖(必须先做完A才能做B)→ 用串行 await
- 任务互相独立(A和B可以同时做)→ 用 asyncio.gather 并行

配图2 - 配图2


🔥 实战:3个递进小项目

项目1:异步读取多个文件(5分钟)

场景:你需要一次性读取 config.json、users.json、products.json 三个配置文件。

import asyncio
import json

async def read_file(filename):
"""模拟异步读取文件(实际用 aiofiles)"""
await asyncio.sleep(0.5)  # 模拟IO延迟
with open(filename, 'r') as f:
    return f.read()

async def load_all_configs():
"""并行读取三个配置文件"""
print("开始读取配置...")

# 用 gather 同时读取,效率拉满
results = await asyncio.gather(
    read_file('config.json'),
    read_file('users.json'),
    read_file('products.json')
)

print("全部读取完成!")
return results

# 测试(用 mock 数据)
async def main():
# 模拟三个文件的内容
async def mock_read(filename):
    await asyncio.sleep(0.1)
    return f'{{"file": "{filename}"}}'

configs = await asyncio.gather(
    mock_read('config.json'),
    mock_read('users.json'),
    mock_read('products.json')
)

for i, content in enumerate(configs):
    print(f"文件{i+1}内容: {content}")

asyncio.run(main())

预期输出

开始读取配置...
全部读取完成!
文件1内容: {"file": "config.json"}
文件2内容: {"file": "users.json"}
文件3内容: {"file": "products.json"}

一句话解释asyncio.gather() 让你同时发起多个异步任务,最后一次性收结果。


项目2:异步爬取天气数据(15分钟)

场景:你需要从三个城市的天气 API 获取数据,然后计算平均温度。

import asyncio
import random

async def fetch_weather(city):
"""模拟异步请求天气API"""
print(f"正在获取 {city} 的天气...")

# 模拟网络延迟 0.5-1.5 秒
await asyncio.sleep(random.uniform(0.5, 1.5))

# 模拟返回的温度数据
temps = {
    "北京": random.randint(15, 25),
    "上海": random.randint(20, 30),
    "广州": random.randint(25, 35)
}
return city, temps.get(city, 20)

async def get_average_temperature():
"""并行获取三城市天气,计算平均温度"""
print("开始获取各地天气数据...")
start = time.time()

# 同时发起三个请求
results = await asyncio.gather(
    fetch_weather("北京"),
    fetch_weather("上海"),
    fetch_weather("广州")
)

# 计算平均温度
total = sum(temp for _, temp in results)
avg_temp = total / len(results)

print(f"\n获取完成,耗时: {time.time()-start:.1f}秒")
print(f"各地温度: {dict(results)}")
print(f"平均温度: {avg_temp:.1f}°C")

return avg_temp

import time
asyncio.run(get_average_temperature())

预期输出

开始获取各地天气数据...
正在获取 广州 的天气...
正在获取 上海 的天气...
正在获取 北京 的天气...

获取完成,耗时: 1.2秒
各地温度: {'北京': 22, '上海': 27, '广州': 31}
平均温度: 26.7°C

一句话解释:用 asyncio.gather() 并行请求三个 API,总耗时取决于最慢那个(约1.5秒),而不是三个相加。


项目3:异步批量下载图片小工具(15分钟)

场景:写一个工具,批量下载图片链接列表,保存到本地,并统计成功/失败数。

import asyncio
import random
from pathlib import Path

async def download_image(url, save_path):
"""模拟下载一张图片"""
try:
    # 模拟网络延迟
    await asyncio.sleep(random.uniform(0.3, 0.8))

    # 模拟下载成功(90%概率)
    if random.random() > 0.1:
        # 实际这里会写文件
        Path(save_path).write_text(f"图片数据: {url}")
        return True, url
    else:
        return False, url
except Exception as e:
    return False, str(e)

async def batch_download(image_list, save_dir="downloads"):
"""批量下载图片,返回成功/失败统计"""
Path(save_dir).mkdir(exist_ok=True)

print(f"开始下载 {len(image_list)} 张图片...")

# 创建所有下载任务
tasks = []
for i, url in enumerate(image_list):
    save_path = f"{save_dir}/image_{i+1}.jpg"
    tasks.append(download_image(url, save_path))

# 并行执行所有任务
results = await asyncio.gather(*tasks)

# 统计结果
successes = [url for success, url in results if success]
failures = [url for success, url in results if not success]

print(f"\n下载完成!成功: {len(successes)}, 失败: {len(failures)}")

if failures:
    print(f"失败的链接: {failures}")

return successes, failures

# 测试
if __name__ == "__main__":
test_urls = [
    "https://example.com/img1.jpg",
    "https://example.com/img2.jpg",
    "https://example.com/img3.jpg",
    "https://example.com/img4.jpg",
    "https://example.com/img5.jpg",
]

asyncio.run(batch_download(test_urls))

预期输出

开始下载 5 张图片...

下载完成!成功: 5, 失败: 0

(或者有概率出现失败,输出类似 失败的链接: ['https://example.com/img3.jpg']

一句话解释:用 asyncio.gather(*tasks) 解包任务列表,并行下载所有图片,最后统一统计。


💪 进阶:常见坑 + 性能小贴士

坑1:忘了加 async 导致函数变成同步

# ❌ 错误写法 - 以为会异步执行,实际是同步
def fetch_data():
await asyncio.sleep(2)  # SyntaxError! 同步函数里不能用 await
return "data"
# ✅ 正确写法 - 声明 + 调用都要对
async def fetch_data():
await asyncio.sleep(2)
return "data"

一句话:记住 await 只能在 async def 里面用。


坑2:串行 await 写成并行性能反而差

# ❌ 错误写法 - 两个独立任务串行执行,白白多等2秒
async def bad_example():
result1 = await task1()  # 等1秒
result2 = await task2()  # 再等1秒
# ✅ 正确写法 - 两个独立任务并行执行
async def good_example():
result1, result2 = await asyncio.gather(task1(), task2())  # 同时等,总共1秒

一句话:独立任务用 gather() 并行,别一个个 await


坑3:asyncio.run() 只在顶层用一次

# ❌ 错误写法 - 嵌套调用 run
async def inner():
asyncio.run(outer())  # 报错!event loop 已经运行了
# ✅ 正确写法 - 只在最外层调用 run
async def main():
await inner()
await outer()

asyncio.run(main())  # 只在这里调用一次

一句话asyncio.run() 是入口,整个程序只应该调用一次。


坑4:忘记 await 导致拿到的是协程对象

# ❌ 错误写法 - 没 await,只拿到了"纸条"不是结果
async def get_data():
return "真实数据"

async def main():
result = get_data()  # 忘加 await
print(type(result))  # <class 'coroutine'>
print(result)        # <coroutine object at 0x...>
# ✅ 正确写法
async def main():
result = await get_data()  # 加了 await
print(type(result))  # <class 'str'>
print(result)        # 真实数据

一句话:调用协程函数不加 await,你拿到的是"未完成的任务"不是结果。


坑5:异步函数里调用同步阻塞代码

# ❌ 错误写法 - time.sleep 是阻塞的,整个事件循环被卡住
async def bad_fetch():
time.sleep(2)  # 阻塞!其他任务都动不了
return "done"
# ✅ 正确写法 - 用异步睡眠,不阻塞其他任务
async def good_fetch():
await asyncio.sleep(2)  # 非阻塞!事件循环可以干别的
return "done"

一句话:异步代码里遇到耗时操作,用 await asyncio.sleep() 而不是 time.sleep()


性能小贴士:连接复用

如果你的异步任务需要频繁请求同一个服务器,用 asyncio.Semaphore 限制并发数,避免被服务器封禁:

import asyncio

# 限制同时最多5个请求
semaphore = asyncio.Semaphore(5)

async def fetch_with_limit(url):
async with semaphore:
    # 这里写你的请求逻辑
    await asyncio.sleep(0.1)
    return f"Got: {url}"

async def main():
urls = [f"https://example.com/item{i}" for i in range(20)]
results = await asyncio.gather(*[fetch_with_limit(url) for url in urls])
print(f"完成 {len(results)} 个请求")

asyncio.run(main())

调试技巧:用 print 打日志

async def debug_demo():
print("1. 开始执行")
result = await asyncio.sleep(1)
print("2. 等待结束")
return "done"

# 简单场景用 print 够用了
asyncio.run(debug_demo())

对于复杂场景,可以用 logging

import asyncio
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

async def logged_task(name):
logger.info(f"{name} 开始")
await asyncio.sleep(1)
logger.info(f"{name} 完成")

asyncio.run(logged_task("任务A"))

✏️ 练习题

练习1(2分钟):基础改写
- 输入:await asyncio.sleep(3)
- 预期输出:把 3 改成 1,重新运行,观察耗时变化
- 提示:asyncio.sleep(n) 的 n 表示秒数

练习2(3分钟):加个判断
- 输入:在项目1的代码里,加一个 if len(configs) > 2 的判断
- 预期输出:当文件数大于2时打印 "文件较多"
- 提示:判断加在 for 循环之前

练习3(5分钟):换个数据源
- 输入:用项目2的方法,爬取 "深圳"、"成都"、"武汉" 的天气
- 预期输出:显示三城市温度和平均温度
- 提示:修改 temps 字典和 fetch_weather 的调用参数

练习4(8分钟):串接两个项目
- 输入:用项目2获取天气数据 + 项目3的方式下载天气图标图片
- 预期输出:获取天气 + 并行下载3张占位图
- 提示:天气数据里的城市名可以作为下载任务的标识

练习5(5分钟):分析报错
- 输入:运行以下代码,分析为什么会报错

async def main():
result = fetch_data()  # 假设 fetch_data 是上面的异步函数
print(result)

main()
  • 预期输出:RuntimeError: asyncio.run() cannot be called from a running event loop
  • 提示:检查是否在异步环境里调用了异步函数

作业:做一个「异步批量数据处理工具」

  • 需求描述:做一个命令行工具,输入是一个 JSON 格式的用户列表,异步获取每个用户的"积分",然后按积分从高到低排序输出。

  • 功能点
    1. 从 users.json 文件读取用户列表(至少5个用户)
    2. 用异步模拟"获取用户积分"(每个用户耗时0.5-1秒随机)
    3. 并行获取所有用户积分
    4. 按积分从高到低排序并输出

  • 加分项
    1. 显示总耗时
    2. 用 try/catch 处理"获取积分失败"的情况(模拟10%失败率)

  • 验收标准

  • 能跑起来(python main.py
  • 输出格式正确(用户名 + 积分,按积分排序)
  • 代码有注释

  • 提交方式:评论区贴代码或 GitHub 链接


📚 总结 + 资源

本文学了3个核心点
1. async def 声明异步函数,await 等待异步操作完成
2. asyncio.gather() 让独立任务并行执行,省时间
3. try/catch 在 async/await 里的用法跟同步代码一模一样

延伸学习资源

  1. Python 官方文档 - asyncio (最权威,但有点枯燥)
  2. 《流畅的Python》第17章「异步编程」 (进阶必读)
  3. Real Python - AsyncIO 教程 (实战导向,例子丰富)

互动钩子:你在实际项目里用过 async/await 吗?遇到过什么坑?评论区聊聊,老粉优先回复!也别忘了做作业,下一章我们要讲「事件循环 Event Loop 原理」,学了它你才能真正理解 async/await 底层是怎么工作的。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。