第9章 9.2 实时通信:WebSocket
🎯 开场:为什么你的App需要"一直在线的电话"?
上一章我们搞定了 uniCloud 云开发,学会了怎么把数据存到云上。但你有没有遇到过这种情况:
- 聊天软件:消息来了,App居然要等好几秒甚至更久才能收到
- 股票行情:价格变了,屏幕上还是旧数据,眼睁睁看着机会溜走
- 在线游戏:明明按了技能,人物却慢半拍才动,被队友骂成"猪队友"
这些问题,本质上都是一个原因:你的App只能用" запрос-响应"模式——就像你发微信,得等对方回,你才能看到新消息。但有些场景,你需要对方主动找你,不需要你一次次去问"在吗在吗在吗"。
WebSocket 就是来解决这个问题的——它让你的App和服务器之间保持一个"一直在线的电话",服务器有新消息可以直接"打电话"告诉你,不用你不停拨过去问"有新消息吗"。
这一章学完,你就能写出:
- 一个聊天室,消息秒收秒发
- 一个实时股价监控小工具
- 带心跳自动重连的稳定连接
\n\n
\n\n
\n\n
🧱 基础:WebSocket 到底是什么?
9.2.1 生活类比:座机 vs 微信语音
想象一下通信方式的发展:
| 方式 | 类比 | 工作方式 |
|---|---|---|
| HTTP 请求 | 发短信 | 你发一条,对方回一条,你再发一条...每次都得等 |
| WebSocket | 微信语音通话 | 连接一次,双方可以随时互相说话,直到挂断 |
WebSocket 的核心特点:
- ✅ 双向通信:服务器能主动推消息给客户端
- ✅ 保持连接:一次握手,持续通信,不用反复建立连接
- ✅ 低延迟:消息来了马上到,不用等轮询
9.2.2 uniapp 里怎么用 WebSocket?
uniapp 给我们封装好了 uni.connectSocket 系列 API,用起来非常方便。
先来看最基础的连接代码:
# 引入 uniCloud 相关的 WebSocket 模块
# 注意:这是 uniapp 的 JS API,但原理是相通的
// 建立连接
const socket = uni.connectSocket({
url: 'ws://localhost:8080/chat', // WebSocket 地址,ws:// 开头
success: () => {
console.log('连接成功!');
},
fail: (err) => {
console.log('连接失败:', err);
}
});
// 监听消息
socket.onMessage((res) => {
console.log('收到消息:', res.data);
});
// 监听关闭
socket.onClose(() => {
console.log('连接断开了');
});
这段代码干了3件事:
1. connectSocket 建立一个 WebSocket 连接
2. onMessage 监听服务器发来的消息
3. onClose 监听连接关闭事件
9.2.3 发送消息:就像发微信
连接成功后,发送消息特别简单:
// 发送文本消息
socket.send({
data: '你好,我是小明!', // 要发送的内容
success: () => {
console.log('发送成功');
}
});
发送 JSON 数据也很常见:
// 发送 JSON 格式的消息
socket.send({
data: JSON.stringify({
type: 'message',
content: '大家好啊',
from: 'xiaoming',
timestamp: Date.now()
})
});
9.2.4 心跳机制:为什么需要"保活"?
你有没有过这种经历:微信开着但很久没操作,再发消息就提示"连接断开,请检查网络"?
WebSocket 连接也是一样的道理。如果长时间没有数据交换,服务器或防火墙可能会认为连接"死"了,自动把它断掉。
心跳机制就是解决这个问题:用一个小小的" ping "消息,定期告诉对方"我还活着"。
// 定时发送心跳
let heartbeatTimer = null;
function startHeartbeat() {
heartbeatTimer = setInterval(() => {
socket.send({
data: JSON.stringify({ type: 'ping' })
});
}, 30000); // 每30秒发一次心跳
}
function stopHeartbeat() {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
heartbeatTimer = null;
}
}
// 连接成功时启动心跳
socket.onOpen(() => {
startHeartbeat();
});
// 断开时停止心跳
socket.onClose(() => {
stopHeartbeat();
});
9.2.5 自动重连:断了还能自动连上
网络不稳定时,连接可能会意外断开。好的程序应该能自动重连:
let reconnectTimer = null;
let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 5;
function connect() {
const socket = uni.connectSocket({
url: 'ws://localhost:8080/chat'
});
// 监听关闭事件
socket.onClose(() => {
// 如果还没达到最大重试次数,就尝试重连
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
reconnectAttempts++;
console.log(`连接断开,${reconnectAttempts}秒后尝试第${reconnectAttempts}次重连...`);
reconnectTimer = setTimeout(() => {
connect(); // 递归调用,重新连接
}, 1000 * reconnectAttempts); // 间隔时间递增
} else {
console.log('重连次数用完,放弃治疗');
}
});
// 连接成功时重置计数
socket.onOpen(() => {
reconnectAttempts = 0;
console.log('连接恢复!');
});
}
为什么要间隔递增?如果你一直用1秒的间隔狂轰滥炸,服务器可能会把你当成攻击者拉黑。
🔥 实战:3个递进小项目
项目1:最简聊天室(5分钟)
先跑起来,感受一下 WebSocket 的魔力。
场景:你和朋友在命令行里聊天(用本地测试服务器模拟)
# server.py - WebSocket 服务器(用 Python 的 websockets 库)
# 运行:pip install websockets
# 然后:python server.py
import asyncio
import websockets
from datetime import datetime
async def chat_handler(websocket, path):
"""处理每个客户端的聊天消息"""
client_addr = websocket.remote_address
print(f"新玩家上线:{client_addr}")
try:
async for message in websocket:
print(f"收到 {client_addr} 的消息:{message}")
# 广播给所有在线客户端(简单起见,这里只返回给发送者)
response = f"[{datetime.now().strftime('%H:%M:%S')}] 你说:{message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print(f"玩家 {client_addr} 离开了")
# 启动服务器
start_server = websockets.serve(chat_handler, "localhost", 8080)
print("聊天室服务器启动,端口 8080...")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
客户端代码(uniapp 端):
// chat_client.js - uniapp WebSocket 聊天客户端
// 建立连接
const socket = uni.connectSocket({
url: 'ws://localhost:8080'
});
// 连接成功
socket.onOpen(() => {
console.log('已进入聊天室');
uni.showToast({ title: '进入聊天室', icon: 'success' });
});
// 收到消息
socket.onMessage((res) => {
console.log('收到消息:', res.data);
// 可以把消息显示到页面上
});
// 发送消息函数
function sendMessage(content) {
socket.send({
data: content
});
}
// 示例:发送"你好"
sendMessage('你好');
// 监听关闭
socket.onClose(() => {
console.log('连接已断开');
});
预期输出(服务器端):
聊天室服务器启动,端口 8080...
新玩家上线:('127.0.0.1', 52345)
收到 ('127.0.0.1', 52345) 的消息:你好
一句话解释:服务器收到消息后,会加上时间戳再返回给你,这就是"实时对话"的感觉。
项目2:实时股票价格监控(15分钟)
场景:你持有某只股票,想盯着价格,到某个点位就提醒你卖出。
这个项目从 CSV 文件读取股票代码,实时监控价格(我们用模拟数据,实际项目中可以接真实 API)。
# stock_monitor.py - 股票价格实时监控
import asyncio
import websockets
import json
import random
from datetime import datetime
# 模拟股票数据(实际项目中可以从数据库或API获取)
STOCK_CODES = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
def generate_stock_price(code):
"""模拟生成股票价格(实际项目替换为真实API调用)"""
base_prices = {'AAPL': 175, 'GOOGL': 140, 'MSFT': 380, 'AMZN': 180, 'TSLA': 250}
price = base_prices.get(code, 100)
# 模拟价格波动 ±2%
动 = random.uniform(-0.02, 0.02)
return round(price * (1 + 变动), 2)
async def stock_client():
"""WebSocket 客户端:连接服务器获取股票价格"""
uri = "ws://localhost:8081"
# 设置报警阈值
alert_threshold = 0.5 # 价格变化超过0.5%就报警
async with websockets.connect(uri) as websocket:
print(f"已连接股票行情服务器")
for _ in range(10): # 模拟接收10次行情
# 接收服务器推送的行情数据
data = await websocket.recv()
stock_data = json.loads(data)
current_time = datetime.now().strftime('%H:%M:%S')
code = stock_data['code']
price = stock_data['price']
change = stock_data['change_percent']
# 打印行情
color_indicator = '↑' if change > 0 else '↓'
print(f"[{current_time}] {code}: ${price} {color_indicator}{abs(change):.2f}%")
# 检查是否需要报警
if abs(change) > alert_threshold:
print(f"🚨 报警!{code} 价格变动超过阈值!")
await asyncio.sleep(1) # 每秒更新一次
# 以下是简单服务器实现(推送模拟行情)
async def stock_server(websocket, path):
"""服务器:定期推送股票行情"""
for _ in range(100):
# 随机选一只股票推送
code = random.choice(STOCK_CODES)
price = generate_stock_price(code)
prev_price = price * random.uniform(0.98, 1.02)
change_percent = ((price - prev_price) / prev_price) * 100
data = json.dumps({
'code': code,
'price': price,
'change_percent': change_percent,
'timestamp': datetime.now().isoformat()
})
await websocket.send(data)
await asyncio.sleep(0.5)
async def main():
# 启动服务器(在另一个线程或进程中运行)
# 这里简化处理,假设服务器已经在运行
print("开始监控股票行情...")
await stock_client()
# 运行客户端
asyncio.run(main())
预期输出:
开始监控股票行情...
已连接股票行情服务器
[14:30:01] AAPL: $174.23 ↑0.15%
[14:30:02] TSLA: $248.50 ↓0.62%
🚨 报警!TSLA 价格变动超过阈值!
[14:30:03] GOOGL: $141.20 ↑0.85%
[14:30:04] MSFT: $379.80 ↓0.05%
...
一句话解释:WebSocket 让服务器能主动把最新的股票价格"推"给你,不用你每秒发一次请求去"问"价格。
项目3:待办清单实时同步工具(15分钟)
场景:你和小明一起做一个项目,想共享一个待办清单,任何一个人加了新任务,另一个人马上能看到。
这个项目组合了项目1的通信基础 + 项目2的数据结构,是个小而美的协作工具。
# todo_sync.py - 实时同步待办清单
import asyncio
import websockets
import json
from datetime import datetime
class TodoList:
"""待办清单类"""
def __init__(self):
self.items = [] # 存储待办项
self.listeners = [] # 存储所有在线客户端
def add_item(self, text, priority='normal'):
"""添加待办项"""
item = {
'id': len(self.items) + 1,
'text': text,
'priority': priority, # high, normal, low
'done': False,
'created_at': datetime.now().isoformat()
}
self.items.append(item)
return item
def complete_item(self, item_id):
"""标记完成"""
for item in self.items:
if item['id'] == item_id:
item['done'] = True
item['completed_at'] = datetime.now().isoformat()
return True
return False
def get_all(self):
"""获取所有待办"""
return self.items
def to_json(self):
return json.dumps({
'type': 'sync',
'data': self.items
})
# 全局待办清单实例
todo_list = TodoList()
# 初始化一些待办
todo_list.add_item('完成WebSocket教程', priority='high')
todo_list.add_item('写练习题', priority='normal')
todo_list.add_item('录视频讲解', priority='low')
async def handle_client(websocket, path):
"""处理客户端连接"""
client_id = id(websocket)
print(f"客户端 {client_id} 连接")
# 发送当前所有待办给新连接的客户端
await websocket.send(todo_list.to_json())
try:
async for message in websocket:
# 解析客户端发来的命令
command = json.loads(message)
action = command.get('action')
if action == 'add':
# 添加新待办
text = command.get('text', '')
priority = command.get('priority', 'normal')
new_item = todo_list.add_item(text, priority)
print(f"添加待办:{text}")
# 广播给所有客户端
for listener in todo_list.listeners:
try:
await listener.send(todo_list.to_json())
except:
pass
elif action == 'complete':
# 标记完成
item_id = command.get('id')
if todo_list.complete_item(item_id):
print(f"待办 {item_id} 已完成")
# 广播更新
for listener in todo_list.listeners:
try:
await listener.send(todo_list.to_json())
except:
pass
elif action == 'get':
# 客户端请求最新数据
await websocket.send(todo_list.to_json())
except websockets.exceptions.ConnectionClosed:
print(f"客户端 {client_id} 断开")
finally:
if websocket in todo_list.listeners:
todo_list.listeners.remove(websocket)
async main():
# 启动服务器
async with websockets.serve(handle_client, "localhost", 8082):
print("待办清单同步服务器启动!")
print("监听端口 8082...")
await asyncio.Future() # 永久运行
asyncio.run(main())
客户端测试代码:
# test_todo_client.py - 测试客户端
import asyncio
import websockets
import json
async def test_client():
uri = "ws://localhost:8082"
async with websockets.connect(uri) as socket:
# 接收初始数据
initial_data = await socket.recv()
print("当前待办清单:")
print(initial_data)
# 添加一个新待办
await socket.send(json.dumps({
'action': 'add',
'text': '测试添加的待办',
'priority': 'high'
}))
print("已发送添加请求")
# 等待同步更新
await asyncio.sleep(1)
updated_data = await socket.recv()
print("更新后的待办:")
print(updated_data)
asyncio.run(test_client())
预期输出:
当前待办清单:
{"type": "sync", "data": [{"id": 1, "text": "完成WebSocket教程", ...}, ...]}
已发送添加请求
更新后的待办:
{"type": "sync", "data": [..., {"id": 4, "text": "测试添加的待办", ...}]}
一句话解释:任何客户端修改了待办清单,服务器就会把最新数据"广播"给所有在线的人,实现实时同步。
💪 进阶:新手容易踩的坑
坑1:WebSocket 和 HTTP 端口混用
❌ 错误:用 http:// 或 https:// 开头
uni.connectSocket({
url: 'http://localhost:8080/chat' // ❌ 这是HTTP,不是WebSocket
})
✅ 正确:用 ws:// 或 wss:// 开头
uni.connectSocket({
url: 'ws://localhost:8080/chat' // ✅ 正确
})
// 线上环境用 wss://(加密)
uni.connectSocket({
url: 'wss://yourserver.com/chat' // ✅ 加密更安全
})
为什么重要?HTTP 和 WebSocket 是两种完全不同的协议,就像电话和电报虽然都能传递信息,但线路完全不同。
坑2:忘记处理连接断开的情况
❌ 错误:连接断了还继续发消息
// 断开后还尝试发送
socket.onClose(() => {
console.log('连接断开');
});
// 后续的 send() 调用都会失败,但没有任何处理
✅ 正确:断开时提示用户,修复后自动重连
let isConnected = false;
socket.onClose(() => {
isConnected = false;
console.log('连接已断开');
uni.showToast({
title: '连接已断开,正在重连...',
icon: 'none'
});
// 触发重连逻辑
});
socket.onOpen(() => {
isConnected = true;
});
// 发送前检查连接状态
function safeSend(data) {
if (isConnected) {
socket.send({ data });
} else {
console.log('连接未建立,消息未发送');
}
}
坑3:心跳间隔太短或太长
❌ 错误:心跳间隔太短,浪费流量
setInterval(() => {
socket.send({ data: 'ping' });
}, 1000); // ❌ 每秒发一次,太频繁了
❌ 错误:心跳间隔太长,连接可能被断
setInterval(() => {
socket.send({ data: 'ping' });
}, 600000); // ❌ 10分钟一次,基本没用
✅ 正确:一般 25-60 秒比较合适
setInterval(() => {
socket.send({ data: JSON.stringify({ type: 'ping' }) });
}, 30000); // ✅ 30秒,平衡之选
坑4:JSON 消息没有序列化就发送
❌ 错误:发送对象而不是字符串
socket.send({
data: { type: 'message', content: '你好' } // ❌ 对象不会自动转JSON
});
✅ 正确:先序列化
socket.send({
data: JSON.stringify({ type: 'message', content: '你好' }) // ✅
});
坑5:onMessage 里处理太多事情
❌ 错误:一个 onMessage 处理所有类型的消息
socket.onMessage((res) => {
// 100行代码处理各种情况,新手根本看不懂
if (res.data.type === 'message') { ... }
if (res.data.type === 'stock') { ... }
if (res.data.type === 'notification') { ... }
// 越来越长,维护困难
});
✅ 正确:分发到不同的处理器
// 消息分发器
const handlers = {
'message': handleChatMessage,
'stock': handleStockUpdate,
'notification': handleNotification
};
socket.onMessage((res) => {
const msg = JSON.parse(res.data);
const handler = handlers[msg.type];
if (handler) {
handler(msg); // 分发给专人处理
}
});
function handleChatMessage(msg) {
// 只处理聊天消息
}
function handleStockUpdate(msg) {
// 只处理股票更新
}
调试技巧:巧用日志
WebSocket 的问题很难调试,因为很多事情是"异步"发生的。学会打日志是关键:
import logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def chat_handler(websocket, path):
logger.info(f"新客户端连接")
try:
async for message in websocket:
logger.debug(f"收到消息:{message}")
# 处理消息...
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"客户端异常断开:{e}")
运行时会看到详细的日志:
2024-01-15 14:30:01 - INFO - 新客户端连接
2024-01-15 14:30:02 - DEBUG - 收到消息:你好
2024-01-15 14:30:05 - WARNING - 客户端异常断开
✏️ 练习题
练习1(2分钟):改改心跳间隔
- 输入:给定一个心跳间隔为 60 秒的代码
- 预期输出:把心跳间隔改成 25 秒
- 提示:找到
setInterval那行,把 60000 改成 25000
练习2(2分钟):加个判断
- 输入:发送消息的代码没有判断连接状态
- 预期输出:发送前检查
isConnected是否为 true - 提示:用
if (isConnected)包裹发送逻辑
练习3(3分钟):换个数据源
- 输入:项目2中硬编码的股票代码列表
['AAPL', 'GOOGL', ...] - 预期输出:换成你自己的股票代码列表
- 提示:直接修改
STOCK_CODES数组即可
练习4(5分钟):串接两个项目
- 输入:项目2的股票监控 + 项目3的广播机制
- 预期输出:当股票价格变动超过阈值时,广播一条提醒给所有在线客户端
- 提示:在价格检查的地方,调用广播函数而不是只
print
练习5(5分钟):分析报错
- 输入:以下报错信息
WebSocket connection to 'ws://localhost:8080/chat' failed:
Error in connection establishment: net::ERR_CONNECTION_REFUSED
- 预期输出:说明问题原因,给出修复方法
- 提示:检查服务器是否启动,端口是否正确
作业:做一个「WebSocket 实时投票系统」
需求描述:
做一个多人实时投票工具,主持人发起投票,参与者实时看到票数变化。
功能点:
1. 主持人可以发起投票(输入问题 + 选项)
2. 参与者可以投票,票数实时更新给所有人
3. 投票结束后,主持人可以结束投票,显示最终结果
加分项:
1. 防止同一个人重复投票
2. 显示每个选项的实时百分比
验收标准:
- 能跑起来
- 多人打开页面能看到实时同步
- 代码有适当注释
提交方式:评论区贴代码或 GitHub 链接
📚 总结
这一章我们学了3个核心点:
1. WebSocket 是什么:一种保持连接、双向通信的技术,就像"一直在线的电话"
2. uniapp 怎么用:用 uni.connectSocket 建立连接,send 发消息,onMessage 收消息
3. 心跳 + 重连:保证连接稳定性的两个关键技术
推荐资源:
- uniapp 官方 WebSocket 文档
- WebSocket 协议详解 - MDN
- 《Python 网络编程》相关章节
互动钩子:
你的项目有没有遇到过"消息延迟"的问题?是用什么方法解决的?评论区聊聊,老粉优先回复!
下一章我们要进入一个更激动人心的领域——AI 接入。学完 WebSocket,你已经知道了怎么和服务器保持"一直在线的电话",下一章我们就用这个能力,让你的 App 能"打电话给 AI",接入 ChatGPT、通义千问、文心一言,做一个真正的智能助手!

评论(0)