uni.$emit 与 mitt 跨页通讯:事件总线是什么玩意儿
上回我们聊了 uni.request 封装,你已经能让 A 页面「打电话」给后端服务器了。但现实开发中,还有一个更骚的需求:页面 A 点了一下,页面 B、页面 C、页面 D 同时要知道——这就好比你发了一条朋友圈,你所有好友都能收到通知,而不是你一个个去敲门告诉他们。
这就是今天要解决的核心问题:跨页面通讯。
🎯 开场 3 分钟:为什么需要事件总线?
真实场景
想象你经营一家奶茶店:
- 顾客下单后,收银台要记录订单
- 后厨屏幕要跳出制作提示
- 店长手机要收到新订单通知
- 积分系统要给顾客加积分
没有事件总线的时候,收银台得像打电话一样,逐一通知后厨、店长、积分系统——耦合得一塌糊涂。
有了事件总线,收银台只需要喊一嗓子:「新订单!」,其他人各听各的,爱理不理。
这个「喊一嗓子」的机制,就是事件总线。
痛点问题
- ❌ 父子组件传值靠
props,跨 3 层就写死人\n\n
\n\n
\n\n - ❌ 非父子组件通讯靠
defineModel,找不到北 - ❌ 页面 A 想知道页面 B 干了啥,靠
onShow轮询?卡死你
学完本文,你就能用事件总线优雅地解决「一个页面动、其他页面跟着动」的问题。
🧱 基础 25 分钟:核心概念拆解
什么是事件总线?
类比:广播电台
事件总线就像一座广播电台:
- 订阅者:打开收音机,调到某个频道(FM107.8),等待节目
- 发布者:走进直播间,对着麦克风说话
- 广播塔(事件总线):把声音信号发射出去,所有订阅这个频道的人都能听到
在代码里:
# 事件总线就是个「传声筒」
class EventBus:
def __init__(self):
self.listeners = {} # 频道:监听器列表
# 订阅(开通收音机)
def on(self, event, callback):
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
# 发布(开始广播)
def emit(self, event, *args):
if event in self.listeners:
for callback in self.listeners[event]:
callback(*args)
# 取消订阅(关掉收音机)
def off(self, event, callback=None):
if event not in self.listeners:
return
if callback is None:
self.listeners[event] = []
else:
self.listeners[event].remove(callback)
解释一下:self.listeners 是个字典,key 是事件名(比如 "new_order"),value 是回调函数列表(所有订阅这个事件的人)。
为什么要用事件总线?
解决三大场景:
- 跨组件通讯:A 组件点了,B/C/D 组件同时响应
- 跨页面通讯:A 页面操作,B 页面自动刷新
- 解耦模块:发消息的不关心谁收,接收的不关心谁发
怎么用?(Python 版 mitt)
先安装一个轻量库 pymitter(Python 界的 mitt):
pip install pymitter
最小demo:
from pymitter import EventEmitter
# 创建事件总线实例(全局唯一的广播塔)
event_bus = EventEmitter()
# 订阅者 A:订阅 "order" 频道
def handle_new_order(data):
print(f"后厨收到订单:{data['name']},要{sizeof(data['size'])}杯")
event_bus.on("order", handle_new_order)
# 订阅者 B:订阅 "order" 频道
def send_notification(data):
print(f"推送通知:客户{data['name']}下单了")
event_bus.on("order", send_notification)
# 发布者:收银台下单
event_bus.emit("order", {"name": "小明", "size": "大"})
# 输出:
# 后厨收到订单:小明,要大杯
# 推送通知:客户小明下单了
解释一下:调用 emit("order", ...) 后,所有订阅了 "order" 的函数都会被调用,像广播一样。
一次性订阅(once)
有时候你只想要通知一次,比如「首次加载完成」:
def on_first_load():
print("首次加载!只说一次!")
event_bus.once("app_start", on_first_load)
event_bus.emit("app_start") # 输出:首次加载!只说一次!
event_bus.emit("app_start") # 不会输出,因为已经解绑了
解释一下:once 注册的回调只执行一次,执行后自动解绑。
取消订阅(off)
def handler(data):
print(f"收到:{data}")
event_bus.on("msg", handler)
event_bus.emit("msg", "你好") # 输出:收到:你好
event_bus.off("msg", handler) # 取消这个订阅
event_bus.emit("msg", "再见") # 不会输出,因为已经取消订阅
解释一下:off 就是「关掉收音机」,取消后就不再收到广播了。
mitt 风格的 Python 实现(手写版)
如果你不想用第三方库,10 行代码就能写一个简化版 mitt:
class Mitt:
def __init__(self):
self._events = {}
def on(self, event, handler):
self._events.setdefault(event, []).append(handler)
return lambda: self.off(event, handler) # 返回解绑函数
def emit(self, event, *args):
for handler in self._events.get(event, []):
handler(*args)
def off(self, event, handler):
if event in self._events:
self._events[event].remove(handler)
# 全局实例
mitt = Mitt()
# 订阅
unsub = mitt.on("user_login", lambda user: print(f"欢迎 {user}"))
mitt.on("user_login", lambda user: print(f"管理员知道了 {user}"))
# 发布
mitt.emit("user_login", "小明")
# 输出:
# 欢迎 小明
# 管理员知道了 小明
# 解绑
unsub() # 执行返回的 lambda,解绑第一个订阅
mitt.emit("user_login", "小红")
# 输出:管理员知道了 小红(第一个回调已解绑)
解释一下:这里 on 返回一个解绑函数,调用起来更方便。
🔥 实战 35 分钟:3 个递进项目
项目 1(5 分钟):课堂点名系统
场景:老师点一个名字,所有学生(终端)同时显示「到!」
from pymitter import EventEmitter
class ClassEventBus:
def __init__(self):
self.bus = EventEmitter()
def call_name(self, student_name):
print(f"\n📢 老师喊道:{student_name}")
self.bus.emit("call_name", student_name)
def student_join(self, name):
def response():
print(f" ✋ {name}:到!")
self.bus.on("call_name", response)
# 模拟 3 个学生上线
event_bus = ClassEventBus()
event_bus.student_join("小红")
event_bus.student_join("小明")
event_bus.student_join("小刚")
# 老师点名
event_bus.call_name("小明")
event_bus.call_name("小红")
event_bus.call_name("小刚")
# 预期输出:
# 📢 老师喊道:小明
# ✋ 小明:到!
# 📢 老师喊道:小红
# ✋ 小红:到!
# ✋ 小刚:到!
# 📢 老师喊道:小刚
# ✋ 小刚:到!
解释一下:老师发一次广播,所有订阅的学生都能收到并响应。
项目 2(15 分钟):股票行情实时通知
场景:从 CSV 读取股票数据,当价格波动超过 5% 时,通知所有「关注这只股票」的订阅者。
from pymitter import EventEmitter
class StockMonitor:
def __init__(self):
self.bus = EventEmitter()
self.subscriptions = {} # 股票代码:订阅者列表
self.last_prices = {} # 上一次价格
def watch_stock(self, code, callback):
"""订阅某只股票的波动通知"""
if code not in self.subscriptions:
self.subscriptions[code] = []
self.subscriptions[code].append(callback)
def update_price(self, code, new_price):
"""更新股价,超过5%波动就通知"""
old_price = self.last_prices.get(code, new_price)
self.last_prices[code] = new_price
if old_price != new_price:
change_pct = (new_price - old_price) / old_price * 100
if abs(change_pct) >= 5:
self.bus.emit("price_alert", {
"code": code,
"old": old_price,
"new": new_price,
"change_pct": change_pct
})
def process_data(self, csv_data):
"""处理 CSV 数据:每行格式 股票代码,价格"""
for line in csv_data.strip().split("\n"):
code, price = line.split(",")
self.update_price(code.strip(), float(price.strip()))
# 模拟数据
csv_data = """TSLA,250
TSLA,262
TSLA,245
AAPL,180
AAPL,189
AAPL,180"""
# 创建监控器
monitor = StockMonitor()
# 小明关注特斯拉
monitor.watch_stock("TSLA", lambda alert:
print(f"🚨 小明:{alert['code']} 波动 {alert['change_pct']:+.1f}%,${alert['old']} → ${alert['new']}"))
# 小红关注苹果
monitor.watch_stock("AAPL", lambda alert:
print(f"🚨 小红:{alert['code']} 波动 {alert['change_pct']:+.1f}%,${alert['old']} → ${alert['new']}"))
# 处理数据
print("📊 开始处理行情数据...\n")
monitor.process_data(csv_data)
# 预期输出:
# 📊 开始处理行情数据...
# 🚨 小明:TSLA 波动 +4.8%,$250 → $262(不触发,没到5%)
# 🚨 小明:TSLA 波动 -6.5%,$262 → $245(触发!)
# 🚨 小红:AAPL 波动 +5.0%,$180 → $189(触发!)
# 🚨 小红:AAPL 波动 -4.8%,$189 → $180(不触发)
解释一下:同一个事件 "price_alert",不同订阅者只收到自己关注股票的通知。
项目 3(15 分钟):待办清单 + 实时同步
场景:写一个命令行待办清单,添加/完成待办时,所有在线的「客户端」都能收到同步通知。
from pymitter import EventEmitter
class TodoServer:
def __init__(self):
self.bus = EventEmitter()
self.todos = []
def add_todo(self, task):
self.todos.append({"task": task, "done": False})
self.bus.emit("todo_added", task)
def complete_todo(self, task):
for todo in self.todos:
if todo["task"] == task:
todo["done"] = True
self.bus.emit("todo_completed", task)
return
self.bus.emit("todo_not_found", task)
def list_todos(self):
return self.todos
class TodoClient:
def __init__(self, name, server):
self.name = name
self.server = server
# 订阅各种事件
server.bus.on("todo_added", lambda task:
print(f"[{self.name}] 📝 新待办:{task}"))
server.bus.on("todo_completed", lambda task:
print(f"[{self.name}] ✅ 完成待办:{task}"))
server.bus.on("todo_not_found", lambda task:
print(f"[{self.name}] ❌ 找不到待办:{task}"))
# 启动服务器
server = TodoServer()
# 客户端上线(手机 A 和手机 B)
phone_a = TodoClient("手机A", server)
phone_b = TodoClient("手机B", server)
# 操作
print("=== 手机A 添加待办 ===")
server.add_todo("买牛奶")
server.add_todo("写作业")
print("\n=== 手机B 完成待办 ===")
server.complete_todo("买牛奶")
print("\n=== 尝试完成不存在的待办 ===")
server.complete_todo("洗衣服")
print("\n=== 当前待办列表 ===")
for todo in server.list_todos():
status = "✅" if todo["done"] else "⬜"
print(f"{status} {todo['task']}")
# 预期输出:
# === 手机A 添加待办 ===
# [手机B] 📝 新待办:买牛奶
# [手机B] 📝 新待办:写作业
# === 手机B 完成待办 ===
# [手机A] ✅ 完成待办:买牛奶
# === 尝试完成不存在的待办 ===
# [手机A] ❌ 找不到待办:洗衣服
# === 当前待办列表 ===
# ⬜ 写作业
# ✅ 买牛奶
解释一下:这就是一个「发布-订阅」模式的典型应用,服务器发布事件,各客户端自行响应。
💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:内存泄漏——忘记取消订阅
# ❌ 错误示例:每次渲染都订阅,不解绑
def on_mount():
event_bus.on("data_update", handle_data) # 每次 mount 都订阅一次
# 页面切换 10 次 = 订阅了 10 次 = 内存泄漏
# ✅ 正确示例:组件销毁时解绑
def on_mount():
unsub = event_bus.on("data_update", handle_data)
return unsub # 组件销毁时调用 unsub()
# 或者用 once 替代 on(如果只需要一次)
event_bus.once("app_ready", on_app_ready)
坑 2:this 丢失(Python 里不存在,但 JS 里常见)
在 uniapp 的 mitt 中要注意:
// ❌ 错误:在回调里用 this
mitt.on('event', function(data) {
this.setData({ value: data }) // this 是 undefined!
})
// ✅ 正确:用箭头函数或 bind
mitt.on('event', (data) => {
this.setData({ value: data }) // this 指向组件实例
})
坑 3:同名事件被覆盖
# ❌ 错误:假设只绑定一次
mitt.on("message", handler1)
mitt.on("message", handler2) # 你以为会同时调用?
# 实际上会同时调用,没问题
# 但如果写成:
mitt.on("message", handler) # 绑定 handler
mitt.on("message", handler) # 再绑定同一个 handler
# 会调用两次!
# ✅ 正确:同一个 handler 只绑定一次
handlers = set()
def my_handler(data):
pass
if my_handler not in handlers:
mitt.on("message", my_handler)
handlers.add(my_handler)
坑 4:emit 放在订阅之前
# ❌ 错误:先广播,再开收音机,消息丢失
event_bus.emit("order", {"name": "小明"})
event_bus.on("order", lambda d: print(d))
# 小明什么都收不到!
# ✅ 正确:先订阅,再广播
event_bus.on("order", lambda d: print(d))
event_bus.emit("order", {"name": "小明"})
# 小明能收到
坑 5:传递可变对象被意外修改
# ❌ 危险示例:直接传递列表引用
data = ["a", "b"]
event_bus.emit("update", data)
data.append("c") # 修改了原数据
# 所有订阅者手里的数据也被改了!
# ✅ 正确:传递副本
import copy
event_bus.emit("update", copy.copy(data))
性能小贴士:大批量订阅用通配符
如果你的事件名有规律,可以用正则匹配:
# 订阅所有 user.* 事件
event_bus.on("user.*", lambda event, data:
print(f"收到 {event} 事件:{data}"))
event_bus.emit("user.login", {"id": 1})
event_bus.emit("user.logout", {"id": 1})
# 两个都会触发
调试技巧:打印所有事件
class DebugEventEmitter(EventEmitter):
def emit(self, event, *args):
print(f"📡 emit: {event}, args: {args}")
super().emit(event, *args)
def on(self, event, handler):
print(f"🔔 on: {event}")
return super().on(event, handler)
bus = DebugEventEmitter()
bus.on("test", lambda x: print(x))
bus.emit("test", "hello")
# 📡 emit: test, args: ('hello',)
# 🔔 on: test
# hello
✏️ 练习题
练习 1(2 分钟):基础抄改
- 输入:把项目 1 的学生名从
["小红", "小明", "小刚"]改成["Alice", "Bob", "Charlie"] - 预期输出:点名时显示英文名
- 提示:直接在代码里全局替换字符串即可
练习 2(2 分钟):加个条件判断
- 输入:在项目 1 中,只有当
student_name长度 > 2 时才广播 - 预期输出:点「小」开头的名字时,其他学生没反应
- 提示:在
call_name方法里加个 if 判断
练习 3(5 分钟):处理新数据
- 输入:用项目 2 的
StockMonitor处理以下数据:
BTC,50000
BTC,52500
BTC,47500
ETH,3000
ETH,3150
- 预期输出:显示波动超过 5% 的通知
- 提示:注意 BTC 从 52500 跌到 47500 是多少百分比
练习 4(8 分钟):串个项目 2 和 3
- 输入:把股票波动通知(项目 2)和待办清单(项目 3)串起来:当某只股票波动 > 5% 时,自动添加一条待办「关注 BTC 行情」
- 预期输出:波动时既打印通知,又添加待办
- 提示:在
update_price的 emit 前,调用server.add_todo()
练习 5(10 分钟):分析报错
- 输入:以下代码运行后会输出什么?
from pymitter import EventEmitter
bus = EventEmitter()
def handler(msg):
print(f"收到:{msg}")
bus.on("msg", handler)
bus.emit("msg", "第一次")
bus.off("msg") # 注意:没传 handler
bus.emit("msg", "第二次")
- 预期输出:分析为什么会这样
- 提示:
off(event)不传 handler 会清除所有订阅
作业:做一个「多人协作记事本」
需求描述:
做一个命令行版的多人协作记事本,模拟多个用户同时在线,当一个用户添加/删除/查看记事时,其他在线用户能收到实时通知。
功能点:
1. add_note(content) - 添加记事,所有人收到通知
2. del_note(note_id) - 删除记事,所有人收到通知
3. list_notes() - 查看当前所有记事
加分项:
1. 支持「私密记事」(只有指定用户能看)
2. 添加记事时 @ 某个用户,该用户收到特殊提示
验收标准:
- 能跑起来
- 操作时有广播通知输出
- 代码有注释
📚 总结 + 资源
本文学了 3 件事:
1. 事件总线就是个「广播塔」,发布者发消息,订阅者收消息
2. 订阅用 on(),发布用 emit(),取消用 off()
3. 解耦是关键:发送方不关心谁收,接收方不关心谁发
延伸资源:
- pymitter 官方文档 - Python 事件发射器库
- 《设计模式》- 订阅发布模式(23 种设计模式之一)
- Vue 3 官方文档 - 依赖注入 - 另一种跨组件通讯方式
你在项目里遇到过「页面 A 动了,页面 B 必须跟着动」的场景吗?怎么解决的?评论区聊聊,老粉优先回复!
下一章我们要聊的是「3.3 uView UI 组件库」——学完事件总线,你会知道为什么 uView 的很多组件能「即插即用」,它们内部靠的就是事件总线在通讯。敬请期待!

评论(0)