第1章 1.5 自定义事件与组件通信

🎯 开场:为什么你的代码总是「各管各的」?

上一章我们学了 props,知道了父组件怎么把数据「塞给」子组件。就像你给小明递了一张购物清单,清单上的东西是小明负责买的。

但你有没有遇到过这种情况:

  • 小明买完菜,想告诉你「我回来了,快做饭」,但你不知道他什么时候买完
  • 厨房说「菜做好了」,但客厅的人没听到,大家各忙各的
  • 小明和妈妈都要通知你「冰箱门没关」,但你只收到了一个通知

这就是组件通信的问题——组件之间「说话」不畅通,各管各的。

学完这一章,你能解决:

  • 怎么让子组件反向通知父组件(不只是父→子单向传数据)
  • 怎么让完全不相关的组件也能互相通信
  • 怎么做出一个「消息广播系统」,让 N 个组件都能收到通知

🧱 基础:先搞懂「观察者模式」是什么

生活中的类比:电视台和观众

想象一下:

  • 电视台(发布者):只管拍节目,不知道谁在看
  • (订阅者):打开电视,等着看节目
  • \n\nSimple tech illustration expla\n\nAI comic creation scene, creat\n\n有线电视网络(中间件):负责把信号传给你

Vue/React 的组件通信,本质上就是这个模式——发布-订阅模式(Observer Pattern)。

在 Python 里,我们用一个叫 EventEmitter 的东西来模拟这个过程。

第一个可运行程序:自己的「消息广播站」

# 事件中心:相当于电视台的信号塔
class EventBus:
def __init__(self):
    self.listeners = {}  # 监听器仓库,格式:{'事件名': [回调函数列表]}

# 订阅:登记谁想收我的消息
def on(self, event_name, callback):
    if event_name not in self.listeners:
        self.listeners[event_name] = []
    self.listeners[event_name].append(callback)

# 发布:广播消息
def emit(self, event_name, data=None):
    if event_name in self.listeners:
        for callback in self.listeners[event_name]:
            callback(data)

# 创建一个全局事件总线
event_bus = EventBus()

# 定义一个「观众」函数
def 小明收到通知(message):
print(f"🏠 小明收到了:{message}")

def 妈妈收到通知(message):
print(f"👩 妈妈收到了:{message}")

# 订阅事件(相当于打开电视)
event_bus.on('有事宣布', 小明收到通知)
event_bus.on('有事宣布', 妈妈收到通知)

# 发布事件(电视台开始广播)
event_bus.emit('有事宣布', '明天大扫除!')

预期输出:

🏠 小明收到了:明天大扫除!
👩 妈妈收到了:明天大扫除!

两行输出!一个 emit,两个 callback 同时触发。这就是发布-订阅模式的核心。

进阶:带参数的事件

刚才的例子,emit 只传了一个参数。实际开发中,事件往往带更多数据:

# 订阅时定义的回调函数,参数要匹配 emit 传的
def 处理订单事件(order_data):
print(f"收到订单:{order_data['商品']},数量:{order_data['数量']}")

# 多个参数怎么传?用元组
def 处理订单事件_v2(order_id, amount, status):
print(f"订单#{order_id}: {amount}元,状态={status}")

event_bus.on('新订单', 处理订单事件)
event_bus.emit('新订单', {'商品': '手机', '数量': 2})

# 如果要传多个参数,用 **kwargs 字典更灵活
event_bus.on('支付成功', 处理订单事件_v2)
event_bus.emit('支付成功', 1001, 5999.00, '已支付')

再进一步:只监听一次

有时候你只需要知道「第一次发生就够了」,后面就不管了:

def 快递到了(data):
print(f"📦 签收:{data}")

# on 是持续监听,once 只听一次
event_bus.once('快递通知', 快递到了)

event_bus.emit('快递通知', '来自淘宝的包裹')
event_bus.emit('快递通知', '来自京东的包裹')  # 这条不会触发上面的函数了

预期输出:

📦 签收:来自淘宝的包裹

第二次 emit 被忽略了,因为 once 只生效一次。


🔥 实战:3 个递进项目

项目 1(5 分钟):简单的「点赞通知系统」

做一个点赞系统,当文章被点赞时,通知作者和评论区。

class EventBus:
def __init__(self):
    self.listeners = {}

def on(self, event, callback):
    if event not in self.listeners:
        self.listeners[event] = []
    self.listeners[event].append(callback)

def emit(self, event, *args, **kwargs):
    if event in self.listeners:
        for cb in self.listeners[event]:
            cb(*args, **kwargs)

bus = EventBus()

def 通知作者(文章标题, 点赞者):
print(f"✉️ 作者你好!{点赞者} 点赞了你的文章《{文章标题}》")

def 更新评论区(文章标题, 点赞者):
print(f"💬 评论区:大家快去看《{文章标题}》,{点赞者}觉得写得好!")

# 订阅
bus.on('点赞', 通知作者)
bus.on('点赞', 更新评论区)

# 模拟点赞
bus.emit('点赞', 'Python入门教程', '小明')

预期输出:

✉️ 作者你好!小明 点赞了你的文章《Python入门教程》
💬 评论区:大家快去看《Python入门教程》,小明觉得写得好!

一句话解释:emit 一次,两个订阅者都收到了消息——这就是组件间「一对多」通信。


项目 2(15 分钟):从 JSON 文件读取数据的「商品促销系统」

现在来点真实的:从 JSON 文件读取商品数据,当价格变动时自动通知「关注的用户」。

先创建测试数据文件 products.json

# 先别管这行干嘛的,先运行它创建测试文件
import json
with open('products.json', 'w') as f:
json.dump([
    {"id": 1, "name": "iPhone 15", "price": 6999, "watchers": ["小明", "张三"]},
    {"id": 2, "name": "MacBook Pro", "price": 12999, "watchers": ["李四", "王五"]},
    {"id": 3, "name": "AirPods", "price": 1899, "watchers": ["小明", "赵六"]},
], f, ensure_ascii=False)
print("✅ 测试数据已创建")

现在写主程序:

import json

class EventBus:
def __init__(self):
    self.listeners = {}

def on(self, event, callback):
    if event not in self.listeners:
        self.listeners[event] = []
    self.listeners[event].append(callback)

def emit(self, event, *args, **kwargs):
    if event in self.listeners:
        for cb in self.listeners[event]:
            cb(*args, **kwargs)

bus = EventBus()

def 读取商品数据():
with open('products.json', 'r', encoding='utf-8') as f:
    return json.load(f)

def 价格变动通知(商品, 旧价格, 新价格, 关注者列表):
降幅 = 旧价格 - 新价格
降幅百分比 = 降幅 / 旧价格 * 100
for 用户 in 关注者列表:
    print(f"📢 {用户},好消息!《{商品}》降价了!")
    print(f"   原价 {旧价格} → 现价 {新价格},省了 {降幅} 元 ({降幅百分比:.1f}% off)")

def 检查价格变动():
"""模拟价格检查:如果发现降价,触发通知"""
商品 = {"name": "iPhone 15", "price": 6999, "watchers": ["小明", "张三"]}
原价 = 6999
新价 = 5999  # 假设双十一降价了

if 新价 < 原价:
    print(f"\n🔍 检测到价格变动!")
    bus.emit('价格变动', 商品['name'], 原价, 新价, 商品['watchers'])

# 订阅
bus.on('价格变动', 价格变动通知)

# 运行
检查价格变动()

预期输出:

🔍 检测到价格变动!
📢 小明,好消息!《iPhone 15》降价了!
价 6999 → 现价 5999,省了 1000 元 (14.3% off)
📢 张三,好消息!《iPhone 15》降价了!
价 6999 → 现价 5999,省了 1000 元 (14.3% off)

一句话解释:核心是「关注者列表」——谁关注了这个商品,价格变时就通知谁。


项目 3(15 分钟):组合做个「待办事项 + 事件日志」

把前两个项目的能力组合起来,做一个带事件记录的待办清单:

  • 添加/完成待办时,触发事件
  • 事件被记录到日志文件中
  • 同时通知「统计模块」更新计数
import json
from datetime import datetime

class EventBus:
def __init__(self):
    self.listeners = {}


def on(self, event, callback):
    if event not in self.listeners:
        self.listeners[event] = []
    self.listeners[event].append(callback)

def emit(self, event, *args, **kwargs):
    if event in self.listeners:
        for cb in self.listeners[event]:
            cb(*args, **kwargs)

class TodoList:
def __init__(self):
    self.bus = EventBus()
    self.todos = []
    self.stats = {"添加": 0, "完成": 0}

def 添加任务(self, 任务名):
    todo = {"id": len(self.todos) + 1, "name": 任务名, "done": False}
    self.todos.append(todo)
    # 触发事件:传递任务信息和统计数据
    self.bus.emit('任务添加', todo, self.stats)

def 完成任务(self, 任务id):
    for todo in self.todos:
        if todo["id"] == 任务id:
            todo["done"] = True
            self.bus.emit('任务完成', todo, self.stats)
            break

def 显示统计(self):
    print(f"\n📊 当前统计:添加了 {self.stats['添加']} 个任务,完成了 {self.stats['完成']} 个")

# ========== 事件处理器 ==========

def 记录到日志(todo, stats):
"""把事件写入日志文件"""
时间戳 = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
日志内容 = f"[{时间戳}] 任务添加:{todo['name']} | 累计:添加{stats['添加']} 完成{stats['完成']}\n"
with open('todo_log.txt', 'a', encoding='utf-8') as f:
    f.write(日志内容)
print(f"📝 已记录到日志")

def 更新统计(todo, stats):
"""统计模块:更新计数"""
stats["添加"] += 1
print(f"📈 统计更新:添加数 +1")

def 完成通知(todo, stats):
"""完成时发送通知"""
print(f"✅ 任务完成:{todo['name']} | 完成数:{stats['完成']}")

def 完成统计(todo, stats):
stats["完成"] += 1

# ========== 运行 ==========

todo_list = TodoList()

# 订阅事件
todo_list.bus.on('任务添加', 记录到日志)
todo_list.bus.on('任务添加', 更新统计)
todo_list.bus.on('任务完成', 完成通知)
todo_list.bus.on('任务完成', 完成统计)

# 操作
todo_list.添加任务("买菜")
todo_list.添加任务("做饭")
todo_list.完成任务(1)
todo_list.显示统计()

# 显示日志
print("\n📄 日志文件内容:")
with open('todo_log.txt', 'r', encoding='utf-8') as f:
print(f.read())

预期输出:

📝 已记录到日志
📈 统计更新:添加数 +1
📝 已记录到日志
📈 统计更新:添加数 +1
✅ 任务完成:买菜 | 完成数:1

📊 当前统计:添加了 2 个任务,完成了 1 个

📄 日志文件内容:
[2024-01-15 10:30:00] 任务添加:买菜 | 累计:添加1 完成0
[2024-01-15 10:30:00] 任务添加:做饭 | 累计:添加2 完成0

一句话解释:一个 emit 触发多个 callback——添加任务时同时写日志+更新统计,完成任务时通知+更新统计。


💪 进阶:新手最容易踩的 5 个坑

坑 1:回调函数参数不匹配

# ❌ 错误示例:emit 传 2 个参数,但 on 只接收 1 个
def 回调(数据):
print(f"收到:{数据}")

bus.on('事件A', 回调)
bus.emit('事件A', 'Hello', 'World')  # 报错!多余的参数没人接

# ✅ 正确示例:参数个数要匹配
def 回调(数据1, 数据2):
print(f"收到:{数据1} 和 {数据2}")

bus.on('事件A', 回调)
bus.emit('事件A', 'Hello', 'World')  # 正常

坑 2:先 emit 后 on(监听太晚)

# ❌ 错误示例:事件先发生了,再订阅就收不到了
bus.emit('事件B', '这个消息会丢')  # 先广播了
bus.on('事件B', lambda x: print(x))  # 后订阅,晚了

# ✅ 正确示例:先订阅,再操作
bus.on('事件C', lambda x: print(f"收到:{x}"))
bus.emit('事件C', '这个消息能收到')  # 后广播,完美

注意:这个坑在 Vue 的 onMounted 钩子里特别容易踩到——组件还没挂载呢,你就 emit 了。

坑 3:off 取消订阅写错名字

# ❌ 错误示例:off 的事件名写错了
bus.on('事件D', 回调A)
bus.off('事件d', 回调A)  # 大小写不一致,取消了个寂寞

# ✅ 正确示例:名称要一模一样
bus.off('事件D', 回调A)  # 成功取消

坑 4:忘记检查 listeners 是否存在

# ❌ 错误示例:没订阅过的事件 emit 会报错
bus.emit('不存在的事件', 'test')  # KeyError

# ✅ 正确示例:EventBus 实现时要检查
def emit(self, event, *args, **kwargs):
if event in self.listeners:  # 先检查有没有
    for cb in self.listeners[event]:
        cb(*args, **kwargs)

坑 5:内存泄漏——监听器不断累积

# ❌ 错误示例:组件反复挂载时,每次 on 都会加一个新的监听器
class 组件:
def 挂载(self):
    # 每次挂载都加监听器,不移除的话会越积越多
    bus.on('刷新', self.刷新界面)

def 卸载(self):
    pass  # 忘了写 off!监听器一直留着

# ✅ 正确示例:卸载时要取消订阅
def 卸载(self):
bus.off('刷新', self.刷新界面)  # 记得清理

调试技巧:打印事件日志

在 emit 里加一行 print,随时知道「谁发了什么」:

def emit(self, event, *args, **kwargs):
print(f"🔔 [DEBUG] emit '{event}' with args={args}, kwargs={kwargs}")
if event in self.listeners:
    for cb in self.listeners[event]:
        cb(*args, **kwargs)

✏️ 练习题

练习 1(2 分钟):改个名字就能跑
- 输入:在项目 1 代码中,把 event_bus.emit('点赞', ...) 改成 event_bus.emit('收藏', ...)
- 预期输出:🏠 小明收到了:明天大扫除! 变成 🏠 小明收到了:xxx(你填的内容)
- 提示:改两个地方就行——emit 的事件名,和 on 的事件名

练习 2(2 分钟):加个 if 判断
- 输入:在项目 1 的 通知作者 函数里,只有当点赞者是「小明」时才打印
- 预期输出:其他人的点赞不打印通知
- 提示:在函数开头加 if 点赞者 != "小明": return

练习 3(3 分钟):换一份新数据
- 输入:把项目 2 的 products.json 改成 3 个你自己的商品,运行代码看结果
- 预期输出:看到你商品的降价通知
- 提示:json 格式要对——[{"id": x, "name": "xxx", "price": 100, "watchers": ["用户1"]}]

练习 4(5 分钟):串项目 2 和 3
- 输入:在项目 3 的待办系统里,加入「任务完成时检查是否全部完成」的功能
- 预期输出:当所有任务都完成时,打印「🎉 全部搞定!」
- 提示:在 完成任务 函数里检查 len([t for t in self.todos if not t['done']]) == 0

练习 5(3 分钟):找 bug
- 输入:下面的代码运行会报错,找出原因

bus = EventBus()
def 回调(x, y):
print(x + y)
bus.on('加法', 回调)
bus.emit('加法', 5)  # 少了第二个参数
  • 预期输出:TypeError 或类似报错
  • 提示:数一数 emit 给了几个参数,回调要几个参数

作业:做一个「消息订阅中心」

  • 需求描述:做一个类似微信公众平台的消息订阅系统,用户可以订阅感兴趣的话题,有新内容时自动推送
  • 功能点
    1. 用户可以订阅话题(至少 3 个话题,如「技术」「生活」「娱乐」)
    2. 发布者发布消息时,只有订阅了相关话题的用户能收到
    3. 每个用户收到消息后,记录到自己的「收件箱」
  • 加分项
    1. 用户可以取消订阅
    2. 显示每个用户的未读消息数量
  • 验收标准:能跑起来 + 至少 2 个用户 + 每个用户有不同的订阅 + 发布消息后用户收到的不一样
  • 提交方式:评论区贴代码或 GitHub 链接

📚 总结

这一章学了 3 个核心点:

  1. EventBus(事件总线):用一个中间人让组件不直接对话,通过「订阅-发布」解耦
  2. 一对多通信:一个 emit 可以触发多个 callback,实现广播效果
  3. 实际应用场景:点赞通知、价格监控、待办清单都可以用这套思路

互动钩子:你在项目里遇到过「组件之间不知道彼此存在,但需要通信」的场景吗?评论区说说,老博主优先回复!

下一章我们要聊的话题是——有了这些事件机制,怎么用它来组织代码?下一章「第 2 章 2.1 Composition API 基础」告诉你答案。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。