第5章 5.4 数据库连接池与 ORM
🎯 开场:为什么你家的「水管」总是漏水?
上一章我们学会了用 mysqli 两种方式连接数据库——面向过程和面向对象。那时候我们每操作一次数据库,就开一条新「水管」,用完就扔掉。
问题来了:想象你开了一家奶茶店,每来一个顾客你就买一台净水器,用完就扔。下一个人来了再买新的。这不是疯了吗?
这就是没有连接池时数据库的处境——每次查询都建立新连接,数据库和你都累得喘粗气。
两个痛点你肯定遇到过:
- 网站一搞活动,数据库就崩——连接数爆了
- 每次查询都要写一堆
new mysqli()、connect()、close(),烦不烦?
这一章我们干两件事:① 建一个「净水器租借处」(连接池)重复用连接 ② 找个「智能管家」(ORM)帮你写 SQL。学完你就能让数据库稳如老狗,代码少写一半。
🧱 基础:连接池和 ORM 到底是什么?
连接池:公共厕所的灵感
你去公共厕所,不用每次都找工人新建一个。而是有一个固定的「池子」里放着若干个马桶,\n\n
\n\n
\n\n谁来谁用,用完不清洗但也不拆掉,下一个人继续用。
连接池就是这个道理:事先创建好 N 条数据库连接放在池子里,应用要查询就借一条,用完归还,不关闭。这样就省掉了反复创建和销毁连接的开销。
在 Python 里我们用 dbutils 库的 PooledDB 来做这件事:
from dbutils.pooled_db import PooledDB
import pymysql
# 建一个连接池,最多5条连接
pool = PooledDB(
creator=pymysql, # 用什么库连接
maxconnections=5, # 最多5条连接
mincached=2, # 最少保2条
host='localhost',
user='root',
password='123456',
database='blog'
)
# 借一条连接来用
conn = pool.connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM posts WHERE id = 1")
result = cursor.fetchone()
print(result)
# 用完了,归还连接(不是关闭!)
conn.close()
代码解析:
- PooledDB(...) 就是建了一个「连接池」
- pool.connection() 从池子里借一条连接
- conn.close() 不是真的关闭,只是还回去
ORM:翻译官的故事
你跟外国人说话需要翻译吧?你说「给我来杯咖啡」,翻译跟服务员说「one coffee please」。
ORM 就是数据库的翻译官。你跟它说「给我查用户表里所有博主」,它帮你翻成 SELECT * FROM users WHERE role = 'author'。
Python 里最流行的 ORM 是 SQLAlchemy。举个例子:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建一个「翻译官」实例
engine = create_engine('mysql+pymysql://root:123456@localhost/blog')
Base = declarative_base()
# 告诉 ORM:「用户」这张表长这样
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
role = Column(String(20))
# 让 ORM 根据我们描述的结构自动建表
Base.metadata.create_all(engine)
# 创建一个「会话」,用来操作数据库
Session = sessionmaker(bind=engine)
session = Session()
# 查所有博主——不用写 SQL 了!
authors = session.query(User).filter_by(role='author').all()
for author in authors:
print(f"博主: {author.username}")
代码解析:
- create_engine(...) 建立数据库连接(这背后其实用了连接池)
- class User(Base) 定义了「用户表长什么样」
- session.query(User).filter_by(...) 是在查询,不用写 SELECT * FROM users
单例模式封装:你的私人秘书
每次都要写一长串连接代码烦不烦?搞个「单例模式」封装一下,就像雇个秘书帮你处理杂事。
单例模式的核心:不管你调用多少次,我只给你同一个实例。
class Database:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
# 第一次才真正建连接池
cls._instance.engine = create_engine(
'mysql+pymysql://root:123456@localhost/blog',
pool_size=5, max_overflow=10
)
cls._instance.Session = sessionmaker(bind=cls._instance.engine)
return cls._instance
def get_session(self):
return self.Session()
# 无论调用多少次,拿到的都是同一个 Database 实例
db = Database()
session1 = db.get_session()
session2 = db.get_session()
print(session1 is session2) # True,说明是同一个实例
代码解析:
- __new__ 是 Python 创建对象时第一个被调用的方法
- cls._instance 存储唯一实例,第一次创建后就不再变
🔥 实战:3 个项目带你飞
项目 1:5 分钟搞定连接池查询
场景:你有个博客系统,要频繁查文章列表。用连接池让查询快 3 倍。
from dbutils.pooled_db import PooledDB
import pymysql
# 建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=10,
host='localhost',
user='root',
password='123456',
database='blog'
)
# 模拟100次查询
for i in range(100):
conn = pool.connection()
cursor = conn.cursor()
cursor.execute("SELECT title FROM posts LIMIT 5")
results = cursor.fetchall()
conn.close() # 归还连接,不是关闭!
if i == 0:
for row in results:
print(row[0])
预期输出:
第一篇文章标题
第二篇文章标题
第三篇文章标题
第四篇文章标题
第五篇文章标题
一句话:连接池复用 100 次查询只用了 10 条连接,省去 90 次创建销毁的开销。
项目 2:15 分钟用 ORM 做一个「文章管理器」
场景:从 JSON 文件读取文章数据,批量存入数据库,然后按条件查询。
先准备一个 articles.json 文件:
[
{"title": "Python入门", "content": "这是一篇Python教程", "author_id": 1},
{"title": "PHP入门", "content": "这是一篇PHP教程", "author_id": 1},
{"title": "数据库原理", "content": "这是数据库教程", "author_id": 2}
]
完整可运行代码:
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import json
Base = declarative_base()
# 定义文章表结构
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(200))
content = Column(Text)
author_id = Column(Integer)
# 初始化数据库和ORM
engine = create_engine('mysql+pymysql://root:123456@localhost/blog')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 从JSON读取文章
with open('articles.json', 'r', encoding='utf-8') as f:
articles_data = json.load(f)
# 批量写入数据库
for item in articles_data:
article = Article(
title=item['title'],
content=item['content'],
author_id=item['author_id']
)
session.add(article)
session.commit()
# 查询作者1的所有文章
my_articles = session.query(Article).filter_by(author_id=1).all()
print(f"作者1共写了 {len(my_articles)} 篇文章:")
for a in my_articles:
print(f" - {a.title}")
session.close()
预期输出:
作者1共写了 2 篇文章:
- Python入门
- PHP入门
一句话:JSON 数据直接变成数据库记录,ORM 帮你搞定 SQL,你只管写 Python 逻辑。
项目 3:15 分钟做一个「博客数据清洗脚本」
场景:你的博客数据库里有很多「垃圾数据」——标题太短、内容为空的文章。把它们揪出来删掉。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
status = Column(String(20))
# 连接数据库
engine = create_engine('mysql+pymysql://root:123456@localhost/blog')
Session = sessionmaker(bind=engine)
session = Session()
# 找出问题文章
print("=== 开始数据清洗 ===\n")
# 1. 找出标题太短的文章(少于5个字)
short_titles = session.query(Post).filter(
db.func.length(Post.title) < 5
).all()
print(f"标题太短(<5字)的文章:{len(short_titles)} 篇")
for p in short_titles:
print(f" ID {p.id}: {p.title}")
# 2. 找出空内容的文章
empty_content = session.query(Post).filter(
db.or_(Post.content == None, Post.content == '')
).all()
print(f"\n空内容的文章:{len(empty_content)} 篇")
# 3. 删除垃圾数据
if short_titles or empty_content:
print("\n正在删除垃圾数据...")
for p in short_titles + empty_content:
session.delete(p)
session.commit()
print("删除完成!")
else:
print("\n数据很干净,不需要清洗!")
session.close()
预期输出:
=== 开始数据清洗 ===
标题太短(<5字)的文章:2 篇
ID 3: 好
ID 7: 嗯
空内容的文章:1 篇
正在删除垃圾数据...
删除完成!
一句话:这个脚本就是你的「数据保洁阿姨」,自动把数据库里的垃圾文章扫干净。
💪 进阶:5 个坑和 3 个技巧
坑 1:连接忘记归还
# ❌ 错误:借了不还,池子会被掏空
conn = pool.connection()
cursor = conn.cursor()
# 程序崩了,conn永远不会 close()
# ✅ 正确:用 try...finally 一定归还
try:
conn = pool.connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
finally:
conn.close() # 无论如何都会执行
坑 2:ORM 查询不回滚
# ❌ 错误:出错了但还是 commit 了
session.add(post1)
session.add(post2)
session.commit() # 如果这里出错,前面的也提交了
# ✅ 正确:捕获异常,手动回滚
try:
session.add(post1)
session.add(post2)
session.commit()
except Exception as e:
session.rollback() # 撤销所有未提交的操作
print(f"出错了: {e}")
坑 3:ORM 性能陷阱——N+1 查询
# ❌ 错误:循环里每次查一次作者,100篇文章查100次
posts = session.query(Post).limit(100).all()
for post in posts:
author = session.query(User).filter_by(id=post.author_id).first() # 每次都查!
print(post.title, author.name)
# ✅ 正确:用 join 一次查完
posts = session.query(Post, User).join(User, Post.author_id == User.id).limit(100).all()
for post, author in posts:
print(post.title, author.name)
坑 4:连接池配置过大
# ❌ 错误:以为越大越好,结果数据库连接数爆了
pool = PooledDB(creator=pymysql, maxconnections=1000, ...)
# ✅ 正确:根据服务器配置来,MySQL默认 max_connections=151
pool = PooledDB(creator=pymysql, maxconnections=50, ...)
坑 5:Session 线程混用
# ❌ 错误:多线程共用一个 session
session = Session()
def worker():
session.query(User).all() # 线程安全问题是噩梦
# ✅ 正确:每个线程用自己的 session
def worker():
session = Session() # 线程本地变量
session.query(User).all()
调试技巧:print 大法
# 打开 SQL 日志,看看 ORM 到底执行了什么
engine = create_engine('mysql+pymysql://root:123456@localhost/blog', echo=True)
# 之后所有 SQL 都会打印出来
session = Session()
session.query(User).filter_by(id=1).first()
# 输出: SELECT users.id, users.username ... FROM users WHERE users.id = 1
✏️ 练习题
练习 1(2 分钟):连接池借还
- 输入:把项目 1 的 maxconnections 改成 3
- 预期输出:程序正常运行,不报错
- 提示:连接池最小能复用的连接数就是 3 条
练习 2(3 分钟):加个判断
- 输入:在项目 1 里加一个 if len(results) > 0: 判断
- 预期输出:没有查询结果时不打印空行
- 提示:加在 for row in results 外面
练习 3(5 分钟):换个数据源
- 输入:把 articles.json 换成你自己的数据(3条记录)
- 预期输出:能正常导入并查询
- 提示:格式要保持 {"title": "...", "content": "...", "author_id": 数字}
练习 4(10 分钟):串项目 2 和 3
- 输入:用项目 2 写入数据,用项目 3 的清洗逻辑删除标题带「入门」的文章
- 预期输出:先看到「Python入门」「PHP入门」被查出,再被删除
- 提示:把项目 3 的筛选条件从 length < 5 改成 title.like('%入门%')
练习 5(5 分钟):分析报错
- 输入:运行以下代码,看报错信息
session.query(User).filter_by(status='active').all()
- 报错:
AttributeError: 'User' object has no attribute 'status' - 预期输出:说出为什么报错,以及怎么修
- 提示:检查
User类里有没有定义status字段
作业:做一个「博客数据备份工具」
- 需求描述:做一个命令行工具,把博客数据库里的数据导出成 JSON 文件,以后可以再导回去
- 功能点:
1. 导出:把所有文章和用户数据导出到backup.json
2. 导入:把backup.json的数据导入数据库(先清空再导入)
3. 查看:显示备份文件里有多少条记录 - 加分项:
1. 导出时加上时间戳到文件名(如backup_20240101_120000.json)
2. 导入时跳过已存在的记录(不重复插入) - 验收标准:
1. 能运行python backup.py export导出数据
2. 能运行python backup.py import backup.json导入数据
3. 代码有注释,说明每一步在干嘛 - 提交方式:评论区贴代码或 GitHub 链接
📚 总结
这一章学了 3 件事:
- 连接池:建一个「净水器租借处」,复用数据库连接,省去反复创建的损耗
- ORM:找个翻译官,把 Python 代码翻译成 SQL,你不用写 SQL 也能操作数据库
- 单例封装:一次创建反复使用,一个
Database()实例走天下
推荐资源:
- SQLAlchemy 官方文档(英文,但例子超多)
- 《Python 高手之路》—— 里面有专门讲 SQLAlchemy 高级用法的章节
- 视频:B 站搜索「SQLAlchemy 教程」,挑播放量最高的看
互动钩子:你在项目里用过连接池或 ORM 吗?有没有遇到过什么奇葩 bug?评论区聊聊,老粉优先回复!
📌 下章预告:学了连接池和 ORM,是时候真刀真枪干一票了——下一章我们做一个「博客系统后端」,用户管理、文章增删改查、登录验证,全部自己写!

评论(0)