uni.$emit 与 mitt 跨页通讯:事件总线是什么玩意儿

上回我们聊了 uni.request 封装,你已经能让 A 页面「打电话」给后端服务器了。但现实开发中,还有一个更骚的需求:页面 A 点了一下,页面 B、页面 C、页面 D 同时要知道——这就好比你发了一条朋友圈,你所有好友都能收到通知,而不是你一个个去敲门告诉他们。

这就是今天要解决的核心问题:跨页面通讯


🎯 开场 3 分钟:为什么需要事件总线?

真实场景

想象你经营一家奶茶店:

  • 顾客下单后,收银台要记录订单
  • 后厨屏幕要跳出制作提示
  • 店长手机要收到新订单通知
  • 积分系统要给顾客加积分

没有事件总线的时候,收银台得像打电话一样,逐一通知后厨、店长、积分系统——耦合得一塌糊涂。

有了事件总线,收银台只需要喊一嗓子:「新订单!」,其他人各听各的,爱理不理。

这个「喊一嗓子」的机制,就是事件总线。

痛点问题

  • ❌ 父子组件传值靠 props,跨 3 层就写死人\n\nSimple tech illustration expla\n\nAI comic creation scene, creat\n\n
  • ❌ 非父子组件通讯靠 defineModel,找不到北
  • ❌ 页面 A 想知道页面 B 干了啥,靠 onShow 轮询?卡死你

学完本文,你就能用事件总线优雅地解决「一个页面动、其他页面跟着动」的问题。


🧱 基础 25 分钟:核心概念拆解

什么是事件总线?

类比:广播电台

事件总线就像一座广播电台:
- 订阅者:打开收音机,调到某个频道(FM107.8),等待节目
- 发布者:走进直播间,对着麦克风说话
- 广播塔(事件总线):把声音信号发射出去,所有订阅这个频道的人都能听到

在代码里:

# 事件总线就是个「传声筒」
class EventBus:
def __init__(self):
    self.listeners = {}  # 频道:监听器列表


# 订阅(开通收音机)
def on(self, event, callback):
    if event not in self.listeners:
        self.listeners[event] = []
    self.listeners[event].append(callback)

# 发布(开始广播)
def emit(self, event, *args):
    if event in self.listeners:
        for callback in self.listeners[event]:
            callback(*args)

# 取消订阅(关掉收音机)
def off(self, event, callback=None):
    if event not in self.listeners:
        return
    if callback is None:
        self.listeners[event] = []
    else:
        self.listeners[event].remove(callback)

解释一下:self.listeners 是个字典,key 是事件名(比如 "new_order"),value 是回调函数列表(所有订阅这个事件的人)。

为什么要用事件总线?

解决三大场景:

  1. 跨组件通讯:A 组件点了,B/C/D 组件同时响应
  2. 跨页面通讯:A 页面操作,B 页面自动刷新
  3. 解耦模块:发消息的不关心谁收,接收的不关心谁发

怎么用?(Python 版 mitt)

先安装一个轻量库 pymitter(Python 界的 mitt):

pip install pymitter

最小demo:

from pymitter import EventEmitter

# 创建事件总线实例(全局唯一的广播塔)
event_bus = EventEmitter()

# 订阅者 A:订阅 "order" 频道
def handle_new_order(data):
print(f"后厨收到订单:{data['name']},要{sizeof(data['size'])}杯")

event_bus.on("order", handle_new_order)

# 订阅者 B:订阅 "order" 频道  
def send_notification(data):
print(f"推送通知:客户{data['name']}下单了")

event_bus.on("order", send_notification)

# 发布者:收银台下单
event_bus.emit("order", {"name": "小明", "size": "大"})
# 输出:
# 后厨收到订单:小明,要大杯
# 推送通知:客户小明下单了

解释一下:调用 emit("order", ...) 后,所有订阅了 "order" 的函数都会被调用,像广播一样。

一次性订阅(once)

有时候你只想要通知一次,比如「首次加载完成」:

def on_first_load():
print("首次加载!只说一次!")

event_bus.once("app_start", on_first_load)

event_bus.emit("app_start")  # 输出:首次加载!只说一次!
event_bus.emit("app_start")  # 不会输出,因为已经解绑了

解释一下:once 注册的回调只执行一次,执行后自动解绑。

取消订阅(off)

def handler(data):
print(f"收到:{data}")

event_bus.on("msg", handler)
event_bus.emit("msg", "你好")  # 输出:收到:你好

event_bus.off("msg", handler)  # 取消这个订阅
event_bus.emit("msg", "再见")  # 不会输出,因为已经取消订阅

解释一下:off 就是「关掉收音机」,取消后就不再收到广播了。

mitt 风格的 Python 实现(手写版)

如果你不想用第三方库,10 行代码就能写一个简化版 mitt:

class Mitt:
def __init__(self):
    self._events = {}

def on(self, event, handler):
    self._events.setdefault(event, []).append(handler)
    return lambda: self.off(event, handler)  # 返回解绑函数

def emit(self, event, *args):
    for handler in self._events.get(event, []):
        handler(*args)

def off(self, event, handler):
    if event in self._events:
        self._events[event].remove(handler)

# 全局实例
mitt = Mitt()

# 订阅
unsub = mitt.on("user_login", lambda user: print(f"欢迎 {user}"))
mitt.on("user_login", lambda user: print(f"管理员知道了 {user}"))

# 发布
mitt.emit("user_login", "小明")
# 输出:
# 欢迎 小明
# 管理员知道了 小明

# 解绑
unsub()  # 执行返回的 lambda,解绑第一个订阅
mitt.emit("user_login", "小红")
# 输出:管理员知道了 小红(第一个回调已解绑)

解释一下:这里 on 返回一个解绑函数,调用起来更方便。


🔥 实战 35 分钟:3 个递进项目

项目 1(5 分钟):课堂点名系统

场景:老师点一个名字,所有学生(终端)同时显示「到!」

from pymitter import EventEmitter

class ClassEventBus:
def __init__(self):
    self.bus = EventEmitter()

def call_name(self, student_name):
    print(f"\n📢 老师喊道:{student_name}")
    self.bus.emit("call_name", student_name)

def student_join(self, name):
    def response():
        print(f"  ✋ {name}:到!")
    self.bus.on("call_name", response)

# 模拟 3 个学生上线
event_bus = ClassEventBus()
event_bus.student_join("小红")
event_bus.student_join("小明")
event_bus.student_join("小刚")

# 老师点名
event_bus.call_name("小明")
event_bus.call_name("小红")
event_bus.call_name("小刚")

# 预期输出:
# 📢 老师喊道:小明
#   ✋ 小明:到!
# 📢 老师喊道:小红
#   ✋ 小红:到!
#   ✋ 小刚:到!
# 📢 老师喊道:小刚
#   ✋ 小刚:到!

解释一下:老师发一次广播,所有订阅的学生都能收到并响应。


项目 2(15 分钟):股票行情实时通知

场景:从 CSV 读取股票数据,当价格波动超过 5% 时,通知所有「关注这只股票」的订阅者。

from pymitter import EventEmitter

class StockMonitor:
def __init__(self):
    self.bus = EventEmitter()
    self.subscriptions = {}  # 股票代码:订阅者列表
    self.last_prices = {}    # 上一次价格

def watch_stock(self, code, callback):
    """订阅某只股票的波动通知"""
    if code not in self.subscriptions:
        self.subscriptions[code] = []
    self.subscriptions[code].append(callback)

def update_price(self, code, new_price):
    """更新股价,超过5%波动就通知"""
    old_price = self.last_prices.get(code, new_price)
    self.last_prices[code] = new_price

    if old_price != new_price:
        change_pct = (new_price - old_price) / old_price * 100
        if abs(change_pct) >= 5:
            self.bus.emit("price_alert", {
                "code": code,
                "old": old_price,
                "new": new_price,
                "change_pct": change_pct
            })

def process_data(self, csv_data):
    """处理 CSV 数据:每行格式 股票代码,价格"""
    for line in csv_data.strip().split("\n"):
        code, price = line.split(",")
        self.update_price(code.strip(), float(price.strip()))

# 模拟数据
csv_data = """TSLA,250
TSLA,262
TSLA,245
AAPL,180
AAPL,189
AAPL,180"""

# 创建监控器
monitor = StockMonitor()

# 小明关注特斯拉
monitor.watch_stock("TSLA", lambda alert: 
print(f"🚨 小明:{alert['code']} 波动 {alert['change_pct']:+.1f}%,${alert['old']} → ${alert['new']}"))

# 小红关注苹果
monitor.watch_stock("AAPL", lambda alert:
print(f"🚨 小红:{alert['code']} 波动 {alert['change_pct']:+.1f}%,${alert['old']} → ${alert['new']}"))

# 处理数据
print("📊 开始处理行情数据...\n")
monitor.process_data(csv_data)

# 预期输出:
# 📊 开始处理行情数据...
# 🚨 小明:TSLA 波动 +4.8%,$250 → $262(不触发,没到5%)
# 🚨 小明:TSLA 波动 -6.5%,$262 → $245(触发!)
# 🚨 小红:AAPL 波动 +5.0%,$180 → $189(触发!)
# 🚨 小红:AAPL 波动 -4.8%,$189 → $180(不触发)

解释一下:同一个事件 "price_alert",不同订阅者只收到自己关注股票的通知。


项目 3(15 分钟):待办清单 + 实时同步

场景:写一个命令行待办清单,添加/完成待办时,所有在线的「客户端」都能收到同步通知。

from pymitter import EventEmitter

class TodoServer:
def __init__(self):
    self.bus = EventEmitter()
    self.todos = []

def add_todo(self, task):
    self.todos.append({"task": task, "done": False})
    self.bus.emit("todo_added", task)

def complete_todo(self, task):
    for todo in self.todos:
        if todo["task"] == task:
            todo["done"] = True
            self.bus.emit("todo_completed", task)
            return
    self.bus.emit("todo_not_found", task)

def list_todos(self):
    return self.todos

class TodoClient:
def __init__(self, name, server):
    self.name = name
    self.server = server

    # 订阅各种事件
    server.bus.on("todo_added", lambda task: 
        print(f"[{self.name}] 📝 新待办:{task}"))
    server.bus.on("todo_completed", lambda task:
        print(f"[{self.name}] ✅ 完成待办:{task}"))
    server.bus.on("todo_not_found", lambda task:
        print(f"[{self.name}] ❌ 找不到待办:{task}"))

# 启动服务器
server = TodoServer()

# 客户端上线(手机 A 和手机 B)
phone_a = TodoClient("手机A", server)
phone_b = TodoClient("手机B", server)

# 操作
print("=== 手机A 添加待办 ===")
server.add_todo("买牛奶")
server.add_todo("写作业")

print("\n=== 手机B 完成待办 ===")
server.complete_todo("买牛奶")

print("\n=== 尝试完成不存在的待办 ===")
server.complete_todo("洗衣服")

print("\n=== 当前待办列表 ===")
for todo in server.list_todos():
status = "✅" if todo["done"] else "⬜"
print(f"{status} {todo['task']}")

# 预期输出:
# === 手机A 添加待办 ===
# [手机B] 📝 新待办:买牛奶
# [手机B] 📝 新待办:写作业
# === 手机B 完成待办 ===
# [手机A] ✅ 完成待办:买牛奶
# === 尝试完成不存在的待办 ===
# [手机A] ❌ 找不到待办:洗衣服
# === 当前待办列表 ===
# ⬜ 写作业
# ✅ 买牛奶

解释一下:这就是一个「发布-订阅」模式的典型应用,服务器发布事件,各客户端自行响应。


💪 进阶 20 分钟:常见坑 + 性能小贴士

坑 1:内存泄漏——忘记取消订阅

# ❌ 错误示例:每次渲染都订阅,不解绑
def on_mount():
event_bus.on("data_update", handle_data)  # 每次 mount 都订阅一次

# 页面切换 10 次 = 订阅了 10 次 = 内存泄漏
# ✅ 正确示例:组件销毁时解绑
def on_mount():
unsub = event_bus.on("data_update", handle_data)
return unsub  # 组件销毁时调用 unsub()

# 或者用 once 替代 on(如果只需要一次)
event_bus.once("app_ready", on_app_ready)

坑 2:this 丢失(Python 里不存在,但 JS 里常见)

在 uniapp 的 mitt 中要注意:

// ❌ 错误:在回调里用 this
mitt.on('event', function(data) {
this.setData({ value: data })  // this 是 undefined!
})

// ✅ 正确:用箭头函数或 bind
mitt.on('event', (data) => {
this.setData({ value: data })  // this 指向组件实例
})

坑 3:同名事件被覆盖

# ❌ 错误:假设只绑定一次
mitt.on("message", handler1)
mitt.on("message", handler2)  # 你以为会同时调用?
# 实际上会同时调用,没问题
# 但如果写成:
mitt.on("message", handler)  # 绑定 handler
mitt.on("message", handler)  # 再绑定同一个 handler
# 会调用两次!
# ✅ 正确:同一个 handler 只绑定一次
handlers = set()
def my_handler(data):
pass

if my_handler not in handlers:
mitt.on("message", my_handler)
handlers.add(my_handler)

坑 4:emit 放在订阅之前

# ❌ 错误:先广播,再开收音机,消息丢失
event_bus.emit("order", {"name": "小明"})
event_bus.on("order", lambda d: print(d))
# 小明什么都收不到!
# ✅ 正确:先订阅,再广播
event_bus.on("order", lambda d: print(d))
event_bus.emit("order", {"name": "小明"})
# 小明能收到

坑 5:传递可变对象被意外修改

# ❌ 危险示例:直接传递列表引用
data = ["a", "b"]
event_bus.emit("update", data)
data.append("c")  # 修改了原数据
# 所有订阅者手里的数据也被改了!
# ✅ 正确:传递副本
import copy
event_bus.emit("update", copy.copy(data))

性能小贴士:大批量订阅用通配符

如果你的事件名有规律,可以用正则匹配:

# 订阅所有 user.* 事件
event_bus.on("user.*", lambda event, data: 
print(f"收到 {event} 事件:{data}"))

event_bus.emit("user.login", {"id": 1})
event_bus.emit("user.logout", {"id": 1})
# 两个都会触发

调试技巧:打印所有事件

class DebugEventEmitter(EventEmitter):
def emit(self, event, *args):
    print(f"📡 emit: {event}, args: {args}")
    super().emit(event, *args)

def on(self, event, handler):
    print(f"🔔 on: {event}")
    return super().on(event, handler)

bus = DebugEventEmitter()
bus.on("test", lambda x: print(x))
bus.emit("test", "hello")
# 📡 emit: test, args: ('hello',)
# 🔔 on: test
# hello

✏️ 练习题

练习 1(2 分钟):基础抄改

  • 输入:把项目 1 的学生名从 ["小红", "小明", "小刚"] 改成 ["Alice", "Bob", "Charlie"]
  • 预期输出:点名时显示英文名
  • 提示:直接在代码里全局替换字符串即可

练习 2(2 分钟):加个条件判断

  • 输入:在项目 1 中,只有当 student_name 长度 > 2 时才广播
  • 预期输出:点「小」开头的名字时,其他学生没反应
  • 提示:在 call_name 方法里加个 if 判断

练习 3(5 分钟):处理新数据

  • 输入:用项目 2 的 StockMonitor 处理以下数据:
BTC,50000
BTC,52500
BTC,47500
ETH,3000
ETH,3150
  • 预期输出:显示波动超过 5% 的通知
  • 提示:注意 BTC 从 52500 跌到 47500 是多少百分比

练习 4(8 分钟):串个项目 2 和 3

  • 输入:把股票波动通知(项目 2)和待办清单(项目 3)串起来:当某只股票波动 > 5% 时,自动添加一条待办「关注 BTC 行情」
  • 预期输出:波动时既打印通知,又添加待办
  • 提示:在 update_price 的 emit 前,调用 server.add_todo()

练习 5(10 分钟):分析报错

  • 输入:以下代码运行后会输出什么?
from pymitter import EventEmitter
bus = EventEmitter()

def handler(msg):
  print(f"收到:{msg}")

bus.on("msg", handler)
bus.emit("msg", "第一次")
bus.off("msg")  # 注意:没传 handler
bus.emit("msg", "第二次")
  • 预期输出:分析为什么会这样
  • 提示off(event) 不传 handler 会清除所有订阅

作业:做一个「多人协作记事本」

需求描述
做一个命令行版的多人协作记事本,模拟多个用户同时在线,当一个用户添加/删除/查看记事时,其他在线用户能收到实时通知。

功能点
1. add_note(content) - 添加记事,所有人收到通知
2. del_note(note_id) - 删除记事,所有人收到通知
3. list_notes() - 查看当前所有记事

加分项
1. 支持「私密记事」(只有指定用户能看)
2. 添加记事时 @ 某个用户,该用户收到特殊提示

验收标准
- 能跑起来
- 操作时有广播通知输出
- 代码有注释


📚 总结 + 资源

本文学了 3 件事:
1. 事件总线就是个「广播塔」,发布者发消息,订阅者收消息
2. 订阅用 on()发布用 emit()取消用 off()
3. 解耦是关键:发送方不关心谁收,接收方不关心谁发

延伸资源:
- pymitter 官方文档 - Python 事件发射器库
- 《设计模式》- 订阅发布模式(23 种设计模式之一)
- Vue 3 官方文档 - 依赖注入 - 另一种跨组件通讯方式


你在项目里遇到过「页面 A 动了,页面 B 必须跟着动」的场景吗?怎么解决的?评论区聊聊,老粉优先回复!

下一章我们要聊的是「3.3 uView UI 组件库」——学完事件总线,你会知道为什么 uView 的很多组件能「即插即用」,它们内部靠的就是事件总线在通讯。敬请期待!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。