第9章 9.2 实时通信:WebSocket

🎯 开场:为什么你的App需要"一直在线的电话"?

上一章我们搞定了 uniCloud 云开发,学会了怎么把数据存到云上。但你有没有遇到过这种情况:

  • 聊天软件:消息来了,App居然要等好几秒甚至更久才能收到
  • 股票行情:价格变了,屏幕上还是旧数据,眼睁睁看着机会溜走
  • 在线游戏:明明按了技能,人物却慢半拍才动,被队友骂成"猪队友"

这些问题,本质上都是一个原因:你的App只能用" запрос-响应"模式——就像你发微信,得等对方回,你才能看到新消息。但有些场景,你需要对方主动找你,不需要你一次次去问"在吗在吗在吗"。

WebSocket 就是来解决这个问题的——它让你的App和服务器之间保持一个"一直在线的电话",服务器有新消息可以直接"打电话"告诉你,不用你不停拨过去问"有新消息吗"。

这一章学完,你就能写出:
- 一个聊天室,消息秒收秒发
- 一个实时股价监控小工具
- 带心跳自动重连的稳定连接


\n\nSimple tech illustration expla\n\nAI comic creation scene, creat\n\n

🧱 基础:WebSocket 到底是什么?

9.2.1 生活类比:座机 vs 微信语音

想象一下通信方式的发展:

方式 类比 工作方式
HTTP 请求 发短信 你发一条,对方回一条,你再发一条...每次都得等
WebSocket 微信语音通话 连接一次,双方可以随时互相说话,直到挂断

WebSocket 的核心特点
- ✅ 双向通信:服务器能主动推消息给客户端
- ✅ 保持连接:一次握手,持续通信,不用反复建立连接
- ✅ 低延迟:消息来了马上到,不用等轮询

9.2.2 uniapp 里怎么用 WebSocket?

uniapp 给我们封装好了 uni.connectSocket 系列 API,用起来非常方便。

先来看最基础的连接代码

# 引入 uniCloud 相关的 WebSocket 模块
# 注意:这是 uniapp 的 JS API,但原理是相通的

// 建立连接
const socket = uni.connectSocket({
url: 'ws://localhost:8080/chat',  // WebSocket 地址,ws:// 开头
success: () => {
console.log('连接成功!');
},
fail: (err) => {
console.log('连接失败:', err);
}
});

// 监听消息
socket.onMessage((res) => {
console.log('收到消息:', res.data);
});

// 监听关闭
socket.onClose(() => {
console.log('连接断开了');
});

这段代码干了3件事:
1. connectSocket 建立一个 WebSocket 连接
2. onMessage 监听服务器发来的消息
3. onClose 监听连接关闭事件

9.2.3 发送消息:就像发微信

连接成功后,发送消息特别简单:

// 发送文本消息
socket.send({
data: '你好,我是小明!',  // 要发送的内容
success: () => {
console.log('发送成功');
}
});

发送 JSON 数据也很常见:

// 发送 JSON 格式的消息
socket.send({
data: JSON.stringify({
type: 'message',
content: '大家好啊',
from: 'xiaoming',
timestamp: Date.now()
})
});

9.2.4 心跳机制:为什么需要"保活"?

你有没有过这种经历:微信开着但很久没操作,再发消息就提示"连接断开,请检查网络"?

WebSocket 连接也是一样的道理。如果长时间没有数据交换,服务器或防火墙可能会认为连接"死"了,自动把它断掉。

心跳机制就是解决这个问题:用一个小小的" ping "消息,定期告诉对方"我还活着"。

// 定时发送心跳
let heartbeatTimer = null;

function startHeartbeat() {
heartbeatTimer = setInterval(() => {
socket.send({
  data: JSON.stringify({ type: 'ping' })
});
}, 30000);  // 每30秒发一次心跳
}

function stopHeartbeat() {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
heartbeatTimer = null;
}
}

// 连接成功时启动心跳
socket.onOpen(() => {
startHeartbeat();
});

// 断开时停止心跳
socket.onClose(() => {
stopHeartbeat();
});

9.2.5 自动重连:断了还能自动连上

网络不稳定时,连接可能会意外断开。好的程序应该能自动重连:

let reconnectTimer = null;
let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 5;

function connect() {
const socket = uni.connectSocket({
url: 'ws://localhost:8080/chat'
});

// 监听关闭事件
socket.onClose(() => {
// 如果还没达到最大重试次数,就尝试重连
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
  reconnectAttempts++;
  console.log(`连接断开,${reconnectAttempts}秒后尝试第${reconnectAttempts}次重连...`);

  reconnectTimer = setTimeout(() => {
    connect();  // 递归调用,重新连接
  }, 1000 * reconnectAttempts);  // 间隔时间递增
} else {
  console.log('重连次数用完,放弃治疗');
}
});

// 连接成功时重置计数
socket.onOpen(() => {
reconnectAttempts = 0;
console.log('连接恢复!');
});
}

为什么要间隔递增?如果你一直用1秒的间隔狂轰滥炸,服务器可能会把你当成攻击者拉黑。


🔥 实战:3个递进小项目

项目1:最简聊天室(5分钟)

先跑起来,感受一下 WebSocket 的魔力。

场景:你和朋友在命令行里聊天(用本地测试服务器模拟)

# server.py - WebSocket 服务器(用 Python 的 websockets 库)
# 运行:pip install websockets
# 然后:python server.py

import asyncio
import websockets
from datetime import datetime

async def chat_handler(websocket, path):
"""处理每个客户端的聊天消息"""
client_addr = websocket.remote_address
print(f"新玩家上线:{client_addr}")

try:
    async for message in websocket:
        print(f"收到 {client_addr} 的消息:{message}")

        # 广播给所有在线客户端(简单起见,这里只返回给发送者)
        response = f"[{datetime.now().strftime('%H:%M:%S')}] 你说:{message}"
        await websocket.send(response)

except websockets.exceptions.ConnectionClosed:
    print(f"玩家 {client_addr} 离开了")

# 启动服务器
start_server = websockets.serve(chat_handler, "localhost", 8080)
print("聊天室服务器启动,端口 8080...")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

客户端代码(uniapp 端):

// chat_client.js - uniapp WebSocket 聊天客户端

// 建立连接
const socket = uni.connectSocket({
url: 'ws://localhost:8080'
});

// 连接成功
socket.onOpen(() => {
console.log('已进入聊天室');
uni.showToast({ title: '进入聊天室', icon: 'success' });
});

// 收到消息
socket.onMessage((res) => {
console.log('收到消息:', res.data);
// 可以把消息显示到页面上
});

// 发送消息函数
function sendMessage(content) {
socket.send({
data: content
});
}

// 示例:发送"你好"
sendMessage('你好');

// 监听关闭
socket.onClose(() => {
console.log('连接已断开');
});

预期输出(服务器端):

聊天室服务器启动,端口 8080...
新玩家上线:('127.0.0.1', 52345)
收到 ('127.0.0.1', 52345) 的消息:你好

一句话解释:服务器收到消息后,会加上时间戳再返回给你,这就是"实时对话"的感觉。


项目2:实时股票价格监控(15分钟)

场景:你持有某只股票,想盯着价格,到某个点位就提醒你卖出。

这个项目从 CSV 文件读取股票代码,实时监控价格(我们用模拟数据,实际项目中可以接真实 API)。

# stock_monitor.py - 股票价格实时监控

import asyncio
import websockets
import json
import random
from datetime import datetime

# 模拟股票数据(实际项目中可以从数据库或API获取)
STOCK_CODES = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']

def generate_stock_price(code):
"""模拟生成股票价格(实际项目替换为真实API调用)"""
base_prices = {'AAPL': 175, 'GOOGL': 140, 'MSFT': 380, 'AMZN': 180, 'TSLA': 250}
price = base_prices.get(code, 100)
# 模拟价格波动 ±2%
动 = random.uniform(-0.02, 0.02)
return round(price * (1 + 变动), 2)

async def stock_client():
"""WebSocket 客户端:连接服务器获取股票价格"""
uri = "ws://localhost:8081"

# 设置报警阈值
alert_threshold = 0.5  # 价格变化超过0.5%就报警

async with websockets.connect(uri) as websocket:
    print(f"已连接股票行情服务器")

    for _ in range(10):  # 模拟接收10次行情
        # 接收服务器推送的行情数据
        data = await websocket.recv()
        stock_data = json.loads(data)

        current_time = datetime.now().strftime('%H:%M:%S')
        code = stock_data['code']
        price = stock_data['price']
        change = stock_data['change_percent']

        # 打印行情
        color_indicator = '↑' if change > 0 else '↓'
        print(f"[{current_time}] {code}: ${price} {color_indicator}{abs(change):.2f}%")

        # 检查是否需要报警
        if abs(change) > alert_threshold:
            print(f"🚨 报警!{code} 价格变动超过阈值!")

        await asyncio.sleep(1)  # 每秒更新一次

# 以下是简单服务器实现(推送模拟行情)
async def stock_server(websocket, path):
"""服务器:定期推送股票行情"""
for _ in range(100):
    # 随机选一只股票推送
    code = random.choice(STOCK_CODES)
    price = generate_stock_price(code)
    prev_price = price * random.uniform(0.98, 1.02)
    change_percent = ((price - prev_price) / prev_price) * 100

    data = json.dumps({
        'code': code,
        'price': price,
        'change_percent': change_percent,
        'timestamp': datetime.now().isoformat()
    })

    await websocket.send(data)
    await asyncio.sleep(0.5)

async def main():
# 启动服务器(在另一个线程或进程中运行)
# 这里简化处理,假设服务器已经在运行
print("开始监控股票行情...")
await stock_client()

# 运行客户端
asyncio.run(main())

预期输出

开始监控股票行情...
已连接股票行情服务器
[14:30:01] AAPL: $174.23 ↑0.15%
[14:30:02] TSLA: $248.50 ↓0.62%
🚨 报警!TSLA 价格变动超过阈值!
[14:30:03] GOOGL: $141.20 ↑0.85%
[14:30:04] MSFT: $379.80 ↓0.05%
...

一句话解释:WebSocket 让服务器能主动把最新的股票价格"推"给你,不用你每秒发一次请求去"问"价格。


项目3:待办清单实时同步工具(15分钟)

场景:你和小明一起做一个项目,想共享一个待办清单,任何一个人加了新任务,另一个人马上能看到。

这个项目组合了项目1的通信基础 + 项目2的数据结构,是个小而美的协作工具。

# todo_sync.py - 实时同步待办清单

import asyncio
import websockets
import json
from datetime import datetime

class TodoList:
"""待办清单类"""
def __init__(self):
    self.items = []  # 存储待办项
    self.listeners = []  # 存储所有在线客户端

def add_item(self, text, priority='normal'):
    """添加待办项"""
    item = {
        'id': len(self.items) + 1,
        'text': text,
        'priority': priority,  # high, normal, low
        'done': False,
        'created_at': datetime.now().isoformat()
    }
    self.items.append(item)
    return item

def complete_item(self, item_id):
    """标记完成"""
    for item in self.items:
        if item['id'] == item_id:
            item['done'] = True
            item['completed_at'] = datetime.now().isoformat()
            return True
    return False

def get_all(self):
    """获取所有待办"""
    return self.items

def to_json(self):
    return json.dumps({
        'type': 'sync',
        'data': self.items
    })

# 全局待办清单实例
todo_list = TodoList()

# 初始化一些待办
todo_list.add_item('完成WebSocket教程', priority='high')
todo_list.add_item('写练习题', priority='normal')
todo_list.add_item('录视频讲解', priority='low')

async def handle_client(websocket, path):
"""处理客户端连接"""
client_id = id(websocket)
print(f"客户端 {client_id} 连接")

# 发送当前所有待办给新连接的客户端
await websocket.send(todo_list.to_json())

try:
    async for message in websocket:
        # 解析客户端发来的命令
        command = json.loads(message)
        action = command.get('action')

        if action == 'add':
            # 添加新待办
            text = command.get('text', '')
            priority = command.get('priority', 'normal')
            new_item = todo_list.add_item(text, priority)
            print(f"添加待办:{text}")

            # 广播给所有客户端
            for listener in todo_list.listeners:
                try:
                    await listener.send(todo_list.to_json())
                except:
                    pass

        elif action == 'complete':
            # 标记完成
            item_id = command.get('id')
            if todo_list.complete_item(item_id):
                print(f"待办 {item_id} 已完成")
                # 广播更新
                for listener in todo_list.listeners:
                    try:
                        await listener.send(todo_list.to_json())
                    except:
                        pass

        elif action == 'get':
            # 客户端请求最新数据
            await websocket.send(todo_list.to_json())

except websockets.exceptions.ConnectionClosed:
    print(f"客户端 {client_id} 断开")
finally:
    if websocket in todo_list.listeners:
        todo_list.listeners.remove(websocket)

async main():
# 启动服务器
async with websockets.serve(handle_client, "localhost", 8082):
    print("待办清单同步服务器启动!")
    print("监听端口 8082...")
    await asyncio.Future()  # 永久运行

asyncio.run(main())

客户端测试代码

# test_todo_client.py - 测试客户端

import asyncio
import websockets
import json

async def test_client():
uri = "ws://localhost:8082"

async with websockets.connect(uri) as socket:
    # 接收初始数据
    initial_data = await socket.recv()
    print("当前待办清单:")
    print(initial_data)

    # 添加一个新待办
    await socket.send(json.dumps({
        'action': 'add',
        'text': '测试添加的待办',
        'priority': 'high'
    }))
    print("已发送添加请求")

    # 等待同步更新
    await asyncio.sleep(1)
    updated_data = await socket.recv()
    print("更新后的待办:")
    print(updated_data)

asyncio.run(test_client())

预期输出

当前待办清单:
{"type": "sync", "data": [{"id": 1, "text": "完成WebSocket教程", ...}, ...]}
已发送添加请求
更新后的待办:
{"type": "sync", "data": [..., {"id": 4, "text": "测试添加的待办", ...}]}

一句话解释:任何客户端修改了待办清单,服务器就会把最新数据"广播"给所有在线的人,实现实时同步。


💪 进阶:新手容易踩的坑

坑1:WebSocket 和 HTTP 端口混用

错误:用 http://https:// 开头

uni.connectSocket({
url: 'http://localhost:8080/chat'  // ❌ 这是HTTP,不是WebSocket
})

正确:用 ws://wss:// 开头

uni.connectSocket({
url: 'ws://localhost:8080/chat'   // ✅ 正确
})

// 线上环境用 wss://(加密)
uni.connectSocket({
url: 'wss://yourserver.com/chat'  // ✅ 加密更安全
})

为什么重要?HTTP 和 WebSocket 是两种完全不同的协议,就像电话和电报虽然都能传递信息,但线路完全不同。


坑2:忘记处理连接断开的情况

错误:连接断了还继续发消息

// 断开后还尝试发送
socket.onClose(() => {
console.log('连接断开');
});
// 后续的 send() 调用都会失败,但没有任何处理

正确:断开时提示用户,修复后自动重连

let isConnected = false;

socket.onClose(() => {
isConnected = false;
console.log('连接已断开');
uni.showToast({
title: '连接已断开,正在重连...',
icon: 'none'
});
// 触发重连逻辑
});

socket.onOpen(() => {
isConnected = true;
});

// 发送前检查连接状态
function safeSend(data) {
if (isConnected) {
socket.send({ data });
} else {
console.log('连接未建立,消息未发送');
}
}

坑3:心跳间隔太短或太长

错误:心跳间隔太短,浪费流量

setInterval(() => {
socket.send({ data: 'ping' });
}, 1000);  // ❌ 每秒发一次,太频繁了

错误:心跳间隔太长,连接可能被断

setInterval(() => {
socket.send({ data: 'ping' });
}, 600000);  // ❌ 10分钟一次,基本没用

正确:一般 25-60 秒比较合适

setInterval(() => {
socket.send({ data: JSON.stringify({ type: 'ping' }) });
}, 30000);  // ✅ 30秒,平衡之选

坑4:JSON 消息没有序列化就发送

错误:发送对象而不是字符串

socket.send({
data: { type: 'message', content: '你好' }  // ❌ 对象不会自动转JSON
});

正确:先序列化

socket.send({
data: JSON.stringify({ type: 'message', content: '你好' })  // ✅
});

坑5:onMessage 里处理太多事情

错误:一个 onMessage 处理所有类型的消息

socket.onMessage((res) => {
// 100行代码处理各种情况,新手根本看不懂
if (res.data.type === 'message') { ... }
if (res.data.type === 'stock') { ... }
if (res.data.type === 'notification') { ... }
// 越来越长,维护困难
});

正确:分发到不同的处理器

// 消息分发器
const handlers = {
'message': handleChatMessage,
'stock': handleStockUpdate,
'notification': handleNotification
};

socket.onMessage((res) => {
const msg = JSON.parse(res.data);
const handler = handlers[msg.type];
if (handler) {
handler(msg);  // 分发给专人处理
}
});

function handleChatMessage(msg) {
// 只处理聊天消息
}

function handleStockUpdate(msg) {
// 只处理股票更新
}

调试技巧:巧用日志

WebSocket 的问题很难调试,因为很多事情是"异步"发生的。学会打日志是关键:

import logging

# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

async def chat_handler(websocket, path):
logger.info(f"新客户端连接")

try:
    async for message in websocket:
        logger.debug(f"收到消息:{message}")
        # 处理消息...

except websockets.exceptions.ConnectionClosed as e:
    logger.warning(f"客户端异常断开:{e}")

运行时会看到详细的日志:

2024-01-15 14:30:01 - INFO - 新客户端连接
2024-01-15 14:30:02 - DEBUG - 收到消息:你好
2024-01-15 14:30:05 - WARNING - 客户端异常断开

✏️ 练习题

练习1(2分钟):改改心跳间隔

  • 输入:给定一个心跳间隔为 60 秒的代码
  • 预期输出:把心跳间隔改成 25 秒
  • 提示:找到 setInterval 那行,把 60000 改成 25000

练习2(2分钟):加个判断

  • 输入:发送消息的代码没有判断连接状态
  • 预期输出:发送前检查 isConnected 是否为 true
  • 提示:用 if (isConnected) 包裹发送逻辑

练习3(3分钟):换个数据源

  • 输入:项目2中硬编码的股票代码列表 ['AAPL', 'GOOGL', ...]
  • 预期输出:换成你自己的股票代码列表
  • 提示:直接修改 STOCK_CODES 数组即可

练习4(5分钟):串接两个项目

  • 输入:项目2的股票监控 + 项目3的广播机制
  • 预期输出:当股票价格变动超过阈值时,广播一条提醒给所有在线客户端
  • 提示:在价格检查的地方,调用广播函数而不是只 print

练习5(5分钟):分析报错

  • 输入:以下报错信息
WebSocket connection to 'ws://localhost:8080/chat' failed: 
Error in connection establishment: net::ERR_CONNECTION_REFUSED
  • 预期输出:说明问题原因,给出修复方法
  • 提示:检查服务器是否启动,端口是否正确

作业:做一个「WebSocket 实时投票系统」

需求描述
做一个多人实时投票工具,主持人发起投票,参与者实时看到票数变化。

功能点
1. 主持人可以发起投票(输入问题 + 选项)
2. 参与者可以投票,票数实时更新给所有人
3. 投票结束后,主持人可以结束投票,显示最终结果

加分项
1. 防止同一个人重复投票
2. 显示每个选项的实时百分比

验收标准
- 能跑起来
- 多人打开页面能看到实时同步
- 代码有适当注释

提交方式:评论区贴代码或 GitHub 链接


📚 总结

这一章我们学了3个核心点:
1. WebSocket 是什么:一种保持连接、双向通信的技术,就像"一直在线的电话"
2. uniapp 怎么用:用 uni.connectSocket 建立连接,send 发消息,onMessage 收消息
3. 心跳 + 重连:保证连接稳定性的两个关键技术

推荐资源
- uniapp 官方 WebSocket 文档
- WebSocket 协议详解 - MDN
- 《Python 网络编程》相关章节

互动钩子

你的项目有没有遇到过"消息延迟"的问题?是用什么方法解决的?评论区聊聊,老粉优先回复!

下一章我们要进入一个更激动人心的领域——AI 接入。学完 WebSocket,你已经知道了怎么和服务器保持"一直在线的电话",下一章我们就用这个能力,让你的 App 能"打电话给 AI",接入 ChatGPT、通义千问、文心一言,做一个真正的智能助手!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。