第10章 10.1 实时聊天系统(WebSocket)
前置章节衔接
上一章我们折腾完了 Nest.js 和 Fastify这两个"豪华餐厅"——它们帮你处理好了路由、中间件、依赖注入这些繁琐的后厨管理,让你专心炒菜。但你有没有遇到过这种情况:点完餐后想"实时知道菜做好了没",而不是一次次去问服务员"好了吗好了吗"?
这就好比你发消息给朋友,不想每次都刷新页面等对方回复,而是希望对方一回复,你这边立刻弹出来——不需要你做任何操作。
这就是 WebSocket 要解决的问题。
🎯 开场 3 分钟:为什么要学这个?
想象这个场景:
你在做一个「在线客服」网站。用户发一条消息,你得等几秒甚至几十秒才能收到响应——因为每次通信都要重新建立连接、发送请求、等待响应。这感觉就像:
每次你想和女朋友说句话,都要先拨电话、等接通、说两句、挂断,下次再说再拨一遍。
累不累?
再想象另一个场景:
你做了一个「股票行情监控」页面,股价每秒都在变。你用普通 HTTP 轮询(每隔几秒请求一次),结果:
- 服务器压力大(1000个人每人每秒请求10次 = 1万次/秒)
- 数据不实时(你拿到的是几秒前的数据)
- 用户体验差(页面一闪一闪地更新)
这时候 WebSocket 就是你的救星:
它像是给你们俩拉了一个「专线电话」——一次接通,双向通话,想啥时候说就啥时候说,不用每次重新拨号。
学完这章,你将能够:
- 理解 WebSocket 的「握手」原理
- 用 Python 的 websockets 库写一个实时聊天服务器
- 实现消息广播、在线人数统计、心跳保活
🧱 基础 25 分钟:核心概念
什么是 WebSocket?
生活类比:
普通 HTTP 就像外卖点餐:
- 你下单(发送请求)
- 店家接单、处理
- 骑手送来(返回响应)
- 交易结束,挂断联系
WebSocket 就像你跟朋友的微信聊天:
- 你们互相加了好友(建立连接)
- 你随时可以发消息,对方秒收
- 对方回复,你秒收
- 连接一直保持着,想聊就聊
为什么要用 WebSocket?
| 场景 | HTTP 轮询 | WebSocket |
|---|---|---|
| 实时性 | 慢(几秒延迟) | 快(毫秒级) |
| 服务器压力 | 大(频繁建连) | 小(长连接) |
| 双向通信 | 不支持 | 支持 |
| 适用场景 | 一次性请求 | 实时交互 |
WebSocket 的「握手」是什么?
生活类比:
握手就像你们第一次见面时的自我介绍——
你说"你好,我是小明,请多关照"
对方说"你好,我是小红,以后一起玩"
说完这两句,你们就认识了,后面就可以随便聊天了,不用每次说话前再自我介绍一遍。
技术解释:
WebSocket 建立连接时,客户端发一个特殊的 HTTP 请求:
GET /chat HTTP/1.1
Host: server.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
服务器如果同意,就会返回:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
然后——连接从 HTTP 协议「升级」成了 WebSocket 协议,专线电话接通!

第一个 WebSocket 程序
安装库:
pip install websockets
服务器端代码:
import asyncio
import websockets
async def chat(websocket, path):
# 等待客户端发来消息
message = await websocket.recv()
print(f"收到消息: {message}")
# 回复一条消息
await websocket.send(f"服务器收到了: {message}")
# 启动服务器,监听 8765 端口
start_server = websockets.serve(chat, "localhost", 8765)
print("聊天服务器启动中...")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
保存为 server.py,运行:
python server.py
你会看到输出:聊天服务器启动中...
客户端代码(用浏览器控制台或 Python 连接):
import asyncio
import websockets
async def client():
# 连接到服务器
async with websockets.connect("ws://localhost:8765") as ws:
# 发送消息
await ws.send("你好,服务器!")
print("已发送: 你好,服务器!")
# 接收回复
response = await ws.recv()
print(f"收到回复: {response}")
asyncio.get_event_loop().run_until_complete(client())
保存为 client.py,新开一个终端运行:
python client.py
预期输出:
已发送: 你好,服务器!
收到回复: 服务器收到了: 你好,服务器!
服务器端会显示:
收到消息: 你好,服务器!
这行代码在干嘛:
| 代码 | 解释 |
|---|---|
websockets.serve(chat, "localhost", 8765) |
启动服务器,监听 8765 端口 |
await websocket.recv() |
阻塞等待,等客户端发消息来 |
await websocket.send(...) |
把消息发回给客户端 |
广播消息:多人聊天
现在我们一个人说话只能服务器一个人收到,怎么让所有人都收到?
思路:用一个「公告板」存所有连接,收到消息就贴上去让大家看。

import asyncio
import websockets
# 所有活跃的连接,像一个微信群
connected_clients = set()
async def chat(websocket, path):
# 把新朋友加入群
connected_clients.add(websocket)
print(f"新人入群,当前人数: {len(connected_clients)}")
try:
# 一直等着收消息
async for message in websocket:
print(f"收到消息: {message}")
# 广播给所有人(除了发消息的人)
for client in connected_clients:
if client != websocket:
await client.send(f"有人说了: {message}")
finally:
# 朋友走了,从群里移除
connected_clients.remove(websocket)
print(f"有人退群,剩余人数: {len(connected_clients)}")
start_server = websockets.serve(chat, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
这行代码在干嘛:
| 代码 | 解释 |
|---|---|
connected_clients = set() |
创建「群成员名单」 |
connected_clients.add(websocket) |
新连接进来,记录一下 |
async for message in websocket: |
不断收消息,收到后循环处理 |
for client in connected_clients: |
遍历所有人,一个一个发 |
测试:
开三个终端,分别运行 client.py,你发一条消息,其他两个终端都能收到!
心跳机制:怎么知道对方还「活着」?
问题: 连接开着,但对方电脑死机了、网线拔了——你还以为他在在线发消息。
生活类比:
就像你跟异地恋女朋友打电话,你们约定:
"每 10 秒说一声'在吗',超过 30 秒没回就挂断重打"
这就是心跳机制。
代码实现:
import asyncio
import websockets
import json
connected_clients = {}
async def heartbeat(websocket, path):
client_id = id(websocket)
connected_clients[client_id] = websocket
# 给这个客户端发一个"ping"
await websocket.send(json.dumps({"type": "ping"}))
try:
# 等待pong回复,10秒超时
response = await asyncio.wait_for(websocket.recv(), timeout=10)
data = json.loads(response)
if data.get("type") == "pong":
print(f"客户端 {client_id} 还活着")
# 心跳正常,继续保持连接
except asyncio.TimeoutError:
print(f"客户端 {client_id} 没回应,断开连接")
del connected_clients[client_id]
start_server = websockets.serve(heartbeat, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
🔥 实战 35 分钟:3 个递进的小项目
项目 1(5 分钟):简易实时广播服务器
需求: 做一个服务器,多个客户端可以连接,任何人发消息所有人都能收到。
完整代码:
import asyncio
import websockets
clients = set()
async def echo_handler(websocket, path):
clients.add(websocket)
print(f"[+] 客户端接入,当前在线: {len(clients)}")
try:
async for msg in websocket:
# 广播给所有客户端
broadcast_msg = f"[广播] {msg}"
print(f"广播消息: {msg}")
for client in clients:
await client.send(broadcast_msg)
except websockets.exceptions.ConnectionClosed:
pass
finally:
clients.remove(websocket)
print(f"[-] 客户端断开,当前在线: {len(clients)}")
async def main():
server = await websockets.serve(echo_handler, "localhost", 8765)
print("🌟 广播服务器已启动,端口 8765")
await asyncio.Future() # 永久运行
asyncio.run(main())
保存为 echo_server.py,运行:
python echo_server.py
客户端测试代码:
import asyncio
import websockets
async def client(name):
async with websockets.connect("ws://localhost:8765") as ws:
print(f"{name} 已连接")
async def receive():
async for msg in ws:
print(f"收到: {msg}")
# 同时接收和发送
receive_task = asyncio.create_task(receive())
# 发3条消息
for i in range(3):
await ws.send(f"{name} 的第 {i+1} 条消息")
await asyncio.sleep(1)
await receive_task
asyncio.run(client("小明"))
运行结果: 开两个终端分别运行 client("小明") 和 client("小红"),就能看到双向广播效果了。
项目 2(15 分钟):实时股价监控
需求: 模拟一个股票行情服务器,客户端连上来后,每秒推送一只股票的价格更新。
数据源(简化模拟):
import asyncio
import websockets
import json
import random
import datetime
# 模拟股票数据
STOCKS = {
"AAPL": 150.0,
"GOOGL": 2800.0,
"MSFT": 300.0,
"TSLA": 700.0
}
async def stock_price_generator(websocket, path):
"""每秒推送股票价格更新"""
try:
while True:
# 随机更新一只股票的价格(模拟波动)
stock = random.choice(list(STOCKS.keys()))
change = random.uniform(-5, 5)
STOCKS[stock] = round(STOCKS[stock] + change, 2)
# 构造消息
message = json.dumps({
"stock": stock,
"price": STOCKS[stock],
"change": round(change, 2),
"time": datetime.datetime.now().strftime("%H:%M:%S")
})
await websocket.send(message)
print(f"推送: {message}")
await asyncio.sleep(1) # 每秒推送一次
except websockets.exceptions.ConnectionClosed:
print("客户端断开")
async def main():
server = await websockets.serve(stock_price_generator, "localhost", 8766)
print("📈 股价监控服务器已启动,端口 8766")
await asyncio.Future()
asyncio.run(main())
客户端代码:
import asyncio
import websockets
import json
async def monitor():
async with websockets.connect("ws://localhost:8766") as ws:
print("已连接股价服务器,等待数据...\n")
async for msg in ws:
data = json.loads(msg)
change_symbol = "🔼" if data["change"] >= 0 else "🔽"
print(f"[{data['time']}] {data['stock']}: ${data['price']} {change_symbol} {data['change']}")
asyncio.run(monitor())
保存并运行:
# 终端1
python stock_server.py
# 终端2
python stock_client.py
预期输出:
已连接股价服务器,等待数据...
[14:30:01] TSLA: $702.50 🔼 2.50
[14:30:02] AAPL: $148.30 🔽 -1.70
[14:30:03] GOOGL: $2798.20 🔽 -1.80
...
项目 3(15 分钟):多人在线待办清单
需求: 做一个实时同步的待办清单,多人共享,一人添加/删除,所有人立即看到更新。
服务器代码:
import asyncio
import websockets
import json
# 共享的待办清单
todos = [
{"id": 1, "text": "买牛奶", "done": False},
{"id": 2, "text": "交电费", "done": True}
]
connected_clients = set()
next_id = 3
async def todo_server(websocket, path):
connected_clients.add(websocket)
# 发送当前所有待办
await websocket.send(json.dumps({"type": "init", "todos": todos}))
async for msg in websocket:
global next_id
data = json.loads(msg)
if data["action"] == "add":
todo = {"id": next_id, "text": data["text"], "done": False}
todos.append(todo)
next_id += 1
broadcast = {"type": "add", "todo": todo}
elif data["action"] == "toggle":
for todo in todos:
if todo["id"] == data["id"]:
todo["done"] = not todo["done"]
break
broadcast = {"type": "toggle", "id": data["id"]}
elif data["action"] == "delete":
todos[:] = [t for t in todos if t["id"] != data["id"]]
broadcast = {"type": "delete", "id": data["id"]}
# 广播给所有人
for client in connected_clients:
await client.send(json.dumps(broadcast))
async def main():
server = await websockets.serve(todo_server, "localhost", 8767)
print("📝 待办清单服务器已启动,端口 8767")
await asyncio.Future()
asyncio.run(main())
客户端代码:
import asyncio
import websockets
import json
async def todo_client():
async with websockets.connect("ws://localhost:8767") as ws:
print("已连接待办清单服务器\n")
async def send_action(action, **kwargs):
msg = json.dumps({"action": action, **kwargs})
await ws.send(msg)
async def handle_updates():
async for msg in ws:
data = json.loads(msg)
if data["type"] == "init":
print("当前待办清单:")
for t in data["todos"]:
status = "✅" if t["done"] else "⬜"
print(f" {status} {t['id']}. {t['text']}")
elif data["type"] == "add":
print(f"\n➕ 新增: {data['todo']['text']}")
elif data["type"] == "toggle":
print(f"\n🔄 切换待办 ID: {data['id']}")
elif data["type"] == "delete":
print(f"\n🗑️ 删除待办 ID: {data['id']}")
# 启动接收任务
receive_task = asyncio.create_task(handle_updates())
# 模拟操作
await asyncio.sleep(1)
await send_action("add", text="给猫铲屎")
await asyncio.sleep(1)
await send_action("toggle", id=1)
await asyncio.sleep(1)
await send_action("delete", id=2)
await asyncio.sleep(1)
await send_action("add", text="复习考试")
await receive_task
asyncio.run(todo_client())
预期输出:
已连接待办清单服务器
当前待办清单:
⬜ 1. 买牛奶
✅ 2. 交电费
➕ 新增: 给猫铲屎
🔄 切换待办 ID: 1
🗑️ 删除待办 ID: 2
➕ 新增: 复习考试
💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:连接关闭后还在发消息
❌ 错误示例:
async def bad_handler(websocket, path):
async for msg in websocket:
# 网络波动导致连接断了,但代码还在尝试发消息
await websocket.send(f"echo: {msg}") # 可能抛异常
✅ 正确示例:
async def good_handler(websocket, path):
try:
async for msg in websocket:
await websocket.send(f"echo: {msg}")
except websockets.exceptions.ConnectionClosed:
print("连接已正常关闭") # 优雅处理
坑 2:广播时没有排除自己
❌ 错误示例:
for client in clients:
await client.send(msg) # 发给所有人包括自己,导致自己收到重复消息
✅ 正确示例:
for client in clients:
if client != websocket: # 排除自己
await client.send(msg)
坑 3:共享状态没有加锁
❌ 错误示例:
connected_clients = set()
async def handler(websocket, path):
connected_clients.add(websocket)
# 如果这时候另一个连接也 add,可能出问题
✅ 正确示例(用锁保护):
import asyncio
connected_clients = set()
clients_lock = asyncio.Lock()
async def handler(websocket, path):
async with clients_lock:
connected_clients.add(websocket)
坑 4:忘记处理 Unicode 编码
❌ 错误示例:
await websocket.send("你好,世界") # Python 3 默认就是 str,没问题
# 但如果是从文件/数据库读取的 bytes,没有 decode
✅ 正确示例:
# 发送时确保是 str
message = data.decode("utf-8") if isinstance(data, bytes) else data
await websocket.send(message)
坑 5:心跳超时时间设置不合理
❌ 错误示例:
# 超时时间太短,网络抖动就断开
response = await asyncio.wait_for(ws.recv(), timeout=1) # 1秒,太严苛
✅ 正确示例:
# 根据实际网络情况调整
response = await asyncio.wait_for(ws.recv(), timeout=30) # 30秒比较合理
性能小贴士:批量发送
如果短时间内要发很多条消息,可以攒起来一起发:
# 不要这样(每次 await 都是一次网络往返)
for item in large_list:
await websocket.send(item)
# 这样更快(构造一个大消息一次性发)
batch = "\n".join(large_list)
await websocket.send(batch)
调试技巧:打印日志
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def handler(websocket, path):
logger.info(f"新连接来自: {websocket.remote_address}")
try:
async for msg in websocket:
logger.debug(f"收到消息: {msg}")
await websocket.send(msg)
except Exception as e:
logger.error(f"错误: {e}")
✏️ 练习题 + 作业题
练习题(5 道,10 分钟内完成)
练习 1(2 分钟):修改端口号
- 输入:把 echo_server.py 的端口从 8765 改成 8888
- 预期输出:客户端连接 ws://localhost:8888 能正常通信
- 提示:start_server = websockets.serve(echo_handler, "localhost", 8888)
练习 2(2 分钟):添加昵称广播
- 输入:在 echo_server.py 中,收到消息时显示发送者 IP
- 预期输出:收到客户端 127.0.0.1 的消息: hello
- 提示:websocket.remote_address 可以拿到 IP
练习 3(2 分钟):过滤空消息
- 输入:在 echo_server.py 中,不广播空消息
- 预期输出:发送空字符串时不广播,发送 "hi" 时正常广播
- 提示:if not msg.strip(): continue
练习 4(2 分钟):统计在线人数
- 输入:在 echo_server.py 中,客户端连接时广播 "有人加入了"
- 预期输出:第二个客户端连接时,第一个客户端收到 "有人加入了,当前在线 2 人"
- 提示:在 add 之后、remove 之前广播
练习 5(2 分钟):修复编码报错
- 输入:客户端发送中文 "你好",服务器报错 UnicodeEncodeError
- 预期输出:服务器正常显示中文,不报错
- 提示:确保 websockets 库版本是最新的 pip install --upgrade websockets
作业题(30 分钟 - 2 小时)
作业:做一个「实时聊天室」
-
需求描述: 实现一个多人在线聊天室,支持:
1. 用户进入时输入昵称
2. 发送消息所有人可见
3. 显示当前在线人数
4. 用户离开时广播「xxx 离开了」 -
功能点:
1. 服务器维护在线用户列表{websocket: nickname}
2. 消息格式:[时间] 昵称: 消息内容
3. 系统消息(如加入/离开)用不同颜色/前缀区分 -
加分项:
1. 支持/nick 新昵称命令修改昵称
2. 支持/dm 昵称 消息发送私信 -
验收标准:
- 能跑起来(运行
python chat_server.py不报错) - 多开两个客户端能互相收发消息
-
在线人数统计正确
-
提交方式: 评论区贴代码或 GitHub 链接
📚 总结 + 资源
本文学到的 3 个核心点:
- WebSocket 是双向长连接 —— 一次握手,一直通话,不像 HTTP 每次都要重新建连
- 广播机制靠「群成员名单」 —— 用
set()存所有连接,收到消息遍历发送 - 心跳保活不能少 —— 定期发
ping检测对方是否还活着,超时就断开
延伸学习资源:
- websockets 官方文档 —— 最新的 API 和示例
- 《Python 网络编程》—— 深入理解 socket、asyncio、异步 I/O
- Socket.io vs WebSocket —— 了解什么时候用更高级的方案
互动钩子:
你在项目中用过 WebSocket 吗?比如在线文档协作、游戏联机、实时推送?评论区聊聊你的使用场景,老粉优先回复!
下一章我们要玩点不一样的——告别浏览器,打开终端,用 Python 做一个自己的命令行工具(CLI),带颜色、带进度条、还能交互输入。学完你就可以说「我会做工具了」而不是「我只会在 IDE 里跑代码」。
(全文约 5200 字)

评论(0)