第5章 5.5 综合实战:博客系统全栈
🎯 开场:为什么你的博客打开这么慢?
上完 Redis 缓存那一章,你现在手里有了一把"加速神器"。
但是——光有神器不够,你还得知道往哪儿挥。光会缓存,不知道整个系统怎么跑,缓存就用不到刀刃上。
你有没有遇到过这种情况:
- 博客首页加载要 3 秒,等得想砸电脑
- 刷新一下文章,阅读量居然没变化(缓存闹的)
- 服务器一多台,session 就乱了
这一章,我们把 Redis 缓存塞进一个真实的博客系统里,让它真正跑起来。你会看到:数据怎么从 MySQL 读出来,怎么被 Redis 缓存住,下次请求怎么直接从内存拿——3 秒变 0.3 秒,就是这么夸张。
学完这章,你能自己搭一个带缓存的完整博客后端,为下一章"socket 编程"打基础——那时候我们要让博客支持实时推送,比如新评论马上弹出来。
🧱 基础 25 分钟:博客系统的骨架
什么是"全栈"?先打个比方
把一个网站想象成一家餐厅:
- 后端(Node.js/Python)是厨房,负责炒菜(处理逻辑)
- 数据库(MySQL)是冰箱,存着所有食材(数据)
- 缓存(Redis)是前台服务员手里的备用盘子,常客爱吃的菜提前备好,不用每次都去冰箱拿
- 前端是盘子上的装饰,让菜看起来更好看
这一章我们先专注后端 + 数据库 + 缓存这三位主角,让它们配合起来。
项目结构:先看看我们有什么
blog-system/
├── app.py # 主程序入口
├── config.py # 配置文件
├── models.py # 数据模型
├── cache_utils.py # 缓存工具
└── data.json # 测试数据
先别急着敲代码,先把这几个模块的关系搞清楚。
核心概念 1:什么是 MVC 架构?
MVC 就是三个抽屉,每个抽屉装不同的东西:
| 抽屉 | 职责 | 类比 |
|---|---|---|
| Model 模型 | 和数据库打交道,存取数据 | 仓库管理员 |
| View 视图 | 返回数据给用户 | 打包员,把东西装盒 |
| Controller 控制器 | 接收请求,调度逻辑 | 前台接待,决定谁来处理 |
为什么要分开? 就像你不会让仓库管理员去打包快递一样——各管各的,代码才不乱。

核心概念 2:Flask 快速上手
Flask 是 Python 里最轻量的 Web 框架, 5 分钟能搭一个能跑的服务器。
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/')
def index():
return jsonify({'message': '博客首页'})
if __name__ == '__main__':
app.run(port=3000)
运行后访问 http://localhost:3000/,你会看到:
{"message": "博客首页"}
这行在干嘛:定义了一个路由 /,访问根路径就返回一段 JSON。
核心概念 3:MySQL 数据库连接
用 pymysql 库连 MySQL,就像打电话给仓库:
import pymysql
def get_db_connection():
return pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='blog_db',
charset='utf8mb4'
)
# 查询所有文章
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, title FROM posts')
results = cursor.fetchall()
conn.close()
for post in results:
print(f"ID: {post[0]}, 标题: {post[1]}")
这行在干嘛:连接数据库,执行 SQL,拿到结果后关闭连接(用完记得挂电话)。
核心概念 4:Redis 缓存封装
缓存的核心逻辑就三句话:查缓存 → 有就返回 → 没有就查数据库 → 存缓存 → 返回。
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def get_cached_posts():
"""从缓存获取文章列表"""
cached = r.get('posts:all')
if cached:
print("从缓存读取,速度起飞!")
return json.loads(cached)
# 缓存没有,查数据库(这里用模拟数据)
posts = [{'id': 1, 'title': 'Redis真香'}, {'id': 2, 'title': '缓存实战'}]
# 存进缓存,设置 300 秒过期
r.setex('posts:all', 300, json.dumps(posts))
print("从数据库读取,已存入缓存")
return posts
print(get_cached_posts())
第一次运行输出:从数据库读取,已存入缓存
第二次运行输出:从缓存读取,速度起飞!
这行在干嘛:setex 是 SET with EXpiry,设置 300 秒后自动过期,省得我们手动删。
🔥 实战 35 分钟:3 个递进项目
项目 1(5 分钟):博客首页 API
目标:搭一个能返回文章列表的 API,体会请求是怎么处理的。
from flask import Flask, jsonify
import redis
import json
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
# 模拟数据库数据
POSTS = [
{'id': 1, 'title': 'Redis缓存入门', 'views': 100},
{'id': 2, 'title': 'Python协程那些事', 'views': 230},
{'id': 3, 'title': 'MySQL优化实战', 'views': 89},
]
@app.route('/api/posts')
def get_posts():
"""获取文章列表,带缓存"""
# 尝试从缓存获取
cached = r.get('posts:list')
if cached:
return jsonify({'source': 'cache', 'data': json.loads(cached)})
# 缓存没有,查"数据库"并缓存
r.setex('posts:list', 60, json.dumps(POSTS))
return jsonify({'source': 'db', 'data': POSTS})
if __name__ == '__main__':
app.run(port=3000, debug=True)
预期输出:
第一次访问 {"source": "db", "data": [...]}
刷新一下 {"source": "cache", "data": [...]}——看到没,"source" 变了!
一句话解释:注释掉的 r.get 先查缓存,有就直接返回,没有才查数据再存缓存。
项目 2(15 分钟):文章详情页 + 阅读量统计
目标:读取单篇文章,每次访问阅读量 +1,但要防止刷阅读量(同一用户短时间内不重复计数)。
这是真实项目中很常见的场景:缓存 + 防刷 + 数据库持久化。
from flask import Flask, jsonify, request
import redis
import json
import time
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
# 模拟数据库文章
ARTICLES = {
1: {'id': 1, 'title': '深入理解Redis缓存', 'content': '缓存是万金油...', 'views': 0},
2: {'id': 2, 'title': 'Flask快速上手', 'content': 'Flask很轻量...', 'views': 0},
}
@app.route('/api/article/<int:article_id>')
def get_article(article_id):
"""获取文章详情,带防刷阅读量统计"""
if article_id not in ARTICLES:
return jsonify({'error': '文章不存在'}), 404
article = ARTICLES[article_id].copy()
# 防刷:检查这个用户(用IP模拟)是否在5分钟内看过
user_id = request.remote_addr or 'anonymous'
view_key = f'view:{article_id}:{user_id}'
if not r.exists(view_key):
# 没人看过,阅读量+1,更新"数据库"
ARTICLES[article_id]['views'] += 1
# 记录这个用户看过,设置5分钟过期
r.setex(view_key, 300, '1')
# 同时更新缓存中的阅读量
r.hset(f'article:{article_id}', 'views', ARTICLES[article_id]['views'])
# 从缓存读阅读量(缓存里也有)
cached_views = r.hget(f'article:{article_id}', 'views')
if cached_views:
article['views'] = int(cached_views)
return jsonify(article)
if __name__ == '__main__':
app.run(port=3000)
预期输出:
{"id": 1, "title": "深入理解Redis缓存", "content": "缓存是万金油...", "views": 1}
连续刷新同一文章,views 不会再涨。等 5 分钟后再刷,才会 +1。
一句话解释:用 Redis 的 setex 记录「用户看过这篇文章」,key 包含用户ID和文章ID,5分钟内重复访问会被拦截。
项目 3(15 分钟):评论系统 + 实时缓存失效
目标:添加评论功能,评论发布后立即让缓存失效,下次访问直接读新数据。
这是缓存的经典难题:更新数据后怎么更新缓存? 两种策略:
1. 删除缓存(_cache aside):更新数据库后删掉缓存,下次访问重新存
2. 更新缓存:直接改缓存内容
我们用第一种,更安全。
from flask import Flask, jsonify, request
import redis
import json
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
# 模拟数据
COMMENTS = {} # {article_id: [comments]}
ARTICLES = {
1: {'id': 1, 'title': '深入理解Redis缓存', 'views': 100},
}
def invalidate_cache(article_id):
"""删除文章相关的所有缓存"""
r.delete(f'article:{article_id}')
r.delete('posts:list') # 文章列表缓存也要删
print(f"缓存已失效:article:{article_id}")
@app.route('/api/article/<int:article_id>/comments', methods=['GET'])
def get_comments(article_id):
"""获取评论,先查缓存"""
cache_key = f'comments:{article_id}'
cached = r.get(cache_key)
if cached:
return jsonify({'source': 'cache', 'data': json.loads(cached)})
comments = COMMENTS.get(article_id, [])
r.setex(cache_key, 180, json.dumps(comments))
return jsonify({'source': 'db', 'data': comments})
@app.route('/api/article/<int:article_id>/comments', methods=['POST'])
def add_comment(article_id):
"""添加评论,发布后立即失效缓存"""
data = request.json
comment = {
'id': len(COMMENTS.get(article_id, [])) + 1,
'author': data.get('author', '匿名'),
'content': data.get('content', ''),
'created_at': '刚刚'
}
if article_id not in COMMENTS:
COMMENTS[article_id] = []
COMMENTS[article_id].append(comment)
# 核心!评论发布后立即删除缓存
invalidate_cache(article_id)
return jsonify({'message': '评论成功', 'comment': comment})
if __name__ == '__main__':
app.run(port=3000)
测试步骤:
1. GET /api/article/1/comments → 返回 {"source": "db", "data": []}
2. POST /api/article/1/comments 带 {"author": "小明", "content": "写得真好!"}
3. 再次 GET /api/article/1/comments → 返回 {"source": "db", "data": [{"id": 1, ...}]}
4. 再刷一次 → {"source": "cache", ...}
一句话解释:评论发布后调用 invalidate_cache 删掉缓存,下次读就会重新从数据库(这里模拟的)拿,顺便把新评论塞进去。

💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:缓存雪崩——所有数据同时过期
场景:你设置了所有缓存 300 秒过期,结果 300 秒一到,所有请求同时打到数据库,数据库直接炸了。
# ❌ 错误示例:所有key同时过期
r.setex('posts', 300, data)
# ✅ 正确示例:加随机秒数,分散过期
r.setex('posts', 300 + random.randint(0, 60), data)
坑 2:缓存穿透——请求不存在的数据
场景:用户访问一篇不存在的文章,每次都绕过缓存直接查数据库,攻击者可能利用这个刷你数据库。
# ❌ 错误示例:查不到也缓存空值(浪费缓存空间)
r.setex('article:999', 300, 'null')
# ✅ 正确示例:用特殊标记,并且设置短一点的过期时间
r.setex('article:999:null', 60, '1') # 60秒后再放行
坑 3:Redis 连接不释放
场景:每次请求都 redis.Redis() 新建连接,高并发下连接数爆表。
# ❌ 错误示例:每次请求都创建新连接
def get_redis():
return redis.Redis(host='localhost', port=6379)
# ✅ 正确示例:全局单例 + 连接池
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
坑 4:忽视缓存大小——内存爆炸
场景:文章列表缓存了几万条,Redis 内存飙升。
# ✅ 正确示例:设置maxmemory-policy
# 在redis.conf里:maxmemory 256mb
# maxmemory-policy allkeys-lru # 内存满时删除最近最少使用的key
坑 5:JSON序列化慢——换MessagePack
# ❌ 慢:每次存取都要json.dumps/loads
r.setex('key', 300, json.dumps(data))
# ✅ 快:MessagePack二进制序列化,快3-5倍
import msgpack
r.setex('key', 300, msgpack.packb(data))
性能小贴士:Pipeline 批量操作
要存一批 key 时,别一条一条发命令,用 pipeline 一次发过去。
# ❌ 慢:10次网络往返
for i in range(10):
r.set(f'key:{i}', value)
# ✅ 快:1次网络往返
pipe = r.pipeline()
for i in range(10):
pipe.set(f'key:{i}', value)
pipe.execute()
调试技巧:Redis Monitor 实时看命令
在终端输入:
redis-cli monitor
所有发到 Redis 的命令都会实时打印出来,缓存有没有生效一目了然。
✏️ 练习题 + 作业题
练习题(5道,10分钟内完成)
练习 1(2分钟):改缓存key前缀
- 输入:将项目1的缓存key从 posts:list 改成 blog:posts:all
- 预期输出:代码正常运行,第二次访问 source 变成 cache
- 提示:r.get('posts:list') → r.get('blog:posts:all'),两处都要改
练习 2(2分钟):加个if判断
- 输入:在项目1里加个判断,如果文章列表为空,返回空数组而不是查缓存
- 预期输出:{"source": "db", "data": []}
- 提示:if not POSTS: return jsonify(...)
练习 3(2分钟):换个数据源
- 输入:把项目2的 ARTICLES 字典换成从 JSON 文件读取
- 预期输出:代码正常运行,功能不变
- 提示:用 json.load(open('articles.json'))
练习 4(2分钟):串起两个项目
- 输入:在项目2的详情页里,同时显示评论数(调用项目3的逻辑)
- 预期输出:{"id": 1, "title": "...", "views": 1, "comment_count": 0}
- 提示:直接用 len(COMMENTS.get(article_id, []))
练习 5(2分钟):分析报错
- 输入:运行代码时遇到 redis.exceptions.ConnectionError: Error -3 connecting to localhost:6379
- 预期输出:说出原因并修复
- 提示:检查 Redis 服务有没有启动 redis-server
作业:做一个「博客系统全栈工具」
需求描述:
用 Flask + Redis 缓存 + MySQL(或文件模拟),做一个命令行博客管理工具,可以在终端里发文章、刷缓存、看统计。
功能点:
1. 查看文章列表:带缓存,标注来自 DB 还是 Cache
2. 发布新文章:标题+内容,发布后自动失效缓存
3. 查看单篇文章:带阅读量统计(防刷)
4. 清除指定缓存:输入 key 名,删除它
加分项:
1. 支持批量清除缓存(如 clear:posts* 清除所有文章相关缓存)
2. 带统计面板:显示缓存命中率
验收标准:
- 能跑起来不报错
- 发布文章后,列表页能立刻看到新文章
- 同一用户5分钟内重复访问同一文章,阅读量不增加
提交方式:评论区贴代码或 GitHub 链接
📚 总结 + 资源
本文学到的 3 个核心点:
1. Redis 缓存三步走:查缓存 → 有就返回 → 没有查库 → 存缓存 → 返回
2. 防刷阅读量:用 Redis 的 setex 记录「用户看过」,设置过期时间拦截重复访问
3. 缓存失效策略:更新数据后立即删除缓存(invalidate),下次访问自动重建
延伸学习资源:
- Redis 官方文档 - Data Types(英文,但例子很清楚)
- 《Redis设计与实现》黄健宏著(中文书里讲得最透的)
- Flask 文档(快速查 API 用)
互动钩子:
你的博客有没有遇到过"刷新阅读量不涨"的奇怪bug?或者你有别的缓存奇葩经历?评论区聊聊,帮你诊断!
下一章我们要进入 socket 编程,实现博客的实时功能——比如新评论不用刷新页面就弹出来。听起来很酷对吧?但在那之前,得先搞懂网络 socket 是什么——我们下一章见!

评论(0)