第1章 1.5 自定义事件与组件通信
🎯 开场:为什么你的代码总是「各管各的」?
上一章我们学了 props,知道了父组件怎么把数据「塞给」子组件。就像你给小明递了一张购物清单,清单上的东西是小明负责买的。
但你有没有遇到过这种情况:
- 小明买完菜,想告诉你「我回来了,快做饭」,但你不知道他什么时候买完
- 厨房说「菜做好了」,但客厅的人没听到,大家各忙各的
- 小明和妈妈都要通知你「冰箱门没关」,但你只收到了一个通知
这就是组件通信的问题——组件之间「说话」不畅通,各管各的。
学完这一章,你能解决:
- 怎么让子组件反向通知父组件(不只是父→子单向传数据)
- 怎么让完全不相关的组件也能互相通信
- 怎么做出一个「消息广播系统」,让 N 个组件都能收到通知
🧱 基础:先搞懂「观察者模式」是什么
生活中的类比:电视台和观众
想象一下:
- 电视台(发布者):只管拍节目,不知道谁在看
- 你(订阅者):打开电视,等着看节目
- \n\n
\n\n
\n\n有线电视网络(中间件):负责把信号传给你
Vue/React 的组件通信,本质上就是这个模式——发布-订阅模式(Observer Pattern)。
在 Python 里,我们用一个叫 EventEmitter 的东西来模拟这个过程。
第一个可运行程序:自己的「消息广播站」
# 事件中心:相当于电视台的信号塔
class EventBus:
def __init__(self):
self.listeners = {} # 监听器仓库,格式:{'事件名': [回调函数列表]}
# 订阅:登记谁想收我的消息
def on(self, event_name, callback):
if event_name not in self.listeners:
self.listeners[event_name] = []
self.listeners[event_name].append(callback)
# 发布:广播消息
def emit(self, event_name, data=None):
if event_name in self.listeners:
for callback in self.listeners[event_name]:
callback(data)
# 创建一个全局事件总线
event_bus = EventBus()
# 定义一个「观众」函数
def 小明收到通知(message):
print(f"🏠 小明收到了:{message}")
def 妈妈收到通知(message):
print(f"👩 妈妈收到了:{message}")
# 订阅事件(相当于打开电视)
event_bus.on('有事宣布', 小明收到通知)
event_bus.on('有事宣布', 妈妈收到通知)
# 发布事件(电视台开始广播)
event_bus.emit('有事宣布', '明天大扫除!')
预期输出:
🏠 小明收到了:明天大扫除!
👩 妈妈收到了:明天大扫除!
两行输出!一个 emit,两个 callback 同时触发。这就是发布-订阅模式的核心。
进阶:带参数的事件
刚才的例子,emit 只传了一个参数。实际开发中,事件往往带更多数据:
# 订阅时定义的回调函数,参数要匹配 emit 传的
def 处理订单事件(order_data):
print(f"收到订单:{order_data['商品']},数量:{order_data['数量']}")
# 多个参数怎么传?用元组
def 处理订单事件_v2(order_id, amount, status):
print(f"订单#{order_id}: {amount}元,状态={status}")
event_bus.on('新订单', 处理订单事件)
event_bus.emit('新订单', {'商品': '手机', '数量': 2})
# 如果要传多个参数,用 **kwargs 字典更灵活
event_bus.on('支付成功', 处理订单事件_v2)
event_bus.emit('支付成功', 1001, 5999.00, '已支付')
再进一步:只监听一次
有时候你只需要知道「第一次发生就够了」,后面就不管了:
def 快递到了(data):
print(f"📦 签收:{data}")
# on 是持续监听,once 只听一次
event_bus.once('快递通知', 快递到了)
event_bus.emit('快递通知', '来自淘宝的包裹')
event_bus.emit('快递通知', '来自京东的包裹') # 这条不会触发上面的函数了
预期输出:
📦 签收:来自淘宝的包裹
第二次 emit 被忽略了,因为 once 只生效一次。
🔥 实战:3 个递进项目
项目 1(5 分钟):简单的「点赞通知系统」
做一个点赞系统,当文章被点赞时,通知作者和评论区。
class EventBus:
def __init__(self):
self.listeners = {}
def on(self, event, callback):
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
def emit(self, event, *args, **kwargs):
if event in self.listeners:
for cb in self.listeners[event]:
cb(*args, **kwargs)
bus = EventBus()
def 通知作者(文章标题, 点赞者):
print(f"✉️ 作者你好!{点赞者} 点赞了你的文章《{文章标题}》")
def 更新评论区(文章标题, 点赞者):
print(f"💬 评论区:大家快去看《{文章标题}》,{点赞者}觉得写得好!")
# 订阅
bus.on('点赞', 通知作者)
bus.on('点赞', 更新评论区)
# 模拟点赞
bus.emit('点赞', 'Python入门教程', '小明')
预期输出:
✉️ 作者你好!小明 点赞了你的文章《Python入门教程》
💬 评论区:大家快去看《Python入门教程》,小明觉得写得好!
一句话解释:emit 一次,两个订阅者都收到了消息——这就是组件间「一对多」通信。
项目 2(15 分钟):从 JSON 文件读取数据的「商品促销系统」
现在来点真实的:从 JSON 文件读取商品数据,当价格变动时自动通知「关注的用户」。
先创建测试数据文件 products.json:
# 先别管这行干嘛的,先运行它创建测试文件
import json
with open('products.json', 'w') as f:
json.dump([
{"id": 1, "name": "iPhone 15", "price": 6999, "watchers": ["小明", "张三"]},
{"id": 2, "name": "MacBook Pro", "price": 12999, "watchers": ["李四", "王五"]},
{"id": 3, "name": "AirPods", "price": 1899, "watchers": ["小明", "赵六"]},
], f, ensure_ascii=False)
print("✅ 测试数据已创建")
现在写主程序:
import json
class EventBus:
def __init__(self):
self.listeners = {}
def on(self, event, callback):
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
def emit(self, event, *args, **kwargs):
if event in self.listeners:
for cb in self.listeners[event]:
cb(*args, **kwargs)
bus = EventBus()
def 读取商品数据():
with open('products.json', 'r', encoding='utf-8') as f:
return json.load(f)
def 价格变动通知(商品, 旧价格, 新价格, 关注者列表):
降幅 = 旧价格 - 新价格
降幅百分比 = 降幅 / 旧价格 * 100
for 用户 in 关注者列表:
print(f"📢 {用户},好消息!《{商品}》降价了!")
print(f" 原价 {旧价格} → 现价 {新价格},省了 {降幅} 元 ({降幅百分比:.1f}% off)")
def 检查价格变动():
"""模拟价格检查:如果发现降价,触发通知"""
商品 = {"name": "iPhone 15", "price": 6999, "watchers": ["小明", "张三"]}
原价 = 6999
新价 = 5999 # 假设双十一降价了
if 新价 < 原价:
print(f"\n🔍 检测到价格变动!")
bus.emit('价格变动', 商品['name'], 原价, 新价, 商品['watchers'])
# 订阅
bus.on('价格变动', 价格变动通知)
# 运行
检查价格变动()
预期输出:
🔍 检测到价格变动!
📢 小明,好消息!《iPhone 15》降价了!
价 6999 → 现价 5999,省了 1000 元 (14.3% off)
📢 张三,好消息!《iPhone 15》降价了!
价 6999 → 现价 5999,省了 1000 元 (14.3% off)
一句话解释:核心是「关注者列表」——谁关注了这个商品,价格变时就通知谁。
项目 3(15 分钟):组合做个「待办事项 + 事件日志」
把前两个项目的能力组合起来,做一个带事件记录的待办清单:
- 添加/完成待办时,触发事件
- 事件被记录到日志文件中
- 同时通知「统计模块」更新计数
import json
from datetime import datetime
class EventBus:
def __init__(self):
self.listeners = {}
def on(self, event, callback):
if event not in self.listeners:
self.listeners[event] = []
self.listeners[event].append(callback)
def emit(self, event, *args, **kwargs):
if event in self.listeners:
for cb in self.listeners[event]:
cb(*args, **kwargs)
class TodoList:
def __init__(self):
self.bus = EventBus()
self.todos = []
self.stats = {"添加": 0, "完成": 0}
def 添加任务(self, 任务名):
todo = {"id": len(self.todos) + 1, "name": 任务名, "done": False}
self.todos.append(todo)
# 触发事件:传递任务信息和统计数据
self.bus.emit('任务添加', todo, self.stats)
def 完成任务(self, 任务id):
for todo in self.todos:
if todo["id"] == 任务id:
todo["done"] = True
self.bus.emit('任务完成', todo, self.stats)
break
def 显示统计(self):
print(f"\n📊 当前统计:添加了 {self.stats['添加']} 个任务,完成了 {self.stats['完成']} 个")
# ========== 事件处理器 ==========
def 记录到日志(todo, stats):
"""把事件写入日志文件"""
时间戳 = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
日志内容 = f"[{时间戳}] 任务添加:{todo['name']} | 累计:添加{stats['添加']} 完成{stats['完成']}\n"
with open('todo_log.txt', 'a', encoding='utf-8') as f:
f.write(日志内容)
print(f"📝 已记录到日志")
def 更新统计(todo, stats):
"""统计模块:更新计数"""
stats["添加"] += 1
print(f"📈 统计更新:添加数 +1")
def 完成通知(todo, stats):
"""完成时发送通知"""
print(f"✅ 任务完成:{todo['name']} | 完成数:{stats['完成']}")
def 完成统计(todo, stats):
stats["完成"] += 1
# ========== 运行 ==========
todo_list = TodoList()
# 订阅事件
todo_list.bus.on('任务添加', 记录到日志)
todo_list.bus.on('任务添加', 更新统计)
todo_list.bus.on('任务完成', 完成通知)
todo_list.bus.on('任务完成', 完成统计)
# 操作
todo_list.添加任务("买菜")
todo_list.添加任务("做饭")
todo_list.完成任务(1)
todo_list.显示统计()
# 显示日志
print("\n📄 日志文件内容:")
with open('todo_log.txt', 'r', encoding='utf-8') as f:
print(f.read())
预期输出:
📝 已记录到日志
📈 统计更新:添加数 +1
📝 已记录到日志
📈 统计更新:添加数 +1
✅ 任务完成:买菜 | 完成数:1
📊 当前统计:添加了 2 个任务,完成了 1 个
📄 日志文件内容:
[2024-01-15 10:30:00] 任务添加:买菜 | 累计:添加1 完成0
[2024-01-15 10:30:00] 任务添加:做饭 | 累计:添加2 完成0
一句话解释:一个 emit 触发多个 callback——添加任务时同时写日志+更新统计,完成任务时通知+更新统计。
💪 进阶:新手最容易踩的 5 个坑
坑 1:回调函数参数不匹配
# ❌ 错误示例:emit 传 2 个参数,但 on 只接收 1 个
def 回调(数据):
print(f"收到:{数据}")
bus.on('事件A', 回调)
bus.emit('事件A', 'Hello', 'World') # 报错!多余的参数没人接
# ✅ 正确示例:参数个数要匹配
def 回调(数据1, 数据2):
print(f"收到:{数据1} 和 {数据2}")
bus.on('事件A', 回调)
bus.emit('事件A', 'Hello', 'World') # 正常
坑 2:先 emit 后 on(监听太晚)
# ❌ 错误示例:事件先发生了,再订阅就收不到了
bus.emit('事件B', '这个消息会丢') # 先广播了
bus.on('事件B', lambda x: print(x)) # 后订阅,晚了
# ✅ 正确示例:先订阅,再操作
bus.on('事件C', lambda x: print(f"收到:{x}"))
bus.emit('事件C', '这个消息能收到') # 后广播,完美
注意:这个坑在 Vue 的 onMounted 钩子里特别容易踩到——组件还没挂载呢,你就 emit 了。
坑 3:off 取消订阅写错名字
# ❌ 错误示例:off 的事件名写错了
bus.on('事件D', 回调A)
bus.off('事件d', 回调A) # 大小写不一致,取消了个寂寞
# ✅ 正确示例:名称要一模一样
bus.off('事件D', 回调A) # 成功取消
坑 4:忘记检查 listeners 是否存在
# ❌ 错误示例:没订阅过的事件 emit 会报错
bus.emit('不存在的事件', 'test') # KeyError
# ✅ 正确示例:EventBus 实现时要检查
def emit(self, event, *args, **kwargs):
if event in self.listeners: # 先检查有没有
for cb in self.listeners[event]:
cb(*args, **kwargs)
坑 5:内存泄漏——监听器不断累积
# ❌ 错误示例:组件反复挂载时,每次 on 都会加一个新的监听器
class 组件:
def 挂载(self):
# 每次挂载都加监听器,不移除的话会越积越多
bus.on('刷新', self.刷新界面)
def 卸载(self):
pass # 忘了写 off!监听器一直留着
# ✅ 正确示例:卸载时要取消订阅
def 卸载(self):
bus.off('刷新', self.刷新界面) # 记得清理
调试技巧:打印事件日志
在 emit 里加一行 print,随时知道「谁发了什么」:
def emit(self, event, *args, **kwargs):
print(f"🔔 [DEBUG] emit '{event}' with args={args}, kwargs={kwargs}")
if event in self.listeners:
for cb in self.listeners[event]:
cb(*args, **kwargs)
✏️ 练习题
练习 1(2 分钟):改个名字就能跑
- 输入:在项目 1 代码中,把 event_bus.emit('点赞', ...) 改成 event_bus.emit('收藏', ...)
- 预期输出:🏠 小明收到了:明天大扫除! 变成 🏠 小明收到了:xxx(你填的内容)
- 提示:改两个地方就行——emit 的事件名,和 on 的事件名
练习 2(2 分钟):加个 if 判断
- 输入:在项目 1 的 通知作者 函数里,只有当点赞者是「小明」时才打印
- 预期输出:其他人的点赞不打印通知
- 提示:在函数开头加 if 点赞者 != "小明": return
练习 3(3 分钟):换一份新数据
- 输入:把项目 2 的 products.json 改成 3 个你自己的商品,运行代码看结果
- 预期输出:看到你商品的降价通知
- 提示:json 格式要对——[{"id": x, "name": "xxx", "price": 100, "watchers": ["用户1"]}]
练习 4(5 分钟):串项目 2 和 3
- 输入:在项目 3 的待办系统里,加入「任务完成时检查是否全部完成」的功能
- 预期输出:当所有任务都完成时,打印「🎉 全部搞定!」
- 提示:在 完成任务 函数里检查 len([t for t in self.todos if not t['done']]) == 0
练习 5(3 分钟):找 bug
- 输入:下面的代码运行会报错,找出原因
bus = EventBus()
def 回调(x, y):
print(x + y)
bus.on('加法', 回调)
bus.emit('加法', 5) # 少了第二个参数
- 预期输出:TypeError 或类似报错
- 提示:数一数 emit 给了几个参数,回调要几个参数
作业:做一个「消息订阅中心」
- 需求描述:做一个类似微信公众平台的消息订阅系统,用户可以订阅感兴趣的话题,有新内容时自动推送
- 功能点:
1. 用户可以订阅话题(至少 3 个话题,如「技术」「生活」「娱乐」)
2. 发布者发布消息时,只有订阅了相关话题的用户能收到
3. 每个用户收到消息后,记录到自己的「收件箱」 - 加分项:
1. 用户可以取消订阅
2. 显示每个用户的未读消息数量 - 验收标准:能跑起来 + 至少 2 个用户 + 每个用户有不同的订阅 + 发布消息后用户收到的不一样
- 提交方式:评论区贴代码或 GitHub 链接
📚 总结
这一章学了 3 个核心点:
- EventBus(事件总线):用一个中间人让组件不直接对话,通过「订阅-发布」解耦
- 一对多通信:一个 emit 可以触发多个 callback,实现广播效果
- 实际应用场景:点赞通知、价格监控、待办清单都可以用这套思路
互动钩子:你在项目里遇到过「组件之间不知道彼此存在,但需要通信」的场景吗?评论区说说,老博主优先回复!
下一章我们要聊的话题是——有了这些事件机制,怎么用它来组织代码?下一章「第 2 章 2.1 Composition API 基础」告诉你答案。

评论(0)