第10章 10.1 实时聊天系统(WebSocket)

前置章节衔接

上一章我们折腾完了 Nest.js 和 Fastify这两个"豪华餐厅"——它们帮你处理好了路由、中间件、依赖注入这些繁琐的后厨管理,让你专心炒菜。但你有没有遇到过这种情况:点完餐后想"实时知道菜做好了没",而不是一次次去问服务员"好了吗好了吗"?

这就好比你发消息给朋友,不想每次都刷新页面等对方回复,而是希望对方一回复,你这边立刻弹出来——不需要你做任何操作。

这就是 WebSocket 要解决的问题。


🎯 开场 3 分钟:为什么要学这个?

想象这个场景:

你在做一个「在线客服」网站。用户发一条消息,你得等几秒甚至几十秒才能收到响应——因为每次通信都要重新建立连接、发送请求、等待响应。这感觉就像:

每次你想和女朋友说句话,都要先拨电话、等接通、说两句、挂断,下次再说再拨一遍。

累不累?

再想象另一个场景:

你做了一个「股票行情监控」页面,股价每秒都在变。你用普通 HTTP 轮询(每隔几秒请求一次),结果:
- 服务器压力大(1000个人每人每秒请求10次 = 1万次/秒)
- 数据不实时(你拿到的是几秒前的数据)
- 用户体验差(页面一闪一闪地更新)

这时候 WebSocket 就是你的救星:

它像是给你们俩拉了一个「专线电话」——一次接通,双向通话,想啥时候说就啥时候说,不用每次重新拨号。

学完这章,你将能够:
- 理解 WebSocket 的「握手」原理
- 用 Python 的 websockets 库写一个实时聊天服务器
- 实现消息广播、在线人数统计、心跳保活


🧱 基础 25 分钟:核心概念

什么是 WebSocket?

生活类比:

普通 HTTP 就像外卖点餐:
- 你下单(发送请求)
- 店家接单、处理
- 骑手送来(返回响应)
- 交易结束,挂断联系

WebSocket 就像你跟朋友的微信聊天:
- 你们互相加了好友(建立连接)
- 你随时可以发消息,对方秒收
- 对方回复,你秒收
- 连接一直保持着,想聊就聊

为什么要用 WebSocket?

场景 HTTP 轮询 WebSocket
实时性 慢(几秒延迟) 快(毫秒级)
服务器压力 大(频繁建连) 小(长连接)
双向通信 不支持 支持
适用场景 一次性请求 实时交互

WebSocket 的「握手」是什么?

生活类比:

握手就像你们第一次见面时的自我介绍——
你说"你好,我是小明,请多关照"
对方说"你好,我是小红,以后一起玩"

说完这两句,你们就认识了,后面就可以随便聊天了,不用每次说话前再自我介绍一遍。

技术解释:

WebSocket 建立连接时,客户端发一个特殊的 HTTP 请求:

GET /chat HTTP/1.1
Host: server.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==

服务器如果同意,就会返回:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

然后——连接从 HTTP 协议「升级」成了 WebSocket 协议,专线电话接通!

配图1 - 配图1

第一个 WebSocket 程序

安装库:

pip install websockets

服务器端代码:

import asyncio
import websockets

async def chat(websocket, path):
# 等待客户端发来消息
message = await websocket.recv()
print(f"收到消息: {message}")

# 回复一条消息
await websocket.send(f"服务器收到了: {message}")

# 启动服务器,监听 8765 端口
start_server = websockets.serve(chat, "localhost", 8765)

print("聊天服务器启动中...")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

保存为 server.py,运行:

python server.py

你会看到输出:聊天服务器启动中...

客户端代码(用浏览器控制台或 Python 连接):

import asyncio
import websockets

async def client():
# 连接到服务器
async with websockets.connect("ws://localhost:8765") as ws:
    # 发送消息
    await ws.send("你好,服务器!")
    print("已发送: 你好,服务器!")

    # 接收回复
    response = await ws.recv()
    print(f"收到回复: {response}")

asyncio.get_event_loop().run_until_complete(client())

保存为 client.py,新开一个终端运行:

python client.py

预期输出:

已发送: 你好,服务器!
收到回复: 服务器收到了: 你好,服务器!

服务器端会显示:

收到消息: 你好,服务器!

这行代码在干嘛:

代码 解释
websockets.serve(chat, "localhost", 8765) 启动服务器,监听 8765 端口
await websocket.recv() 阻塞等待,等客户端发消息来
await websocket.send(...) 把消息发回给客户端

广播消息:多人聊天

现在我们一个人说话只能服务器一个人收到,怎么让所有人都收到

思路:用一个「公告板」存所有连接,收到消息就贴上去让大家看。

配图2 - 配图2

import asyncio
import websockets

# 所有活跃的连接,像一个微信群
connected_clients = set()

async def chat(websocket, path):
# 把新朋友加入群
connected_clients.add(websocket)
print(f"新人入群,当前人数: {len(connected_clients)}")

try:
    # 一直等着收消息
    async for message in websocket:
        print(f"收到消息: {message}")

        # 广播给所有人(除了发消息的人)
        for client in connected_clients:
            if client != websocket:
                await client.send(f"有人说了: {message}")
finally:
    # 朋友走了,从群里移除
    connected_clients.remove(websocket)
    print(f"有人退群,剩余人数: {len(connected_clients)}")

start_server = websockets.serve(chat, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

这行代码在干嘛:

代码 解释
connected_clients = set() 创建「群成员名单」
connected_clients.add(websocket) 新连接进来,记录一下
async for message in websocket: 不断收消息,收到后循环处理
for client in connected_clients: 遍历所有人,一个一个发

测试:

开三个终端,分别运行 client.py,你发一条消息,其他两个终端都能收到!

心跳机制:怎么知道对方还「活着」?

问题: 连接开着,但对方电脑死机了、网线拔了——你还以为他在在线发消息。

生活类比:

就像你跟异地恋女朋友打电话,你们约定:

"每 10 秒说一声'在吗',超过 30 秒没回就挂断重打"

这就是心跳机制。

代码实现:

import asyncio
import websockets
import json

connected_clients = {}

async def heartbeat(websocket, path):
client_id = id(websocket)
connected_clients[client_id] = websocket

# 给这个客户端发一个"ping"
await websocket.send(json.dumps({"type": "ping"}))

try:
    # 等待pong回复,10秒超时
    response = await asyncio.wait_for(websocket.recv(), timeout=10)
    data = json.loads(response)

    if data.get("type") == "pong":
        print(f"客户端 {client_id} 还活着")
        # 心跳正常,继续保持连接
except asyncio.TimeoutError:
    print(f"客户端 {client_id} 没回应,断开连接")
    del connected_clients[client_id]

start_server = websockets.serve(heartbeat, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

🔥 实战 35 分钟:3 个递进的小项目

项目 1(5 分钟):简易实时广播服务器

需求: 做一个服务器,多个客户端可以连接,任何人发消息所有人都能收到。

完整代码:

import asyncio
import websockets

clients = set()

async def echo_handler(websocket, path):
clients.add(websocket)
print(f"[+] 客户端接入,当前在线: {len(clients)}")

try:
    async for msg in websocket:
        # 广播给所有客户端
        broadcast_msg = f"[广播] {msg}"
        print(f"广播消息: {msg}")

        for client in clients:
            await client.send(broadcast_msg)
except websockets.exceptions.ConnectionClosed:
    pass
finally:
    clients.remove(websocket)
    print(f"[-] 客户端断开,当前在线: {len(clients)}")

async def main():
server = await websockets.serve(echo_handler, "localhost", 8765)
print("🌟 广播服务器已启动,端口 8765")
await asyncio.Future()  # 永久运行

asyncio.run(main())

保存为 echo_server.py,运行:

python echo_server.py

客户端测试代码:

import asyncio
import websockets

async def client(name):
async with websockets.connect("ws://localhost:8765") as ws:

    print(f"{name} 已连接")

    async def receive():
        async for msg in ws:
            print(f"收到: {msg}")

    # 同时接收和发送
    receive_task = asyncio.create_task(receive())

    # 发3条消息
    for i in range(3):
        await ws.send(f"{name} 的第 {i+1} 条消息")
        await asyncio.sleep(1)

    await receive_task

asyncio.run(client("小明"))

运行结果: 开两个终端分别运行 client("小明")client("小红"),就能看到双向广播效果了。


项目 2(15 分钟):实时股价监控

需求: 模拟一个股票行情服务器,客户端连上来后,每秒推送一只股票的价格更新。

数据源(简化模拟):

import asyncio
import websockets
import json
import random
import datetime

# 模拟股票数据
STOCKS = {
"AAPL": 150.0,
"GOOGL": 2800.0,
"MSFT": 300.0,
"TSLA": 700.0
}

async def stock_price_generator(websocket, path):
"""每秒推送股票价格更新"""
try:
    while True:
        # 随机更新一只股票的价格(模拟波动)
        stock = random.choice(list(STOCKS.keys()))
        change = random.uniform(-5, 5)
        STOCKS[stock] = round(STOCKS[stock] + change, 2)

        # 构造消息
        message = json.dumps({
            "stock": stock,
            "price": STOCKS[stock],
            "change": round(change, 2),
            "time": datetime.datetime.now().strftime("%H:%M:%S")
        })

        await websocket.send(message)
        print(f"推送: {message}")

        await asyncio.sleep(1)  # 每秒推送一次
except websockets.exceptions.ConnectionClosed:
    print("客户端断开")

async def main():
server = await websockets.serve(stock_price_generator, "localhost", 8766)
print("📈 股价监控服务器已启动,端口 8766")
await asyncio.Future()

asyncio.run(main())

客户端代码:

import asyncio
import websockets
import json

async def monitor():
async with websockets.connect("ws://localhost:8766") as ws:
    print("已连接股价服务器,等待数据...\n")

    async for msg in ws:
        data = json.loads(msg)
        change_symbol = "🔼" if data["change"] >= 0 else "🔽"
        print(f"[{data['time']}] {data['stock']}: ${data['price']} {change_symbol} {data['change']}")

asyncio.run(monitor())

保存并运行:

# 终端1
python stock_server.py

# 终端2
python stock_client.py

预期输出:

已连接股价服务器,等待数据...

[14:30:01] TSLA: $702.50 🔼 2.50
[14:30:02] AAPL: $148.30 🔽 -1.70
[14:30:03] GOOGL: $2798.20 🔽 -1.80
...

项目 3(15 分钟):多人在线待办清单

需求: 做一个实时同步的待办清单,多人共享,一人添加/删除,所有人立即看到更新。

服务器代码:

import asyncio
import websockets
import json

# 共享的待办清单
todos = [
{"id": 1, "text": "买牛奶", "done": False},
{"id": 2, "text": "交电费", "done": True}
]

connected_clients = set()
next_id = 3

async def todo_server(websocket, path):
connected_clients.add(websocket)

# 发送当前所有待办
await websocket.send(json.dumps({"type": "init", "todos": todos}))

async for msg in websocket:
    global next_id
    data = json.loads(msg)

    if data["action"] == "add":
        todo = {"id": next_id, "text": data["text"], "done": False}
        todos.append(todo)
        next_id += 1
        broadcast = {"type": "add", "todo": todo}

    elif data["action"] == "toggle":
        for todo in todos:
            if todo["id"] == data["id"]:
                todo["done"] = not todo["done"]
                break
        broadcast = {"type": "toggle", "id": data["id"]}

    elif data["action"] == "delete":
        todos[:] = [t for t in todos if t["id"] != data["id"]]
        broadcast = {"type": "delete", "id": data["id"]}

    # 广播给所有人
    for client in connected_clients:
        await client.send(json.dumps(broadcast))

async def main():
server = await websockets.serve(todo_server, "localhost", 8767)
print("📝 待办清单服务器已启动,端口 8767")
await asyncio.Future()

asyncio.run(main())

客户端代码:

import asyncio
import websockets
import json

async def todo_client():
async with websockets.connect("ws://localhost:8767") as ws:
    print("已连接待办清单服务器\n")

    async def send_action(action, **kwargs):
        msg = json.dumps({"action": action, **kwargs})
        await ws.send(msg)

    async def handle_updates():
        async for msg in ws:
            data = json.loads(msg)

            if data["type"] == "init":
                print("当前待办清单:")
                for t in data["todos"]:
                    status = "✅" if t["done"] else "⬜"
                    print(f"  {status} {t['id']}. {t['text']}")

            elif data["type"] == "add":
                print(f"\n➕ 新增: {data['todo']['text']}")

            elif data["type"] == "toggle":
                print(f"\n🔄 切换待办 ID: {data['id']}")

            elif data["type"] == "delete":
                print(f"\n🗑️  删除待办 ID: {data['id']}")

    # 启动接收任务
    receive_task = asyncio.create_task(handle_updates())

    # 模拟操作
    await asyncio.sleep(1)
    await send_action("add", text="给猫铲屎")

    await asyncio.sleep(1)
    await send_action("toggle", id=1)

    await asyncio.sleep(1)
    await send_action("delete", id=2)

    await asyncio.sleep(1)
    await send_action("add", text="复习考试")

    await receive_task

asyncio.run(todo_client())

预期输出:

已连接待办清单服务器

当前待办清单:
⬜ 1. 买牛奶
✅ 2. 交电费

➕ 新增: 给猫铲屎
🔄 切换待办 ID: 1
🗑️  删除待办 ID: 2
➕ 新增: 复习考试

💪 进阶 20 分钟:常见坑 + 性能小贴士

坑 1:连接关闭后还在发消息

❌ 错误示例:

async def bad_handler(websocket, path):
async for msg in websocket:
    # 网络波动导致连接断了,但代码还在尝试发消息
    await websocket.send(f"echo: {msg}")  # 可能抛异常

✅ 正确示例:

async def good_handler(websocket, path):
try:
    async for msg in websocket:
        await websocket.send(f"echo: {msg}")
except websockets.exceptions.ConnectionClosed:
    print("连接已正常关闭")  # 优雅处理

坑 2:广播时没有排除自己

❌ 错误示例:

for client in clients:
await client.send(msg)  # 发给所有人包括自己,导致自己收到重复消息

✅ 正确示例:

for client in clients:
if client != websocket:  # 排除自己
    await client.send(msg)

坑 3:共享状态没有加锁

❌ 错误示例:

connected_clients = set()

async def handler(websocket, path):
connected_clients.add(websocket)
# 如果这时候另一个连接也 add,可能出问题

✅ 正确示例(用锁保护):

import asyncio

connected_clients = set()
clients_lock = asyncio.Lock()

async def handler(websocket, path):
async with clients_lock:
    connected_clients.add(websocket)

坑 4:忘记处理 Unicode 编码

❌ 错误示例:

await websocket.send("你好,世界")  # Python 3 默认就是 str,没问题
# 但如果是从文件/数据库读取的 bytes,没有 decode

✅ 正确示例:

# 发送时确保是 str
message = data.decode("utf-8") if isinstance(data, bytes) else data
await websocket.send(message)

坑 5:心跳超时时间设置不合理

❌ 错误示例:

# 超时时间太短,网络抖动就断开
response = await asyncio.wait_for(ws.recv(), timeout=1)  # 1秒,太严苛

✅ 正确示例:

# 根据实际网络情况调整
response = await asyncio.wait_for(ws.recv(), timeout=30)  # 30秒比较合理

性能小贴士:批量发送

如果短时间内要发很多条消息,可以攒起来一起发:

# 不要这样(每次 await 都是一次网络往返)
for item in large_list:
await websocket.send(item)

# 这样更快(构造一个大消息一次性发)
batch = "\n".join(large_list)
await websocket.send(batch)

调试技巧:打印日志

import logging

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

async def handler(websocket, path):
logger.info(f"新连接来自: {websocket.remote_address}")
try:
    async for msg in websocket:
        logger.debug(f"收到消息: {msg}")
        await websocket.send(msg)
except Exception as e:
    logger.error(f"错误: {e}")

✏️ 练习题 + 作业题

练习题(5 道,10 分钟内完成)

练习 1(2 分钟):修改端口号
- 输入:把 echo_server.py 的端口从 8765 改成 8888
- 预期输出:客户端连接 ws://localhost:8888 能正常通信
- 提示:start_server = websockets.serve(echo_handler, "localhost", 8888)

练习 2(2 分钟):添加昵称广播
- 输入:在 echo_server.py 中,收到消息时显示发送者 IP
- 预期输出:收到客户端 127.0.0.1 的消息: hello
- 提示:websocket.remote_address 可以拿到 IP

练习 3(2 分钟):过滤空消息
- 输入:在 echo_server.py 中,不广播空消息
- 预期输出:发送空字符串时不广播,发送 "hi" 时正常广播
- 提示:if not msg.strip(): continue

练习 4(2 分钟):统计在线人数
- 输入:在 echo_server.py 中,客户端连接时广播 "有人加入了"
- 预期输出:第二个客户端连接时,第一个客户端收到 "有人加入了,当前在线 2 人"
- 提示:在 add 之后、remove 之前广播

练习 5(2 分钟):修复编码报错
- 输入:客户端发送中文 "你好",服务器报错 UnicodeEncodeError
- 预期输出:服务器正常显示中文,不报错
- 提示:确保 websockets 库版本是最新的 pip install --upgrade websockets


作业题(30 分钟 - 2 小时)

作业:做一个「实时聊天室」

  • 需求描述: 实现一个多人在线聊天室,支持:
    1. 用户进入时输入昵称
    2. 发送消息所有人可见
    3. 显示当前在线人数
    4. 用户离开时广播「xxx 离开了」

  • 功能点:
    1. 服务器维护在线用户列表 {websocket: nickname}
    2. 消息格式:[时间] 昵称: 消息内容
    3. 系统消息(如加入/离开)用不同颜色/前缀区分

  • 加分项:
    1. 支持 /nick 新昵称 命令修改昵称
    2. 支持 /dm 昵称 消息 发送私信

  • 验收标准:

  • 能跑起来(运行 python chat_server.py 不报错)
  • 多开两个客户端能互相收发消息
  • 在线人数统计正确

  • 提交方式: 评论区贴代码或 GitHub 链接


📚 总结 + 资源

本文学到的 3 个核心点:

  1. WebSocket 是双向长连接 —— 一次握手,一直通话,不像 HTTP 每次都要重新建连
  2. 广播机制靠「群成员名单」 —— 用 set() 存所有连接,收到消息遍历发送
  3. 心跳保活不能少 —— 定期发 ping 检测对方是否还活着,超时就断开

延伸学习资源:

  1. websockets 官方文档 —— 最新的 API 和示例
  2. 《Python 网络编程》—— 深入理解 socket、asyncio、异步 I/O
  3. Socket.io vs WebSocket —— 了解什么时候用更高级的方案

互动钩子:

你在项目中用过 WebSocket 吗?比如在线文档协作、游戏联机、实时推送?评论区聊聊你的使用场景,老粉优先回复!

下一章我们要玩点不一样的——告别浏览器,打开终端,用 Python 做一个自己的命令行工具(CLI),带颜色、带进度条、还能交互输入。学完你就可以说「我会做工具了」而不是「我只会在 IDE 里跑代码」。


(全文约 5200 字)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。