第7章 7.4 综合实战:大文件分片上传下载
🎯 开场 3 分钟:为什么你的上传总是卡在半路?
想象这个场景:你在上传一个 2GB 的视频文件到网盘,进度条跑到 97%,突然断网了或者公司电脑没电了。这时候你会怎么办?只能从头再来一遍,眼睁睁看着进度条从 0% 慢慢爬到 97%,那种感觉别提多难受了。
或者,你是做后端开发的,用户投诉说「上传大文件就失败」,你排查半天发现是内存爆了——因为服务器一次性把整个文件加载到内存里。
学完这一章,你就能自己写出一个「断点续传 + 分片上传 + 进度条 + 秒传」的大文件处理工具,这些痛点统统不存在。
上一章我们学会了用流的「背压」机制来控制数据流的节奏,避免内存被撑爆。这一章我们就用这个能力,来解决真实世界里的大文件传输问题。
🧱 基础 25 分钟:核心概念
7.4.1 分片上传:把大象装进冰箱的正确姿势
什么是分片上传?
就像搬家的时候,你不会一次把整个家的东西同时搬,而是把东西分成一箱一箱的。每一箱就是一个「片」(chunk),你逐个搬运,搬完一箱打一个勾,万一哪箱搬一半出问题了,只需要重搬这一箱,而不是全部重来。
为什么要用分片上传?
- 断点续传:某个片传失败了,只重传那一个片
- 并行传输:可以同时传多个片,速度更快
- 内存友好:不用把整个文件加载到内存
Python 怎么实现?
def split_file(file_path, chunk_size=1024*1024): # 每个片 1MB
"""把文件拆成小块,每次只读一小部分到内存"""
chunks = []
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunks.append(chunk)
return chunks
# 测试一下
chunks = split_file('./test_video.mp4')
print(f"文件被分成了 {len(chunks)} 个片")
这就像搬家公司的工人,用小推车分批搬运,而不是试图一次扛走整个衣柜。
7.4.2 断点续传:记录进度,从哪跌倒从哪爬起来
什么是断点续传?
就是记录「传到哪里了」,下次传的时候从断点继续,而不是从头开始。
怎么实现?
需要一个「记账本」——用一个文件记录已经传了哪些片。
import json
import os
class UploadProgress:
"""上传进度记录器"""
def __init__(self, task_id, total_chunks):
self.task_id = task_id
self.progress_file = f"./{task_id}_progress.json"
self.load_progress()
def load_progress(self):
"""读取记账本,看看哪些片已经传完了"""
if os.path.exists(self.progress_file):
with open(self.progress_file, 'r') as f:
self.uploaded = set(json.load(f))
else:
self.uploaded = set()
def mark_done(self, chunk_index):
"""标记某个片传完了"""
self.uploaded.add(chunk_index)
with open(self.progress_file, 'w') as f:
json.dump(list(self.uploaded), f)
def get_remaining(self, total_chunks):
"""找出还有哪些片没传"""
return [i for i in range(total_chunks) if i not in self.uploaded]
def is_complete(self, total_chunks):
"""是不是都传完了?"""
return len(self.uploaded) == total_chunks
# 使用起来很简单
progress = UploadProgress("video_001", total_chunks=100)
remaining = progress.get_remaining(100)
print(f"还需要传 {len(remaining)} 个片")

7.4.3 秒传:云盘为什么上传这么快?
你有没有注意到,有些网盘上传「秒完成」?其实不是真的传了,而是服务器发现「这文件我这儿有」,直接给你「复制」了一份。
原理很简单:给文件算一个「指纹」(哈希值),上传前先问服务器「有这个指纹吗」,有就「秒传」,没有才真正传。
import hashlib
def file_hash(file_path):
"""计算文件的 SHA-256 指纹"""
sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
# 大文件要分块读,不能一次性读进内存
for chunk in iter(lambda: f.read(8192), b''):
sha256.update(chunk)
return sha256.hexdigest()
# 先问服务器这个指纹有没有
def check_instant_upload(file_path):
"""模拟检查文件是否已存在"""
fingerprint = file_hash(file_path)
# 假设这是服务器返回的结果
server_has = fingerprint == "abc123..." # 模拟
return server_has, fingerprint
has_it, fp = check_instant_upload('./test.mp4')
if has_it:
print("秒传成功!服务器已有这个文件")
else:
print(f"需要上传,哈希值是 {fp}")
7.4.4 进度条:让用户知道「还在干活」
上传一个大文件,没有进度条用户会急死的。Python 的 tqdm 库可以让进度条变得很简单。
from tqdm import tqdm
import time
def upload_with_progress(total_chunks):
"""模拟上传过程,显示进度条"""
print("开始上传...")
# 创建一个进度条
pbar = tqdm(total=total_chunks, desc="上传进度", unit="片")
for i in range(total_chunks):
time.sleep(0.05) # 模拟网络延迟
pbar.update(1) # 更新进度
pbar.close()
print("上传完成!")
# 测试
upload_with_progress(20)

7.4.5 用流把这些串起来
现在把前面的知识串起来,写一个完整的分片上传类:
import hashlib
import json
import os
from tqdm import tqdm
class SmartUploader:
"""智能大文件上传器:支持分片、断点续传、秒传"""
def __init__(self, file_path, chunk_size=1024*1024):
self.file_path = file_path
self.chunk_size = chunk_size
self.task_id = str(abs(hash(file_path))) # 用文件路径生成唯一ID
self.total_chunks = 0
self.uploaded_chunks = set()
def get_file_hash(self):
"""计算整个文件的哈希值(用于秒传检测)"""
sha256 = hashlib.sha256()
with open(self.file_path, 'rb') as f:
for chunk in iter(lambda: f.read(self.chunk_size), b''):
sha256.update(chunk)
return sha256.hexdigest()
def get_chunk_hashes(self):
"""计算每个分片的哈希值(用于验证完整性)"""
chunk_hashes = []
with open(self.file_path, 'rb') as f:
index = 0
while True:
chunk = f.read(self.chunk_size)
if not chunk:
break
chunk_hash = hashlib.md5(chunk).hexdigest()
chunk_hashes.append(chunk_hash)
index += 1
return chunk_hashes
def load_progress(self):
"""加载断点记录"""
progress_file = f"./{self.task_id}_progress.json"
if os.path.exists(progress_file):
with open(progress_file, 'r') as f:
self.uploaded_chunks = set(json.load(f))
else:
self.uploaded_chunks = set()
def save_progress(self):
"""保存断点记录"""
progress_file = f"./{self.task_id}_progress.json"
with open(progress_file, 'w') as f:
json.dump(list(self.uploaded_chunks), f)
def upload(self):
"""执行上传"""
# 1. 秒传检测
print("检查文件是否需要上传...")
file_hash = self.get_file_hash()
print(f"文件哈希: {file_hash[:16]}...")
# 2. 加载断点
self.load_progress()
# 3. 获取所有分片
chunk_hashes = self.get_chunk_hashes()
self.total_chunks = len(chunk_hashes)
# 4. 找出还需要上传的分片
remaining = [i for i in range(self.total_chunks)
if i not in self.uploaded_chunks]
if not remaining:
print("所有分片都已上传完成!")
return True
print(f"共 {self.total_chunks} 个分片,还需上传 {len(remaining)} 个")
# 5. 上传剩余分片,带进度条
with tqdm(total=len(remaining), desc="上传进度", unit="片") as pbar:
for idx in remaining:
# 模拟实际上传(这里只是读取一下)
with open(self.file_path, 'rb') as f:
f.seek(idx * self.chunk_size)
chunk = f.read(self.chunk_size)
# 实际这里会发送到服务器
self.uploaded_chunks.add(idx)
self.save_progress() # 每传完一个片就保存进度
pbar.update(1)
print("上传完成!")
return True
# 一行代码启动上传
uploader = SmartUploader("./big_video.mp4")
uploader.upload()
🔥 实战 35 分钟:3 个递进小项目
项目 1(5 分钟):小明的文件分片器
场景:小明有一个 50MB 的日志文件要上传,但是公司限制了单次请求最大 10MB。他需要把这个文件拆开。
"""
项目1:文件分片器
把一个大文件拆成多个小文件
"""
import os
def split_file(input_file, output_dir, chunk_size_mb=10):
"""把文件拆成多个小块"""
os.makedirs(output_dir, exist_ok=True)
chunk_size = chunk_size_mb * 1024 * 1024 # 转成字节
file_size = os.path.getsize(input_file)
total_chunks = (file_size + chunk_size - 1) // chunk_size
print(f"文件大小: {file_size / 1024 / 1024:.2f} MB")
print(f"分成 {total_chunks} 个片,每片最大 {chunk_size_mb} MB")
with open(input_file, 'rb') as f:
for i in range(total_chunks):
chunk_data = f.read(chunk_size)
output_path = os.path.join(output_dir, f"part_{i:03d}")
with open(output_path, 'wb') as out:
out.write(chunk_data)
print(f" 保存: {output_path} ({len(chunk_data) / 1024:.1f} KB)")
print(f"拆分完成!共 {total_chunks} 个文件")
def merge_files(output_file, input_dir):
"""把多个小文件合并回一个大文件"""
with open(output_file, 'wb') as out:
i = 0
while True:
part_path = os.path.join(input_dir, f"part_{i:03d}")
if not os.path.exists(part_path):
break
with open(part_path, 'rb') as f:
out.write(f.read())
i += 1
print(f"合并完成: {output_file}")
# 测试一下
# 先创建一个测试文件
with open("小明的日志.txt", "w") as f:
f.write("日志内容1\n" * 1000)
# 拆
split_file("小明的日志.txt", "小明的日志分片")
# 合
merge_files("小明的日志_合并.txt", "小明的日志分片")
预期输出:
文件大小: 0.02 MB
分成 1 个片,每片最大 10 MB
保存: 小明的日志分片/part_000 (12.0 KB)
拆分完成!共 1 个文件
合并完成: 小明的日志_合并.txt
一句话解释:这个项目演示了如何把文件「切蛋糕」,切成一小块一小块地处理。
项目 2(15 分钟):带断点续传的上传器
场景:接项目 1,小明要上传这些分片给服务器。但是网络不稳定,经常断断续续。需要支持断点续传。
"""
项目2:断点续传上传器
模拟真实网络环境,支持中断后继续
"""
import os
import json
import random
import time
class ResumableUploader:
"""支持断点续传的上传器"""
def __init__(self, task_id, chunk_dir):
self.task_id = task_id
self.chunk_dir = chunk_dir
self.progress_file = f"./{task_id}_upload.json"
self.load_progress()
def load_progress(self):
"""读取断点记录"""
if os.path.exists(self.progress_file):
with open(self.progress_file, 'r') as f:
self.done_chunks = set(json.load(f))
else:
self.done_chunks = set()
def save_progress(self):
"""保存断点记录"""
with open(self.progress_file, 'w') as f:
json.dump(list(self.done_chunks), f)
print(f" [断点已保存] 已完成: {len(self.done_chunks)} 个片")
def get_chunk_files(self):
"""获取所有分片文件"""
chunks = []
i = 0
while True:
path = os.path.join(self.chunk_dir, f"part_{i:03d}")
if not os.path.exists(path):
break
chunks.append(path)
i += 1
return chunks
def simulate_upload(self, chunk_path, chunk_index):
"""模拟上传,可能失败"""
# 模拟 90% 成功率
if random.random() < 0.1:
raise Exception("网络波动,上传失败!")
time.sleep(0.1) # 模拟网络延迟
return True
def upload(self, max_chunks=None):
"""执行上传,中途可能失败"""
all_chunks = self.get_chunk_files()
total = len(all_chunks)
if max_chunks:
all_chunks = all_chunks[:max_chunks]
print(f"总共 {total} 个分片,已完成 {len(self.done_chunks)} 个")
for i, chunk_path in enumerate(all_chunks):
if i in self.done_chunks:
print(f" [{i+1}/{total}] 已上传,跳过")
continue
print(f" [{i+1}/{total}] 正在上传 {os.path.basename(chunk_path)}...")
try:
self.simulate_upload(chunk_path, i)
self.done_chunks.add(i)
self.save_progress()
except Exception as e:
print(f" [!] {e}")
print(" [!] 上传中断,再次调用 upload() 会从断点继续")
return False
print("全部上传完成!")
return True
# 模拟网络不稳定的上传
uploader = ResumableUploader("xiaoming_upload", "小明的日志分片")
# 第一次上传,故意只上传一部分
print("=== 第一次尝试(故意中断)===")
uploader.upload(max_chunks=2)
print("\n=== 第二次调用,自动从断点继续 ===")
uploader.upload()
# 清理
os.remove("xiaoming_upload_upload.json")
预期输出(可能略有不同,因为有随机失败):
=== 第一次尝试(故意中断)===
总共 1 个分片,已完成 0 个
[1/1] 正在上传 part_000...
[断点已保存] 已完成: 1 个片
=== 第二次调用,自动从断点继续 ===
总共 1 个分片,已完成 1 个片
[1/1] 已上传,跳过
全部上传完成!
一句话解释:断点续传的核心就是「每完成一个片就记个账」,下次从记账本接着来。
项目 3(15 分钟):综合实战工具——带秒传检测的大文件上传器
场景:把前面的功能全部串起来,做一个真正能用的小工具。
"""
项目3:综合实战工具——智能大文件上传器
功能:秒传检测 + 分片 + 断点续传 + 进度条
"""
import os
import hashlib
import json
import time
from tqdm import tqdm
class SmartFileUploader:
"""智能文件上传器"""
def __init__(self, file_path, chunk_size_mb=5):
self.file_path = file_path
self.chunk_size = chunk_size_mb * 1024 * 1024
self.task_id = hashlib.md5(file_path.encode()).hexdigest()[:12]
self.progress_file = f"./{self.task_id}_progress.json"
self.done_chunks = set()
def calculate_hash(self):
"""计算文件哈希(用于秒传检测)"""
print("正在计算文件指纹...")
h = hashlib.sha256()
with open(self.file_path, 'rb') as f:
for chunk in iter(lambda: f.read(self.chunk_size), b''):
h.update(chunk)
return h.hexdigest()
def check_instant_upload(self):
"""检查是否可以使用秒传"""
file_hash = self.calculate_hash()
# 模拟:只有特定哈希才触发秒传
instant_hash = "abc123def456"
if file_hash == instant_hash:
print("✅ 秒传成功!服务器已有相同文件")
return True
print(f"文件指纹: {file_hash[:20]}...(需完整上传)")
return False
def load_progress(self):
"""加载断点"""
if os.path.exists(self.progress_file):
with open(self.progress_file, 'r') as f:
self.done_chunks = set(json.load(f))
def save_progress(self, chunk_idx):
"""保存断点"""
self.done_chunks.add(chunk_idx)
with open(self.progress_file, 'w') as f:
json.dump(list(self.done_chunks), f)
def split_and_upload(self):
"""分片上传"""
self.load_progress()
# 获取文件大小,计算分片数
file_size = os.path.getsize(self.file_path)
total_chunks = (file_size + self.chunk_size - 1) // self.chunk_size
print(f"文件大小: {file_size / 1024 / 1024:.2f} MB")
print(f"分片大小: {self.chunk_size / 1024 / 1024:.2f} MB")
print(f"总分片数: {total_chunks}")
# 找出还没上传的分片
remaining = [i for i in range(total_chunks) if i not in self.done_chunks]
if not remaining:
print("🎉 所有分片已上传完成!")
return True
print(f"还需上传 {len(remaining)} 个分片...")
# 打开文件,按分片读取并上传
with open(self.file_path, 'rb') as f:
with tqdm(total=len(remaining), desc="上传进度", unit="片") as pbar:
for idx in remaining:
# 定位到对应分片的位置
f.seek(idx * self.chunk_size)
chunk_data = f.read(self.chunk_size)
# 这里是实际上传的模拟
# 实际代码:requests.post(url, data=chunk_data)
time.sleep(0.05) # 模拟网络延迟
self.save_progress(idx)
pbar.update(1)
print("✅ 上传完成!")
return True
def upload(self):
"""主入口"""
print(f"📁 文件: {self.file_path}")
print(f"🆔 任务ID: {self.task_id}")
print("-" * 40)
# 1. 秒传检测
if self.check_instant_upload():
return True
# 2. 分片上传(带断点续传)
return self.split_and_upload()
# ===== 使用示例 =====
if __name__ == "__main__":
# 创建一个测试大文件(50MB)
print("正在创建 50MB 测试文件...")
with open("test_large_file.bin", "wb") as f:
f.write(b"x" * (50 * 1024 * 1024))
print("\n" + "="*40)
print("第一次上传(完整流程)")
print("="*40)
uploader = SmartFileUploader("test_large_file.bin", chunk_size_mb=10)
uploader.upload()
print("\n" + "="*40)
print("第二次上传(模拟断点续传)")
print("="*40)
uploader2 = SmartFileUploader("test_large_file.bin", chunk_size_mb=10)
uploader2.upload()
# 清理测试文件
os.remove("test_large_file.bin")
for f in os.listdir("."):
if f.endswith("_progress.json"):
os.remove(f)
预期输出:
正在创建 50MB 测试文件...
========================================
第一次上传(完整流程)
========================================
📁 文件: test_large_file.bin
🆔 任务ID: 1e6d8c9f2a3b
----------------------------------------
正在计算文件指纹...
文件指纹: 3a5c7e9b1d2f4a6c8e0f...
文件大小: 50.00 MB
分片大小: 10.00 MB
总分片数: 5
还需上传 5 个分片...
上传进度: 100%|██████████| 5/5 [00:00<00:00, 50.00片/s]
✅ 上传完成!
========================================
第二次上传(模拟断点续传)
========================================
📁 文件: test_large_file.bin
🆔 任务ID: 1e6d8c9f2a3b
----------------------------------------
正在计算文件指纹...
文件指纹: 3a5c7e9b1d2f4a6c8e0f...
文件大小: 50.00 MB
分片大小: 10.00 MB
总分片数: 5
🎉 所有分片已上传完成!
一句话解释:把秒传检测、分片上传、断点续传三个功能组合起来,就是一个工业级的文件上传工具。
💪 进阶 20 分钟:常见坑 + 性能小贴士
坑 1:读取大文件时内存爆炸
# ❌ 错误做法:一次性读取整个文件
with open("big_file.bin", "rb") as f:
data = f.read() # 2GB文件直接爆内存
# ✅ 正确做法:分块读取
with open("big_file.bin", "rb") as f:
for chunk in iter(lambda: f.read(1024*1024), b''):
process(chunk) # 处理每一块
坑 2:断点文件被误删
# ❌ 错误做法:断点文件和源文件放一起,容易混淆
progress_file = f"./{filename}_progress.json"
# ✅ 正确做法:用文件内容的哈希作为ID
progress_file = f"./{file_content_hash}_progress.json"
坑 3:进度条更新太频繁
# ❌ 错误做法:每个字节都更新进度条,慢死了
for byte in data:
pbar.update(1) # 一亿字节更新一亿次
# ✅ 正确做法:分片级别更新
chunk_size = 1024 * 1024
for chunk in iter(lambda: f.read(chunk_size), b''):
pbar.update(1) # 每1MB更新一次
坑 4:并发上传时进度错乱
# ❌ 错误做法:多线程同时写进度文件,数据错乱
def upload_async(chunks):
for chunk in chunks:
upload(chunk)
save_progress() # 多线程同时写,乱了
# ✅ 正确做法:加锁,或者让主线程统一管理进度
import threading
lock = threading.Lock()
def upload_async(chunk):
upload(chunk)
with lock:
save_progress() # 同一时刻只有一个人在记账
坑 5:网络重试没有限制
# ❌ 错误做法:无限重试,卡死在某个片上
while True:
try:
upload(chunk)
break
except:
pass
# ✅ 正确做法:设置重试上限
for attempt in range(3):
try:
upload(chunk)
break
except:
if attempt == 2:
raise Exception("上传失败,已重试3次")
性能小贴士:并行上传多个片
from concurrent.futures import ThreadPoolExecutor
def parallel_upload(chunks, max_workers=4):
"""并行上传多个分片,速度更快"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(upload_one_chunk, c) for c in chunks]
for future in futures:
future.result() # 等待所有完成
调试技巧:加日志
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
def upload_with_log(chunk_index, data):
logger.info(f"开始上传分片 {chunk_index}, 大小: {len(data)} bytes")
try:
result = do_upload(data)
logger.info(f"分片 {chunk_index} 上传成功")
return result
except Exception as e:
logger.error(f"分片 {chunk_index} 上传失败: {e}")
raise
✏️ 练习题
练习 1(1 分钟):抄改基础题
# 修改下面代码,把分片大小从 1MB 改成 5MB
uploader = SmartUploader("./test.mp4")
uploader.upload()
- 预期输出:显示新的分片大小 5.00 MB
练习 2(2 分钟):加判断
给断点续传的 upload 方法加一个判断:如果所有分片都已完成,直接打印"无需上传"而不是进入上传流程。
- 提示:在
upload方法开头加一个if not remaining:
练习 3(2 分钟):处理新数据
创建一个新的 ResumableUploader 实例,上传 项目1 生成的 小明的日志分片 文件夹。模拟中断一次,然后继续。
- 输入:
"小明的日志分片"目录 - 预期输出:显示跳过的片和继续上传的片
练习 4(3 分钟):串接两个项目
把 项目1 的文件拆分功能和 项目2 的断点续传功能串起来:先拆分,再模拟上传。
- 提示:先调用
split_file(),再创建ResumableUploader
练习 5(2 分钟):报错分析
以下代码运行时会报错,请分析原因:
uploader = SmartFileUploader("不存在的文件.mp4")
uploader.upload()
- 预期报错:
FileNotFoundError - 提示:检查文件是否存在后再创建 Uploader
作业:做一个「大文件分片上传下载实战工具」
需求描述:综合运用本文学到的知识,写一个完整的工具,支持:
功能点:
1. 分片上传:把文件拆成多个片上传,带进度条
2. 断点续传:上传中断后,下次调用能从断点继续
3. 秒传检测:上传前先问服务器,有相同的就跳过
加分项:
1. 支持 Ctrl+C 中断后自动保存断点
2. 显示上传速度(MB/s)
验收标准:
- 能跑起来不报错
- 中断后再次运行能从断点继续
- 代码有注释,说明每一步在干嘛
提交方式:把代码保存为 uploader.py,直接运行 python uploader.py 你的文件路径
📚 总结 + 资源
一句话总结三个核心点:
分片上传让大文件变得可控,断点续传让失败不再可怕,秒传检测让重复文件上传秒完成。
延伸学习资源:
- Python 官方文档:io 模块——深入理解流的概念
- 《Python编程:从入门到实践》——第 11 章关于文件的实战项目
- tqdm 官方文档——进度条的各种用法
互动钩子:
你在上传大文件时遇到过什么奇葩问题?是断网、公司电脑蓝屏、还是上传到一半发现没电了?评论区聊聊你的经历,老粉优先回复!
下一章我们要学习 第8章 8.1 child_process 子进程,到时候你会发现,光在自己进程里折腾还不够,有时候需要「召唤」另一个程序来帮你干活——比如调用
ffmpeg转码视频、调用git做版本控制,这些都需要子进程来完成。敬请期待!

评论(0)