第7章 7.4 综合实战:大文件分片上传下载

🎯 开场 3 分钟:为什么你的上传总是卡在半路?

想象这个场景:你在上传一个 2GB 的视频文件到网盘,进度条跑到 97%,突然断网了或者公司电脑没电了。这时候你会怎么办?只能从头再来一遍,眼睁睁看着进度条从 0% 慢慢爬到 97%,那种感觉别提多难受了。

或者,你是做后端开发的,用户投诉说「上传大文件就失败」,你排查半天发现是内存爆了——因为服务器一次性把整个文件加载到内存里。

学完这一章,你就能自己写出一个「断点续传 + 分片上传 + 进度条 + 秒传」的大文件处理工具,这些痛点统统不存在。

上一章我们学会了用流的「背压」机制来控制数据流的节奏,避免内存被撑爆。这一章我们就用这个能力,来解决真实世界里的大文件传输问题。


🧱 基础 25 分钟:核心概念

7.4.1 分片上传:把大象装进冰箱的正确姿势

什么是分片上传?

就像搬家的时候,你不会一次把整个家的东西同时搬,而是把东西分成一箱一箱的。每一箱就是一个「片」(chunk),你逐个搬运,搬完一箱打一个勾,万一哪箱搬一半出问题了,只需要重搬这一箱,而不是全部重来。

为什么要用分片上传?

  • 断点续传:某个片传失败了,只重传那一个片
  • 并行传输:可以同时传多个片,速度更快
  • 内存友好:不用把整个文件加载到内存

Python 怎么实现?

def split_file(file_path, chunk_size=1024*1024):  # 每个片 1MB
"""把文件拆成小块,每次只读一小部分到内存"""
chunks = []
with open(file_path, 'rb') as f:
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)
return chunks

# 测试一下
chunks = split_file('./test_video.mp4')
print(f"文件被分成了 {len(chunks)} 个片")

这就像搬家公司的工人,用小推车分批搬运,而不是试图一次扛走整个衣柜。

7.4.2 断点续传:记录进度,从哪跌倒从哪爬起来

什么是断点续传?

就是记录「传到哪里了」,下次传的时候从断点继续,而不是从头开始。

怎么实现?

需要一个「记账本」——用一个文件记录已经传了哪些片。

import json
import os

class UploadProgress:
"""上传进度记录器"""

def __init__(self, task_id, total_chunks):
    self.task_id = task_id
    self.progress_file = f"./{task_id}_progress.json"
    self.load_progress()

def load_progress(self):
    """读取记账本,看看哪些片已经传完了"""
    if os.path.exists(self.progress_file):
        with open(self.progress_file, 'r') as f:
            self.uploaded = set(json.load(f))
    else:
        self.uploaded = set()

def mark_done(self, chunk_index):
    """标记某个片传完了"""
    self.uploaded.add(chunk_index)
    with open(self.progress_file, 'w') as f:
        json.dump(list(self.uploaded), f)

def get_remaining(self, total_chunks):
    """找出还有哪些片没传"""
    return [i for i in range(total_chunks) if i not in self.uploaded]

def is_complete(self, total_chunks):
    """是不是都传完了?"""
    return len(self.uploaded) == total_chunks

# 使用起来很简单
progress = UploadProgress("video_001", total_chunks=100)
remaining = progress.get_remaining(100)
print(f"还需要传 {len(remaining)} 个片")

配图1 - 配图1

7.4.3 秒传:云盘为什么上传这么快?

你有没有注意到,有些网盘上传「秒完成」?其实不是真的传了,而是服务器发现「这文件我这儿有」,直接给你「复制」了一份。

原理很简单:给文件算一个「指纹」(哈希值),上传前先问服务器「有这个指纹吗」,有就「秒传」,没有才真正传。

import hashlib

def file_hash(file_path):
"""计算文件的 SHA-256 指纹"""
sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
    # 大文件要分块读,不能一次性读进内存
    for chunk in iter(lambda: f.read(8192), b''):
        sha256.update(chunk)
return sha256.hexdigest()

# 先问服务器这个指纹有没有
def check_instant_upload(file_path):
"""模拟检查文件是否已存在"""
fingerprint = file_hash(file_path)
# 假设这是服务器返回的结果
server_has = fingerprint == "abc123..."  # 模拟
return server_has, fingerprint

has_it, fp = check_instant_upload('./test.mp4')
if has_it:
print("秒传成功!服务器已有这个文件")
else:
print(f"需要上传,哈希值是 {fp}")

7.4.4 进度条:让用户知道「还在干活」

上传一个大文件,没有进度条用户会急死的。Python 的 tqdm 库可以让进度条变得很简单。

from tqdm import tqdm
import time

def upload_with_progress(total_chunks):
"""模拟上传过程,显示进度条"""
print("开始上传...")

# 创建一个进度条
pbar = tqdm(total=total_chunks, desc="上传进度", unit="片")

for i in range(total_chunks):
    time.sleep(0.05)  # 模拟网络延迟
    pbar.update(1)  # 更新进度

pbar.close()
print("上传完成!")

# 测试
upload_with_progress(20)

配图2 - 配图2

7.4.5 用流把这些串起来

现在把前面的知识串起来,写一个完整的分片上传类:

import hashlib
import json
import os
from tqdm import tqdm

class SmartUploader:
"""智能大文件上传器:支持分片、断点续传、秒传"""

def __init__(self, file_path, chunk_size=1024*1024):
    self.file_path = file_path
    self.chunk_size = chunk_size
    self.task_id = str(abs(hash(file_path)))  # 用文件路径生成唯一ID
    self.total_chunks = 0
    self.uploaded_chunks = set()

def get_file_hash(self):
    """计算整个文件的哈希值(用于秒传检测)"""
    sha256 = hashlib.sha256()
    with open(self.file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(self.chunk_size), b''):
            sha256.update(chunk)
    return sha256.hexdigest()

def get_chunk_hashes(self):
    """计算每个分片的哈希值(用于验证完整性)"""
    chunk_hashes = []
    with open(self.file_path, 'rb') as f:
        index = 0
        while True:
            chunk = f.read(self.chunk_size)
            if not chunk:
                break
            chunk_hash = hashlib.md5(chunk).hexdigest()
            chunk_hashes.append(chunk_hash)
            index += 1
    return chunk_hashes

def load_progress(self):
    """加载断点记录"""
    progress_file = f"./{self.task_id}_progress.json"
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as f:
            self.uploaded_chunks = set(json.load(f))
    else:
        self.uploaded_chunks = set()

def save_progress(self):
    """保存断点记录"""
    progress_file = f"./{self.task_id}_progress.json"
    with open(progress_file, 'w') as f:
        json.dump(list(self.uploaded_chunks), f)

def upload(self):
    """执行上传"""
    # 1. 秒传检测
    print("检查文件是否需要上传...")
    file_hash = self.get_file_hash()
    print(f"文件哈希: {file_hash[:16]}...")

    # 2. 加载断点
    self.load_progress()

    # 3. 获取所有分片
    chunk_hashes = self.get_chunk_hashes()
    self.total_chunks = len(chunk_hashes)

    # 4. 找出还需要上传的分片
    remaining = [i for i in range(self.total_chunks) 
                 if i not in self.uploaded_chunks]

    if not remaining:
        print("所有分片都已上传完成!")
        return True

    print(f"共 {self.total_chunks} 个分片,还需上传 {len(remaining)} 个")

    # 5. 上传剩余分片,带进度条
    with tqdm(total=len(remaining), desc="上传进度", unit="片") as pbar:
        for idx in remaining:
            # 模拟实际上传(这里只是读取一下)
            with open(self.file_path, 'rb') as f:
                f.seek(idx * self.chunk_size)
                chunk = f.read(self.chunk_size)
                # 实际这里会发送到服务器

            self.uploaded_chunks.add(idx)
            self.save_progress()  # 每传完一个片就保存进度
            pbar.update(1)

    print("上传完成!")
    return True

# 一行代码启动上传
uploader = SmartUploader("./big_video.mp4")
uploader.upload()

🔥 实战 35 分钟:3 个递进小项目

项目 1(5 分钟):小明的文件分片器

场景:小明有一个 50MB 的日志文件要上传,但是公司限制了单次请求最大 10MB。他需要把这个文件拆开。

"""
项目1:文件分片器
把一个大文件拆成多个小文件
"""

import os

def split_file(input_file, output_dir, chunk_size_mb=10):
"""把文件拆成多个小块"""
os.makedirs(output_dir, exist_ok=True)

chunk_size = chunk_size_mb * 1024 * 1024  # 转成字节
file_size = os.path.getsize(input_file)
total_chunks = (file_size + chunk_size - 1) // chunk_size

print(f"文件大小: {file_size / 1024 / 1024:.2f} MB")
print(f"分成 {total_chunks} 个片,每片最大 {chunk_size_mb} MB")

with open(input_file, 'rb') as f:
    for i in range(total_chunks):
        chunk_data = f.read(chunk_size)
        output_path = os.path.join(output_dir, f"part_{i:03d}")
        with open(output_path, 'wb') as out:
            out.write(chunk_data)
        print(f"  保存: {output_path} ({len(chunk_data) / 1024:.1f} KB)")

print(f"拆分完成!共 {total_chunks} 个文件")

def merge_files(output_file, input_dir):
"""把多个小文件合并回一个大文件"""
with open(output_file, 'wb') as out:
    i = 0
    while True:
        part_path = os.path.join(input_dir, f"part_{i:03d}")
        if not os.path.exists(part_path):
            break
        with open(part_path, 'rb') as f:
            out.write(f.read())
        i += 1
print(f"合并完成: {output_file}")

# 测试一下
# 先创建一个测试文件
with open("小明的日志.txt", "w") as f:
f.write("日志内容1\n" * 1000)

# 拆
split_file("小明的日志.txt", "小明的日志分片")

# 合
merge_files("小明的日志_合并.txt", "小明的日志分片")

预期输出

文件大小: 0.02 MB
分成 1 个片,每片最大 10 MB
保存: 小明的日志分片/part_000 (12.0 KB)
拆分完成!共 1 个文件
合并完成: 小明的日志_合并.txt

一句话解释:这个项目演示了如何把文件「切蛋糕」,切成一小块一小块地处理。


项目 2(15 分钟):带断点续传的上传器

场景:接项目 1,小明要上传这些分片给服务器。但是网络不稳定,经常断断续续。需要支持断点续传。

"""
项目2:断点续传上传器
模拟真实网络环境,支持中断后继续
"""

import os
import json
import random
import time

class ResumableUploader:
"""支持断点续传的上传器"""

def __init__(self, task_id, chunk_dir):
    self.task_id = task_id
    self.chunk_dir = chunk_dir
    self.progress_file = f"./{task_id}_upload.json"
    self.load_progress()

def load_progress(self):
    """读取断点记录"""
    if os.path.exists(self.progress_file):
        with open(self.progress_file, 'r') as f:
            self.done_chunks = set(json.load(f))
    else:
        self.done_chunks = set()

def save_progress(self):
    """保存断点记录"""
    with open(self.progress_file, 'w') as f:
        json.dump(list(self.done_chunks), f)
    print(f"  [断点已保存] 已完成: {len(self.done_chunks)} 个片")

def get_chunk_files(self):
    """获取所有分片文件"""
    chunks = []
    i = 0
    while True:
        path = os.path.join(self.chunk_dir, f"part_{i:03d}")
        if not os.path.exists(path):
            break
        chunks.append(path)
        i += 1
    return chunks

def simulate_upload(self, chunk_path, chunk_index):
    """模拟上传,可能失败"""
    # 模拟 90% 成功率
    if random.random() < 0.1:
        raise Exception("网络波动,上传失败!")
    time.sleep(0.1)  # 模拟网络延迟
    return True

def upload(self, max_chunks=None):
    """执行上传,中途可能失败"""
    all_chunks = self.get_chunk_files()
    total = len(all_chunks)

    if max_chunks:
        all_chunks = all_chunks[:max_chunks]

    print(f"总共 {total} 个分片,已完成 {len(self.done_chunks)} 个")

    for i, chunk_path in enumerate(all_chunks):
        if i in self.done_chunks:
            print(f"  [{i+1}/{total}] 已上传,跳过")
            continue

        print(f"  [{i+1}/{total}] 正在上传 {os.path.basename(chunk_path)}...")

        try:
            self.simulate_upload(chunk_path, i)
            self.done_chunks.add(i)
            self.save_progress()
        except Exception as e:
            print(f"  [!] {e}")
            print("  [!] 上传中断,再次调用 upload() 会从断点继续")
            return False

    print("全部上传完成!")
    return True

# 模拟网络不稳定的上传
uploader = ResumableUploader("xiaoming_upload", "小明的日志分片")

# 第一次上传,故意只上传一部分
print("=== 第一次尝试(故意中断)===")
uploader.upload(max_chunks=2)

print("\n=== 第二次调用,自动从断点继续 ===")
uploader.upload()

# 清理
os.remove("xiaoming_upload_upload.json")

预期输出(可能略有不同,因为有随机失败):

=== 第一次尝试(故意中断)===
总共 1 个分片,已完成 0 个
[1/1] 正在上传 part_000...
[断点已保存] 已完成: 1 个片
=== 第二次调用,自动从断点继续 ===
总共 1 个分片,已完成 1 个片
[1/1] 已上传,跳过
全部上传完成!

一句话解释:断点续传的核心就是「每完成一个片就记个账」,下次从记账本接着来。


项目 3(15 分钟):综合实战工具——带秒传检测的大文件上传器

场景:把前面的功能全部串起来,做一个真正能用的小工具。

"""
项目3:综合实战工具——智能大文件上传器
功能:秒传检测 + 分片 + 断点续传 + 进度条
"""

import os
import hashlib
import json
import time
from tqdm import tqdm

class SmartFileUploader:
"""智能文件上传器"""

def __init__(self, file_path, chunk_size_mb=5):
    self.file_path = file_path
    self.chunk_size = chunk_size_mb * 1024 * 1024
    self.task_id = hashlib.md5(file_path.encode()).hexdigest()[:12]
    self.progress_file = f"./{self.task_id}_progress.json"
    self.done_chunks = set()

def calculate_hash(self):
    """计算文件哈希(用于秒传检测)"""
    print("正在计算文件指纹...")
    h = hashlib.sha256()
    with open(self.file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(self.chunk_size), b''):
            h.update(chunk)
    return h.hexdigest()

def check_instant_upload(self):
    """检查是否可以使用秒传"""
    file_hash = self.calculate_hash()
    # 模拟:只有特定哈希才触发秒传
    instant_hash = "abc123def456"
    if file_hash == instant_hash:
        print("✅ 秒传成功!服务器已有相同文件")
        return True
    print(f"文件指纹: {file_hash[:20]}...(需完整上传)")
    return False

def load_progress(self):
    """加载断点"""
    if os.path.exists(self.progress_file):
        with open(self.progress_file, 'r') as f:
            self.done_chunks = set(json.load(f))

def save_progress(self, chunk_idx):
    """保存断点"""
    self.done_chunks.add(chunk_idx)
    with open(self.progress_file, 'w') as f:
        json.dump(list(self.done_chunks), f)

def split_and_upload(self):
    """分片上传"""
    self.load_progress()

    # 获取文件大小,计算分片数
    file_size = os.path.getsize(self.file_path)
    total_chunks = (file_size + self.chunk_size - 1) // self.chunk_size

    print(f"文件大小: {file_size / 1024 / 1024:.2f} MB")
    print(f"分片大小: {self.chunk_size / 1024 / 1024:.2f} MB")
    print(f"总分片数: {total_chunks}")

    # 找出还没上传的分片
    remaining = [i for i in range(total_chunks) if i not in self.done_chunks]

    if not remaining:
        print("🎉 所有分片已上传完成!")
        return True

    print(f"还需上传 {len(remaining)} 个分片...")

    # 打开文件,按分片读取并上传
    with open(self.file_path, 'rb') as f:
        with tqdm(total=len(remaining), desc="上传进度", unit="片") as pbar:
            for idx in remaining:
                # 定位到对应分片的位置
                f.seek(idx * self.chunk_size)
                chunk_data = f.read(self.chunk_size)

                # 这里是实际上传的模拟
                # 实际代码:requests.post(url, data=chunk_data)
                time.sleep(0.05)  # 模拟网络延迟

                self.save_progress(idx)
                pbar.update(1)

    print("✅ 上传完成!")
    return True

def upload(self):
    """主入口"""
    print(f"📁 文件: {self.file_path}")
    print(f"🆔 任务ID: {self.task_id}")
    print("-" * 40)

    # 1. 秒传检测
    if self.check_instant_upload():
        return True

    # 2. 分片上传(带断点续传)
    return self.split_and_upload()

# ===== 使用示例 =====
if __name__ == "__main__":
# 创建一个测试大文件(50MB)
print("正在创建 50MB 测试文件...")
with open("test_large_file.bin", "wb") as f:
    f.write(b"x" * (50 * 1024 * 1024))

print("\n" + "="*40)
print("第一次上传(完整流程)")
print("="*40)
uploader = SmartFileUploader("test_large_file.bin", chunk_size_mb=10)
uploader.upload()

print("\n" + "="*40)
print("第二次上传(模拟断点续传)")
print("="*40)
uploader2 = SmartFileUploader("test_large_file.bin", chunk_size_mb=10)
uploader2.upload()

# 清理测试文件
os.remove("test_large_file.bin")
for f in os.listdir("."):
    if f.endswith("_progress.json"):
        os.remove(f)

预期输出

正在创建 50MB 测试文件...

========================================
第一次上传(完整流程)
========================================
📁 文件: test_large_file.bin
🆔 任务ID: 1e6d8c9f2a3b
----------------------------------------
正在计算文件指纹...
文件指纹: 3a5c7e9b1d2f4a6c8e0f...
文件大小: 50.00 MB
分片大小: 10.00 MB
总分片数: 5
还需上传 5 个分片...
上传进度: 100%|██████████| 5/5 [00:00<00:00, 50.00片/s]
✅ 上传完成!

========================================
第二次上传(模拟断点续传)
========================================
📁 文件: test_large_file.bin
🆔 任务ID: 1e6d8c9f2a3b
----------------------------------------
正在计算文件指纹...
文件指纹: 3a5c7e9b1d2f4a6c8e0f...
文件大小: 50.00 MB
分片大小: 10.00 MB
总分片数: 5
🎉 所有分片已上传完成!

一句话解释:把秒传检测、分片上传、断点续传三个功能组合起来,就是一个工业级的文件上传工具。


💪 进阶 20 分钟:常见坑 + 性能小贴士

坑 1:读取大文件时内存爆炸

# ❌ 错误做法:一次性读取整个文件
with open("big_file.bin", "rb") as f:
data = f.read()  # 2GB文件直接爆内存

# ✅ 正确做法:分块读取
with open("big_file.bin", "rb") as f:
for chunk in iter(lambda: f.read(1024*1024), b''):
    process(chunk)  # 处理每一块

坑 2:断点文件被误删

# ❌ 错误做法:断点文件和源文件放一起,容易混淆
progress_file = f"./{filename}_progress.json"

# ✅ 正确做法:用文件内容的哈希作为ID
progress_file = f"./{file_content_hash}_progress.json"

坑 3:进度条更新太频繁

# ❌ 错误做法:每个字节都更新进度条,慢死了
for byte in data:
pbar.update(1)  # 一亿字节更新一亿次

# ✅ 正确做法:分片级别更新
chunk_size = 1024 * 1024
for chunk in iter(lambda: f.read(chunk_size), b''):
pbar.update(1)  # 每1MB更新一次

坑 4:并发上传时进度错乱

# ❌ 错误做法:多线程同时写进度文件,数据错乱
def upload_async(chunks):
for chunk in chunks:
    upload(chunk)
    save_progress()  # 多线程同时写,乱了

# ✅ 正确做法:加锁,或者让主线程统一管理进度
import threading
lock = threading.Lock()

def upload_async(chunk):
upload(chunk)
with lock:
    save_progress()  # 同一时刻只有一个人在记账

坑 5:网络重试没有限制

# ❌ 错误做法:无限重试,卡死在某个片上
while True:
try:
    upload(chunk)
    break
except:
    pass

# ✅ 正确做法:设置重试上限
for attempt in range(3):
try:
    upload(chunk)
    break
except:
    if attempt == 2:
        raise Exception("上传失败,已重试3次")

性能小贴士:并行上传多个片

from concurrent.futures import ThreadPoolExecutor

def parallel_upload(chunks, max_workers=4):
"""并行上传多个分片,速度更快"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(upload_one_chunk, c) for c in chunks]
    for future in futures:
        future.result()  # 等待所有完成

调试技巧:加日志

import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

def upload_with_log(chunk_index, data):
logger.info(f"开始上传分片 {chunk_index}, 大小: {len(data)} bytes")
try:
    result = do_upload(data)
    logger.info(f"分片 {chunk_index} 上传成功")
    return result
except Exception as e:
    logger.error(f"分片 {chunk_index} 上传失败: {e}")
    raise

✏️ 练习题

练习 1(1 分钟):抄改基础题

# 修改下面代码,把分片大小从 1MB 改成 5MB
uploader = SmartUploader("./test.mp4")
uploader.upload()
  • 预期输出:显示新的分片大小 5.00 MB

练习 2(2 分钟):加判断

给断点续传的 upload 方法加一个判断:如果所有分片都已完成,直接打印"无需上传"而不是进入上传流程。

  • 提示:在 upload 方法开头加一个 if not remaining:

练习 3(2 分钟):处理新数据

创建一个新的 ResumableUploader 实例,上传 项目1 生成的 小明的日志分片 文件夹。模拟中断一次,然后继续。

  • 输入"小明的日志分片" 目录
  • 预期输出:显示跳过的片和继续上传的片

练习 4(3 分钟):串接两个项目

项目1 的文件拆分功能和 项目2 的断点续传功能串起来:先拆分,再模拟上传。

  • 提示:先调用 split_file(),再创建 ResumableUploader

练习 5(2 分钟):报错分析

以下代码运行时会报错,请分析原因:

uploader = SmartFileUploader("不存在的文件.mp4")
uploader.upload()
  • 预期报错FileNotFoundError
  • 提示:检查文件是否存在后再创建 Uploader

作业:做一个「大文件分片上传下载实战工具」

需求描述:综合运用本文学到的知识,写一个完整的工具,支持:

功能点
1. 分片上传:把文件拆成多个片上传,带进度条
2. 断点续传:上传中断后,下次调用能从断点继续
3. 秒传检测:上传前先问服务器,有相同的就跳过

加分项
1. 支持 Ctrl+C 中断后自动保存断点
2. 显示上传速度(MB/s)

验收标准
- 能跑起来不报错
- 中断后再次运行能从断点继续
- 代码有注释,说明每一步在干嘛

提交方式:把代码保存为 uploader.py,直接运行 python uploader.py 你的文件路径


📚 总结 + 资源

一句话总结三个核心点

分片上传让大文件变得可控,断点续传让失败不再可怕,秒传检测让重复文件上传秒完成。

延伸学习资源
- Python 官方文档:io 模块——深入理解流的概念
- 《Python编程:从入门到实践》——第 11 章关于文件的实战项目
- tqdm 官方文档——进度条的各种用法

互动钩子

你在上传大文件时遇到过什么奇葩问题?是断网、公司电脑蓝屏、还是上传到一半发现没电了?评论区聊聊你的经历,老粉优先回复!


下一章我们要学习 第8章 8.1 child_process 子进程,到时候你会发现,光在自己进程里折腾还不够,有时候需要「召唤」另一个程序来帮你干活——比如调用 ffmpeg 转码视频、调用 git 做版本控制,这些都需要子进程来完成。敬请期待!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。