第3章 3.6 综合实战:并发爬虫
🎯 开场:为什么你需要一个"爬虫机器人"?
想象一下:你是一个购物达人,要在 10 个电商网站同时查找"iPhone 15 最新价格"。一个一个网站打开、等待加载、复制粘贴——这套操作重复 10 遍,手都酸了。
但如果有个机器人,同时打开 10 个网站、同时抓取价格、同时把结果整理成表格发给你呢?
这就是并发爬虫能干的事。
痛点 1:串行爬虫(一个一个来)太慢了。抓 100 个页面要 10 分钟?并发爬虫只需要 30 秒。
痛点 2:手动一个个复制粘贴容易出错,而且重复劳动太无聊。
学完这一章,你就能写出一个同时抓取多个网页内容的小工具,而且还能控制并发数量——不会因为请求太多被网站封 IP。
上一章我们学了 util.promisify 把回调函数变成 Promise,这一章我们要把这个能力用起来,解决一个真实问题:如何同时高效地抓一堆网页?
🧱 基础:async/await + 并发控制的核心概念
什么是并发?别和并行搞混了!
串行(一个接一个):就像排队买奶茶,必须等前面的人买完你才能点。
并发(同时开始,交替执行):就像用手机同时打开 10 个网页,虽然 CPU 在切换,但看起来像同时运行。
并行(真正同时):就像 10 个人同时在 10 个柜台买奶茶,需要多核 CPU。
# 举个例子:串行 vs 并发
import asyncio
import time
# 模拟一个耗时 1 秒的任务(比如下载一个网页)
def 任务(n):
time.sleep(1)
return f"任务{n}完成"
# 串行:总共需要 3 秒
start = time.time()
结果1 = 任务(1)
结果2 = 任务(2)
结果3 = 任务(3)
print(f"串行耗时: {time.time() - start:.2f}秒") # 输出: 3.00秒
async/await:让异步代码长得像同步代码
Python 的 asyncio 就像是给你的代码开了"时间暂停"的能力。await 的意思是:"等这个操作完成,但期间 CPU 可以去干别的事"。
import asyncio
# 用 async def 定义一个"可以暂停"的函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 暂停 1 秒,CPU 可以去干别的
print("World")
# 运行它
asyncio.run(say_hello())
类比:就像你煮开水的时候(等待),可以先去切菜(做别的事),水开了再回来。
asyncio.gather:同时启动多个任务
这就是"Promise.all"的作用——一次性启动多个协程,等它们全部完成。
import asyncio
async def 抓取页面(id):
await asyncio.sleep(1) # 模拟网络请求
return f"页面{id}内容"
async def main():
# 同时启动 3 个抓取任务
results = await asyncio.gather(
抓取页面(1),
抓取页面(2),
抓取页面(3)
)
print(results) # ['页面1内容', '页面2内容', '页面3内容']
asyncio.run(main())

Semaphore:控制同时进行的数量
问题来了:同时开 100 个网页可能会被网站封 IP。怎么办?
用 Semaphore(信号量) 控制同时运行的任务数,就像餐厅里的叫号器——即使有 100 个人在等,店里同时最多只接待 10 位。
import asyncio
# 创建一个最多同时运行 2 个任务的信号量
semaphore = asyncio.Semaphore(2)
async def 限流任务(id):
async with semaphore: # 想执行?先拿号
print(f"任务{id}开始")
await asyncio.sleep(1)
print(f"任务{id}结束")
return f"任务{id}结果"
async def main():
# 启动 5 个任务,但最多同时运行 2 个
results = await asyncio.gather(*[限流任务(i) for i in range(5)])
print(results)
asyncio.run(main())
注意:输出会显示任务 0 和 1 先开始,结束后任务 2 和 3 才启动,最后是任务 4——因为同一时间只有 2 个在跑。
🔥 实战:3 个递进的小项目
项目 1:5 分钟搞定——同时获取多个网页的标题
场景:你想同时查看几个网站的标题,看看它们是否还活着。
import asyncio
import aiohttp
async def 获取网页标题(session, url):
"""抓取单个网页的标题"""
try:
async with session.get(url) as response:
html = await response.text()
# 简单粗暴地提取 <title> 标签内容
start = html.find('<title>') + 7
end = html.find('</title>')
title = html[start:end] if start > 6 and end > 0 else "无标题"
return f"{url} -> {title}"
except Exception as e:
return f"{url} -> 失败: {e}"
async def main():
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org",
]
async with aiohttp.ClientSession() as session:
tasks = [获取网页标题(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print("=" * 50)
print("抓取结果:")
for r in results:
print(r)
print("=" * 50)
if __name__ == "__main__":
asyncio.run(main())
预期输出:
==================================================
抓取结果:
https://www.baidu.com -> 百度一下,你就知道
https://www.github.com -> GitHub
https://www.python.org -> Welcome to Python.org
==================================================
一句话解释:asyncio.gather(*tasks) 一次性发起 3 个请求,谁先完成谁先返回,但最终会等全部完成。
项目 2:15 分钟——从 JSON 文件读取 URL 列表,并发抓取后存 CSV
场景:你有一份 urls.json 里面存了 20 个网页链接,要同时抓它们的内容,然后保存成 CSV。
首先创建 urls.json:
[
{"name": "百度", "url": "https://www.baidu.com"},
{"name": "知乎", "url": "https://www.zhihu.com"},
{"name": "豆瓣", "url": "https://www.douban.com"},
{"name": "微博", "url": "https://weibo.com"},
{"name": "掘金", "url": "https://juejin.cn"}
]
然后写爬虫代码:
import asyncio
import aiohttp
import csv
import json
# 控制最多同时 2 个请求(防止被封)
semaphore = asyncio.Semaphore(2)
async def 抓取单个(url_info, session):
"""带信号量控制的抓取函数"""
name = url_info["name"]
url = url_info["url"]
async with semaphore: # 拿号,号没了就等着
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
status = resp.status
# 简单取前 100 字符作为描述
text = await resp.text()
desc = text[100:200].strip() if len(text) > 100 else ""
print(f"✓ {name} ({status})")
return {"name": name, "url": url, "status": status, "desc": desc}
except asyncio.TimeoutError:
print(f"✗ {name} 超时")
return {"name": name, "url": url, "status": 0, "desc": "超时"}
except Exception as e:
print(f"✗ {name} 错误: {e}")
return {"name": name, "url": url, "status": -1, "desc": str(e)}
async def main():
# 1. 读取 URL 列表
with open("urls.json", "r", encoding="utf-8") as f:
urls = json.load(f)
print(f"共 {len(urls)} 个URL,开始并发抓取...")
# 2. 并发抓取(控制最多 2 个同时)
async with aiohttp.ClientSession() as session:
tasks = [抓取单个(u, session) for u in urls]
results = await asyncio.gather(*tasks)
# 3. 保存到 CSV
with open("结果.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["name", "url", "status", "desc"])
writer.writeheader()
writer.writerows(results)
print(f"\n完成!结果已保存到 结果.csv")
if __name__ == "__main__":
asyncio.run(main())
预期输出:
共 5 个URL,开始并发抓取...
✓ 百度 (200)
✓ 知乎 (200)
✗ 豆瓣 错误: 403
✗ 微博 错误: 403
✓ 掘金 (200)
完成!结果已保存到 结果.csv
一句话解释:用 Semaphore(2) 控制同时最多 2 个请求,超时设置 10 秒,避免某个网站卡住导致全部卡死。
项目 3:15 分钟——做个"每日科技新闻聚合器"
场景:做一个每天早上运行的脚本,自动去几个科技博客抓最新文章标题,整理成一个简报。
import asyncio
import aiohttp
import csv
from datetime import datetime
# 新闻源配置
NEWS_SOURCES = {
"36氪": "https://36kr.com/",
"少数派": "https://sspai.com/",
"虎嗅": "https://www.huxiu.com/",
"InfoQ": "https://www.infoq.com/",
}
semaphore = asyncio.Semaphore(3) # 最多同时 3 个
async def 抓取新闻源(名称, url, session):
"""抓取单个新闻源"""
async with semaphore:
try:
headers = {"User-Agent": "Mozilla/5.0"} # 假装是浏览器
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
if resp.status == 200:
text = await resp.text()
# 简化处理:找所有 <a> 标签的链接
count = text.count('<a ') # 粗略统计链接数量
return {
"来源": 名称,
"状态": "成功",
"文章数": count,
"时间": datetime.now().strftime("%H:%M:%S")
}
else:
return {"来源": 名称, "状态": f"HTTP {resp.status}", "文章数": 0, "时间": ""}
except Exception as e:
return {"来源": 名称, "状态": f"错误: {e}", "文章数": 0, "时间": ""}
async def 生成简报():
print("=" * 60)
print(f"📰 每日科技简报 - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("=" * 60)
async with aiohttp.ClientSession() as session:
tasks = [抓取新闻源(名称, url, session) for 名称, url in NEWS_SOURCES.items()]
results = await asyncio.gather(*tasks)
# 打印简报
print(f"\n{'来源':<10} {'状态':<20} {'时间':<10} {'链接数':<10}")
print("-" * 50)
for r in results:
print(f"{r['来源']:<10} {r['状态']:<20} {r['时间']:<10} {r['文章数']:<10}")
# 保存到文件
filename = f"简报_{datetime.now().strftime('%Y%m%d_%H%M')}.txt"
with open(filename, "w", encoding="utf-8") as f:
f.write(f"每日科技简报 - {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
f.write("=" * 50 + "\n")
for r in results:
f.write(f"{r['来源']}: {r['状态']}\n")
print(f"\n📁 简报已保存到 {filename}")
if __name__ == "__main__":
asyncio.run(生成简报())
预期输出:
============================================================
📰 每日科技简报 - 2024-01-15 08:30
============================================================
来源 状态 时间 链接数
--------------------------------------------------
36氪 成功 08:30:01 2847
少数派 成功 08:30:02 1234
虎嗅 成功 08:30:03 892
InfoQ 成功 08:30:04 1567
📁 简报已保存到 简报_20240115_0830.txt
一句话解释:用字典存配置方便扩展,Semaphore(3) 限制并发数,gather 一次性发起所有请求。

💪 进阶:新手最容易踩的 5 个坑
坑 1:忘记 await,协程变成"假"执行
# ❌ 错误:没写 await,任务根本不会执行
async def main():
tasks = [抓取页面(url) for url in urls] # 只是创建了协程对象列表
results = await asyncio.gather(*tasks) # 这时候才开始执行!
# ✅ 正确:先 await 再 gather
async def main():
tasks = [抓取页面(url) for url in urls]
results = await asyncio.gather(*tasks)
坑 2:在非 async 函数里用 await
# ❌ 错误:普通函数里不能用 await
def 普通函数():
await asyncio.sleep(1) # SyntaxError!
# ✅ 正确:要用 async def
async def 异步函数():
await asyncio.sleep(1)
坑 3:信号量放错位置,起不到限流作用
# ❌ 错误:semaphore 在 async with 外面
async def 限流任务(id):
async with semaphore: # 正确用法
...
# ❌ 错误用法:gather 一次发起太多任务
async def main():
# 1000 个 URL,同时发 1000 个请求——可能被封!
tasks = [抓取页面(url) for url in urls] # 没有限流
await asyncio.gather(*tasks)
坑 4:不设置超时,某个网站卡住全部卡死
# ❌ 错误:没设超时,永远等下去
async with session.get(url) as resp:
...
# ✅ 正确:设置 10 秒超时
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
...
坑 5:用 time.sleep() 而不是 asyncio.sleep()
# ❌ 错误:time.sleep 会阻塞整个程序
await time.sleep(1) # 这期间什么都干不了!
# ✅ 正确:用 asyncio.sleep 才能切换任务
await asyncio.sleep(1) # 暂停当前任务,CPU 可以干别的
调试技巧:用 print 配合 task 对象名
async def 抓取页面(session, url, task_id):
print(f"[任务{task_id}] 开始抓取 {url}")
async with session.get(url) as resp:
print(f"[任务{task_id}] 收到响应 {resp.status}")
return await resp.text()
# 运行时会看到哪个任务先完成,便于排查问题
✏️ 练习题
练习 1(2 分钟):改 URL 列表
- 输入:把项目 1 中的 urls 列表改成
["https://www.bing.com", "https://www.yahoo.com"] - 预期输出:打印出这两个网站的标题
- 提示:列表里有两个元素,替换掉原来的就行
练习 2(3 分钟):加个状态判断
- 输入:在项目 1 代码里,加一个判断,如果状态码不是 200,打印 "失败"
- 预期输出:404 的网站打印 "失败"
- 提示:在
获取网页标题函数里加个 if 判断
练习 3(10 分钟):处理新的 JSON 数据
- 输入:创建一个
products.json,包含 5 个商品链接 - 预期输出:并发抓取后保存到
products_result.csv - 提示:复用项目 2 的代码结构
练习 4(15 分钟):串起两个项目
- 输入:把项目 2 的 JSON 读取 + 项目 3 的格式化输出组合起来
- 预期输出:读取 urls.json,抓取后打印成表格而不是存 CSV
- 提示:把 CSV 写入部分换成 print 格式化输出
练习 5(5 分钟):分析报错
- 输入:下面代码运行后报错
SyntaxError: 'await' outside async function - 预期输出:找出问题并修复
- 提示:哪个函数忘记加
async了?
def 获取数据():
await asyncio.sleep(1)
return "数据"
asyncio.run(获取数据())
作业:做一个「并发爬虫实战工具」
需求描述:做一个可配置的网页标题批量抓取工具,支持:
1. 从 CSV 文件读取 URL 列表(格式:name,url)
2. 并发抓取(可配置最大并发数,默认为 3)
3. 抓取结果保存到新 CSV(包含:名称、URL、状态码、标题、抓取耗时)
4. 彩色终端输出,看着更专业
功能点:
- ✅ 从 input.csv 读取 URL
- ✅ 用 Semaphore 控制并发数
- ✅ 保存到 output.csv
- ✅ 记录每个任务耗时
加分项:
- ⭐ 支持 --concurrency 命令行参数设置并发数
- ⭐ 支持 --timeout 设置超时时间
- ⭐ 显示进度条(用 tqdm)
验收标准:
- 能跑起来(python crawler.py)
- 正确读取 CSV 并输出 CSV
- 代码有注释(每段代码干啥的)
提交方式:评论区贴代码或 GitHub 链接
📚 总结 + 资源
本文学到的 3 个核心点:
1. async/await 让异步代码写得和同步一样自然
2. asyncio.gather 一次性发起多个任务,等全部完成
3. Semaphore 控制同时运行的任务数,防止被封 IP
延伸学习资源:
- Python 官方 asyncio 文档 —— 权威但有点枯燥,当字典查
- 《Python 高手之路》—— 进阶必读,讲了很多工程实践
- aiohttp 官方文档 —— 做爬虫必备的 HTTP 客户端库
互动钩子:你在爬虫实战中踩过什么坑?被封过 IP 吗?评论区聊聊,老粉优先回复!
下章预告:学会了并发抓网页,下一章我们要学HTTP 模块基础——亲手写一个最简单的 Web 服务器,理解浏览器和服务器是怎么"对话"的。敬请期待!

评论(0)