元数据卡
- 前置知识:第4章(SQL for Analysis)、Python pandas 基础
- 预计时间:50 分钟
- 核心难度:进阶
- 阅读模式:高度专注
- 完成标志:能识别 pandas 已无法处理的数据规模场景,并能选择正确的分布式工具
你的进度
你的模型在 10 万条记录上跑得不错。但情报官扔给你新任务:'这里有 10 个前哨站过去五年的完整日志,大概 500GB。给我分析一下装备磨损率和战斗频率的关系。'
pandas 加载到一半直接 OOM 了。你意识到单个机器的内存不够用了——数据已经大到超出单机处理能力的边界。
你需要分布式数据处理。
你的任务
你的数据集从 CSV 变成了几百个 CSV,从内存能放下变成十几个 GB。df.groupby() 跑了一个小时还没出结果,pd.concat() 直接 OOM。Pandas 在设计时假设数据能放进内存——当这个假设不成立时,你需要分布式数据处理。这一章教你从 pandas 的极限出发,平滑过渡到 Dask 和 Spark SQL。
Pandas 的极限与替代方案
Pandas 在单机内存 < 16 GB 且数据 < 可用内存一半时表现良好。当数据超过这个阈值,你有三个方向:
- 迭代/分块 — 不加载全部数据,逐块处理
- 延迟计算 — 构建计算图,按需触发
- 分布式 — 多台机器并行算
import pandas as pd
# 方向1:分块读取
chunk_size = 100_000
results = []
for chunk in pd.read_csv("large_dataset.csv", chunksize=chunk_size):
# 对每个块做聚合
chunk_result = chunk.groupby("mission_type")["resources"].sum()
results.append(chunk_result)
# 合并各块结果
final = pd.concat(results).groupby(level=0).sum()分块读取能解决内存问题,但速度上不做并行——每个块依次处理。需要并行时,转向 Dask。
Dask:Pandas 的分布式扩展
Dask 提供了和 pandas 几乎相同的 DataFrame API,但底层是延迟计算的。
import dask.dataframe as dd
# 读取多个文件 —— 单个 Dask DataFrame
ddf = dd.read_csv("missions_*.csv")
# 写法和 pandas 一样,但还没有真正计算
result = ddf.groupby("mission_type")["resources"].sum()
# 触发计算 —— 这一步才真正跑
final = result.compute()
print(final)Dask 做了什么:
read_csv时只读取文件分区信息,不加载数据groupby构建了一个计算图(任务图),不执行compute()才触发计算,利用多核或集群并行
Dask 的局限:不是所有 pandas 操作都被支持。特别是复杂的索引操作、groupby().transform() 的一些用法。需要先用 Dask 跑一遍试试,落地的部分再用 pandas 补。
# 常用操作 —— Dask 支持良好
ddf["duration_hours"] = ddf["duration_minutes"] / 60
ddf.dropna(subset=["success_rate"])
ddf[ddf["success_rate"] > 0.5].groupby("region").size().compute()
# 复杂操作 —— 可能需要转为 pandas
subset = ddf[ddf["region"] == "north"].compute()Spark SQL:从 SQL 到分布式
如果你的团队已经在用 Spark,或者数据量大到 Dask 不够用(超过几十 GB 且需要集群),Spark SQL 是最通用的选择。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("data_analysis").getOrCreate()
# 从文件读入
df_spark = spark.read.csv("missions_*.csv", header=True, inferSchema=True)
# 使用 SQL 做分析
df_spark.createOrReplaceTempView("missions")
result_sql = spark.sql("""
SELECT mission_type, AVG(success_rate) as avg_success
FROM missions
WHERE resources > 100
GROUP BY mission_type
HAVING avg_success > 0.6
""")
result_sql.show()
# 或者用 DataFrame API
from pyspark.sql import functions as F
result_df = (
df_spark
.filter(F.col("resources") > 100)
.groupBy("mission_type")
.agg(F.avg("success_rate").alias("avg_success"))
.filter(F.col("avg_success") > 0.6)
)
result_df.show()Spark SQL 的延迟机制和 Dask 类似——只有调用 show() 或 write() 时才触发计算。
选择原则
| 场景 | 工具 | 原因 |
|---|---|---|
| 数据 < 16GB, 单机 | pandas | 最简单,生态最丰富 |
| 数据 16GB-100GB, 单机/小集群 | Dask | API 最接近 pandas,迁移成本最低 |
| 数据 > 100GB, 需要集群 | Spark SQL | 成熟稳定,资源管理完善 |
| 流式数据 | Spark Streaming / Flink | 延时敏感场景 |
性能优化的普适原则
无论你用哪种工具,这些原则通用:
- 尽早过滤。在
groupby之前做filter,减少下游处理的数据量。 - 减少 Shuffle。
groupby按非常见键会触发全量数据重排。尽量按分区键做分组。 - 只读需要的列。
read_csv(usecols=[...])比读完再 drop 快得多。 - 选择合适的文件格式。Parquet 比 CSV 快 5-10 倍,因为列式存储只读取需要的列。
# pandas 读 parquet
df = pd.read_parquet("missions.parquet")
# Dask 读 parquet
ddf = dd.read_parquet("missions_*.parquet")
# Spark 读 parquet
df_spark = spark.read.parquet("missions/")常见陷阱
- 在 Dask 中频繁调用
compute()。每次compute()触发全量计算。你应该构建整个计算图,最后只调一次compute()。 - 用默认分区数。不够时每个分区数据量太大→OOM,太多时每个分区太小→调度开销大于计算。根据数据量和集群资源调整
npartitions。 - 忘记数据本地性。如果计算节点需要从远程读取数据,网络会成为瓶颈。把数据放在计算节点所在集群的存储上。
通关挑战
- 热身:用 pandas 的分块读取功能处理一个超过内存的大文件。
- 挑战:把你的一个 pandas 分析脚本迁移到 Dask,记录代码改动点。需要改多少行?哪些操作不能直接迁移?
- 观察:比较同一数据集上用 pandas(分块读取)、Dask、Spark SQL 的执行时间。
验收标准
- 知道什么时候该从 pandas 切换到分布式工具
- 能用 Dask 做 DataFrame 级别的分布式计算
- 理解延迟计算和触发计算的区别
- 会在 Spark SQL 中写分析查询
旅人笔记
分布式不是魔法——它用多台机器分担计算,但也带来了网络开销和调度复杂度。先评估数据量,再选工具。
下一站预告
技术手段讲了很多。但数据科学不只是技术问题——下一章,我们停下来思考数据伦理。