第3章 3.6 综合实战:并发爬虫

🎯 开场:为什么你需要一个"爬虫机器人"?

想象一下:你是一个购物达人,要在 10 个电商网站同时查找"iPhone 15 最新价格"。一个一个网站打开、等待加载、复制粘贴——这套操作重复 10 遍,手都酸了。

但如果有个机器人,同时打开 10 个网站、同时抓取价格、同时把结果整理成表格发给你呢?

这就是并发爬虫能干的事。

痛点 1:串行爬虫(一个一个来)太慢了。抓 100 个页面要 10 分钟?并发爬虫只需要 30 秒。

痛点 2:手动一个个复制粘贴容易出错,而且重复劳动太无聊。

学完这一章,你就能写出一个同时抓取多个网页内容的小工具,而且还能控制并发数量——不会因为请求太多被网站封 IP。

上一章我们学了 util.promisify 把回调函数变成 Promise,这一章我们要把这个能力用起来,解决一个真实问题:如何同时高效地抓一堆网页?


🧱 基础:async/await + 并发控制的核心概念

什么是并发?别和并行搞混了!

串行(一个接一个):就像排队买奶茶,必须等前面的人买完你才能点。

并发(同时开始,交替执行):就像用手机同时打开 10 个网页,虽然 CPU 在切换,但看起来像同时运行。

并行(真正同时):就像 10 个人同时在 10 个柜台买奶茶,需要多核 CPU。

# 举个例子:串行 vs 并发
import asyncio
import time

# 模拟一个耗时 1 秒的任务(比如下载一个网页)
def 任务(n):
time.sleep(1)
return f"任务{n}完成"

# 串行:总共需要 3 秒
start = time.time()
结果1 = 任务(1)
结果2 = 任务(2)
结果3 = 任务(3)
print(f"串行耗时: {time.time() - start:.2f}秒")  # 输出: 3.00秒

async/await:让异步代码长得像同步代码

Python 的 asyncio 就像是给你的代码开了"时间暂停"的能力。await 的意思是:"等这个操作完成,但期间 CPU 可以去干别的事"。

import asyncio

# 用 async def 定义一个"可以暂停"的函数
async def say_hello():
print("Hello")
await asyncio.sleep(1)  # 暂停 1 秒,CPU 可以去干别的
print("World")

# 运行它
asyncio.run(say_hello())

类比:就像你煮开水的时候(等待),可以先去切菜(做别的事),水开了再回来。

asyncio.gather:同时启动多个任务

这就是"Promise.all"的作用——一次性启动多个协程,等它们全部完成

import asyncio

async def 抓取页面(id):
await asyncio.sleep(1)  # 模拟网络请求
return f"页面{id}内容"

async def main():
# 同时启动 3 个抓取任务
results = await asyncio.gather(
    抓取页面(1),
    抓取页面(2),
    抓取页面(3)
)
print(results)  # ['页面1内容', '页面2内容', '页面3内容']

asyncio.run(main())

配图1 - 配图1

Semaphore:控制同时进行的数量

问题来了:同时开 100 个网页可能会被网站封 IP。怎么办?

Semaphore(信号量) 控制同时运行的任务数,就像餐厅里的叫号器——即使有 100 个人在等,店里同时最多只接待 10 位。

import asyncio

# 创建一个最多同时运行 2 个任务的信号量
semaphore = asyncio.Semaphore(2)

async def 限流任务(id):
async with semaphore:  # 想执行?先拿号
    print(f"任务{id}开始")
    await asyncio.sleep(1)
    print(f"任务{id}结束")
return f"任务{id}结果"

async def main():
# 启动 5 个任务,但最多同时运行 2 个
results = await asyncio.gather(*[限流任务(i) for i in range(5)])
print(results)

asyncio.run(main())

注意:输出会显示任务 0 和 1 先开始,结束后任务 2 和 3 才启动,最后是任务 4——因为同一时间只有 2 个在跑。


🔥 实战:3 个递进的小项目

项目 1:5 分钟搞定——同时获取多个网页的标题

场景:你想同时查看几个网站的标题,看看它们是否还活着。

import asyncio
import aiohttp

async def 获取网页标题(session, url):
"""抓取单个网页的标题"""
try:
    async with session.get(url) as response:
        html = await response.text()
        # 简单粗暴地提取 <title> 标签内容
        start = html.find('<title>') + 7
        end = html.find('</title>')
        title = html[start:end] if start > 6 and end > 0 else "无标题"
        return f"{url} -> {title}"
except Exception as e:
    return f"{url} -> 失败: {e}"

async def main():
urls = [
    "https://www.baidu.com",
    "https://www.github.com",
    "https://www.python.org",
]

async with aiohttp.ClientSession() as session:
    tasks = [获取网页标题(session, url) for url in urls]
    results = await asyncio.gather(*tasks)

    print("=" * 50)
    print("抓取结果:")
    for r in results:
        print(r)
    print("=" * 50)

if __name__ == "__main__":
asyncio.run(main())

预期输出

==================================================
抓取结果:
https://www.baidu.com -> 百度一下,你就知道
https://www.github.com -> GitHub
https://www.python.org -> Welcome to Python.org
==================================================

一句话解释asyncio.gather(*tasks) 一次性发起 3 个请求,谁先完成谁先返回,但最终会等全部完成。


项目 2:15 分钟——从 JSON 文件读取 URL 列表,并发抓取后存 CSV

场景:你有一份 urls.json 里面存了 20 个网页链接,要同时抓它们的内容,然后保存成 CSV。

首先创建 urls.json

[
{"name": "百度", "url": "https://www.baidu.com"},
{"name": "知乎", "url": "https://www.zhihu.com"},
{"name": "豆瓣", "url": "https://www.douban.com"},
{"name": "微博", "url": "https://weibo.com"},
{"name": "掘金", "url": "https://juejin.cn"}
]

然后写爬虫代码:

import asyncio
import aiohttp
import csv
import json

# 控制最多同时 2 个请求(防止被封)
semaphore = asyncio.Semaphore(2)

async def 抓取单个(url_info, session):
"""带信号量控制的抓取函数"""
name = url_info["name"]
url = url_info["url"]


async with semaphore:  # 拿号,号没了就等着
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            status = resp.status
            # 简单取前 100 字符作为描述
            text = await resp.text()
            desc = text[100:200].strip() if len(text) > 100 else ""
            print(f"✓ {name} ({status})")
            return {"name": name, "url": url, "status": status, "desc": desc}
    except asyncio.TimeoutError:
        print(f"✗ {name} 超时")
        return {"name": name, "url": url, "status": 0, "desc": "超时"}
    except Exception as e:
        print(f"✗ {name} 错误: {e}")
        return {"name": name, "url": url, "status": -1, "desc": str(e)}

async def main():
# 1. 读取 URL 列表
with open("urls.json", "r", encoding="utf-8") as f:
    urls = json.load(f)

print(f"共 {len(urls)} 个URL,开始并发抓取...")

# 2. 并发抓取(控制最多 2 个同时)
async with aiohttp.ClientSession() as session:
    tasks = [抓取单个(u, session) for u in urls]
    results = await asyncio.gather(*tasks)

# 3. 保存到 CSV
with open("结果.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["name", "url", "status", "desc"])
    writer.writeheader()
    writer.writerows(results)

print(f"\n完成!结果已保存到 结果.csv")

if __name__ == "__main__":
asyncio.run(main())

预期输出

共 5 个URL,开始并发抓取...
✓ 百度 (200)
✓ 知乎 (200)
✗ 豆瓣 错误: 403
✗ 微博 错误: 403
✓ 掘金 (200)

完成!结果已保存到 结果.csv

一句话解释:用 Semaphore(2) 控制同时最多 2 个请求,超时设置 10 秒,避免某个网站卡住导致全部卡死。


项目 3:15 分钟——做个"每日科技新闻聚合器"

场景:做一个每天早上运行的脚本,自动去几个科技博客抓最新文章标题,整理成一个简报。

import asyncio
import aiohttp
import csv
from datetime import datetime

# 新闻源配置
NEWS_SOURCES = {
"36氪": "https://36kr.com/",
"少数派": "https://sspai.com/",
"虎嗅": "https://www.huxiu.com/",
"InfoQ": "https://www.infoq.com/",
}

semaphore = asyncio.Semaphore(3)  # 最多同时 3 个

async def 抓取新闻源(名称, url, session):
"""抓取单个新闻源"""
async with semaphore:
    try:
        headers = {"User-Agent": "Mozilla/5.0"}  # 假装是浏览器
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
            if resp.status == 200:
                text = await resp.text()
                # 简化处理:找所有 <a> 标签的链接
                count = text.count('<a ')  # 粗略统计链接数量
                return {
                    "来源": 名称,
                    "状态": "成功",
                    "文章数": count,
                    "时间": datetime.now().strftime("%H:%M:%S")
                }
            else:
                return {"来源": 名称, "状态": f"HTTP {resp.status}", "文章数": 0, "时间": ""}
    except Exception as e:
        return {"来源": 名称, "状态": f"错误: {e}", "文章数": 0, "时间": ""}

async def 生成简报():
print("=" * 60)
print(f"📰 每日科技简报 - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("=" * 60)

async with aiohttp.ClientSession() as session:
    tasks = [抓取新闻源(名称, url, session) for 名称, url in NEWS_SOURCES.items()]
    results = await asyncio.gather(*tasks)

# 打印简报
print(f"\n{'来源':<10} {'状态':<20} {'时间':<10} {'链接数':<10}")
print("-" * 50)
for r in results:
    print(f"{r['来源']:<10} {r['状态']:<20} {r['时间']:<10} {r['文章数']:<10}")

# 保存到文件
filename = f"简报_{datetime.now().strftime('%Y%m%d_%H%M')}.txt"
with open(filename, "w", encoding="utf-8") as f:
    f.write(f"每日科技简报 - {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
    f.write("=" * 50 + "\n")
    for r in results:
        f.write(f"{r['来源']}: {r['状态']}\n")
print(f"\n📁 简报已保存到 {filename}")

if __name__ == "__main__":
asyncio.run(生成简报())

预期输出

============================================================
📰 每日科技简报 - 2024-01-15 08:30
============================================================

来源       状态                   时间       链接数     
--------------------------------------------------
36氪       成功                   08:30:01   2847
少数派     成功                   08:30:02   1234
虎嗅       成功                   08:30:03   892
InfoQ      成功                   08:30:04   1567

📁 简报已保存到 简报_20240115_0830.txt

一句话解释:用字典存配置方便扩展,Semaphore(3) 限制并发数,gather 一次性发起所有请求。

配图2 - 配图2


💪 进阶:新手最容易踩的 5 个坑

坑 1:忘记 await,协程变成"假"执行

# ❌ 错误:没写 await,任务根本不会执行
async def main():
tasks = [抓取页面(url) for url in urls]  # 只是创建了协程对象列表
results = await asyncio.gather(*tasks)   # 这时候才开始执行!

# ✅ 正确:先 await 再 gather
async def main():
tasks = [抓取页面(url) for url in urls]
results = await asyncio.gather(*tasks)

坑 2:在非 async 函数里用 await

# ❌ 错误:普通函数里不能用 await
def 普通函数():
await asyncio.sleep(1)  # SyntaxError!

# ✅ 正确:要用 async def
async def 异步函数():
await asyncio.sleep(1)

坑 3:信号量放错位置,起不到限流作用

# ❌ 错误:semaphore 在 async with 外面
async def 限流任务(id):
async with semaphore:  # 正确用法
    ...

# ❌ 错误用法:gather 一次发起太多任务
async def main():
# 1000 个 URL,同时发 1000 个请求——可能被封!
tasks = [抓取页面(url) for url in urls]  # 没有限流
await asyncio.gather(*tasks)

坑 4:不设置超时,某个网站卡住全部卡死

# ❌ 错误:没设超时,永远等下去
async with session.get(url) as resp:
...

# ✅ 正确:设置 10 秒超时
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
...

坑 5:用 time.sleep() 而不是 asyncio.sleep()

# ❌ 错误:time.sleep 会阻塞整个程序
await time.sleep(1)  # 这期间什么都干不了!

# ✅ 正确:用 asyncio.sleep 才能切换任务
await asyncio.sleep(1)  # 暂停当前任务,CPU 可以干别的

调试技巧:用 print 配合 task 对象名

async def 抓取页面(session, url, task_id):
print(f"[任务{task_id}] 开始抓取 {url}")
async with session.get(url) as resp:
    print(f"[任务{task_id}] 收到响应 {resp.status}")
    return await resp.text()

# 运行时会看到哪个任务先完成,便于排查问题

✏️ 练习题

练习 1(2 分钟):改 URL 列表

  • 输入:把项目 1 中的 urls 列表改成 ["https://www.bing.com", "https://www.yahoo.com"]
  • 预期输出:打印出这两个网站的标题
  • 提示:列表里有两个元素,替换掉原来的就行

练习 2(3 分钟):加个状态判断

  • 输入:在项目 1 代码里,加一个判断,如果状态码不是 200,打印 "失败"
  • 预期输出:404 的网站打印 "失败"
  • 提示:在 获取网页标题 函数里加个 if 判断

练习 3(10 分钟):处理新的 JSON 数据

  • 输入:创建一个 products.json,包含 5 个商品链接
  • 预期输出:并发抓取后保存到 products_result.csv
  • 提示:复用项目 2 的代码结构

练习 4(15 分钟):串起两个项目

  • 输入:把项目 2 的 JSON 读取 + 项目 3 的格式化输出组合起来
  • 预期输出:读取 urls.json,抓取后打印成表格而不是存 CSV
  • 提示:把 CSV 写入部分换成 print 格式化输出

练习 5(5 分钟):分析报错

  • 输入:下面代码运行后报错 SyntaxError: 'await' outside async function
  • 预期输出:找出问题并修复
  • 提示:哪个函数忘记加 async 了?
def 获取数据():
await asyncio.sleep(1)
return "数据"

asyncio.run(获取数据())

作业:做一个「并发爬虫实战工具」

需求描述:做一个可配置的网页标题批量抓取工具,支持:
1. 从 CSV 文件读取 URL 列表(格式:name,url
2. 并发抓取(可配置最大并发数,默认为 3)
3. 抓取结果保存到新 CSV(包含:名称、URL、状态码、标题、抓取耗时)
4. 彩色终端输出,看着更专业

功能点
- ✅ 从 input.csv 读取 URL
- ✅ 用 Semaphore 控制并发数
- ✅ 保存到 output.csv
- ✅ 记录每个任务耗时

加分项
- ⭐ 支持 --concurrency 命令行参数设置并发数
- ⭐ 支持 --timeout 设置超时时间
- ⭐ 显示进度条(用 tqdm

验收标准
- 能跑起来(python crawler.py
- 正确读取 CSV 并输出 CSV
- 代码有注释(每段代码干啥的)

提交方式:评论区贴代码或 GitHub 链接


📚 总结 + 资源

本文学到的 3 个核心点
1. async/await 让异步代码写得和同步一样自然
2. asyncio.gather 一次性发起多个任务,等全部完成
3. Semaphore 控制同时运行的任务数,防止被封 IP

延伸学习资源
- Python 官方 asyncio 文档 —— 权威但有点枯燥,当字典查
- 《Python 高手之路》—— 进阶必读,讲了很多工程实践
- aiohttp 官方文档 —— 做爬虫必备的 HTTP 客户端库

互动钩子:你在爬虫实战中踩过什么坑?被封过 IP 吗?评论区聊聊,老粉优先回复!


下章预告:学会了并发抓网页,下一章我们要学HTTP 模块基础——亲手写一个最简单的 Web 服务器,理解浏览器和服务器是怎么"对话"的。敬请期待!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。