Skip to content

元数据卡

  • 前置知识:第4章(SQL for Analysis)、Python pandas 基础
  • 预计时间:50 分钟
  • 核心难度:进阶
  • 阅读模式:高度专注
  • 完成标志:能识别 pandas 已无法处理的数据规模场景,并能选择正确的分布式工具

你的进度

你的模型在 10 万条记录上跑得不错。但情报官扔给你新任务:'这里有 10 个前哨站过去五年的完整日志,大概 500GB。给我分析一下装备磨损率和战斗频率的关系。'

pandas 加载到一半直接 OOM 了。你意识到单个机器的内存不够用了——数据已经大到超出单机处理能力的边界。

你需要分布式数据处理。

你的任务

你的数据集从 CSV 变成了几百个 CSV,从内存能放下变成十几个 GB。df.groupby() 跑了一个小时还没出结果,pd.concat() 直接 OOM。Pandas 在设计时假设数据能放进内存——当这个假设不成立时,你需要分布式数据处理。这一章教你从 pandas 的极限出发,平滑过渡到 Dask 和 Spark SQL。


Pandas 的极限与替代方案

Pandas 在单机内存 < 16 GB 且数据 < 可用内存一半时表现良好。当数据超过这个阈值,你有三个方向:

  1. 迭代/分块 — 不加载全部数据,逐块处理
  2. 延迟计算 — 构建计算图,按需触发
  3. 分布式 — 多台机器并行算
python
import pandas as pd

# 方向1:分块读取
chunk_size = 100_000
results = []
for chunk in pd.read_csv("large_dataset.csv", chunksize=chunk_size):
    # 对每个块做聚合
    chunk_result = chunk.groupby("mission_type")["resources"].sum()
    results.append(chunk_result)

# 合并各块结果
final = pd.concat(results).groupby(level=0).sum()

分块读取能解决内存问题,但速度上不做并行——每个块依次处理。需要并行时,转向 Dask。

Dask:Pandas 的分布式扩展

Dask 提供了和 pandas 几乎相同的 DataFrame API,但底层是延迟计算的。

python
import dask.dataframe as dd

# 读取多个文件 —— 单个 Dask DataFrame
ddf = dd.read_csv("missions_*.csv")

# 写法和 pandas 一样,但还没有真正计算
result = ddf.groupby("mission_type")["resources"].sum()

# 触发计算 —— 这一步才真正跑
final = result.compute()
print(final)

Dask 做了什么:

  • read_csv 时只读取文件分区信息,不加载数据
  • groupby 构建了一个计算图(任务图),不执行
  • compute() 才触发计算,利用多核或集群并行

Dask 的局限:不是所有 pandas 操作都被支持。特别是复杂的索引操作、groupby().transform() 的一些用法。需要先用 Dask 跑一遍试试,落地的部分再用 pandas 补。

python
# 常用操作 —— Dask 支持良好
ddf["duration_hours"] = ddf["duration_minutes"] / 60
ddf.dropna(subset=["success_rate"])
ddf[ddf["success_rate"] > 0.5].groupby("region").size().compute()

# 复杂操作 —— 可能需要转为 pandas
subset = ddf[ddf["region"] == "north"].compute()

Spark SQL:从 SQL 到分布式

如果你的团队已经在用 Spark,或者数据量大到 Dask 不够用(超过几十 GB 且需要集群),Spark SQL 是最通用的选择。

python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("data_analysis").getOrCreate()

# 从文件读入
df_spark = spark.read.csv("missions_*.csv", header=True, inferSchema=True)

# 使用 SQL 做分析
df_spark.createOrReplaceTempView("missions")
result_sql = spark.sql("""
    SELECT mission_type, AVG(success_rate) as avg_success
    FROM missions
    WHERE resources > 100
    GROUP BY mission_type
    HAVING avg_success > 0.6
""")
result_sql.show()

# 或者用 DataFrame API
from pyspark.sql import functions as F

result_df = (
    df_spark
    .filter(F.col("resources") > 100)
    .groupBy("mission_type")
    .agg(F.avg("success_rate").alias("avg_success"))
    .filter(F.col("avg_success") > 0.6)
)
result_df.show()

Spark SQL 的延迟机制和 Dask 类似——只有调用 show()write() 时才触发计算。

选择原则

场景工具原因
数据 < 16GB, 单机pandas最简单,生态最丰富
数据 16GB-100GB, 单机/小集群DaskAPI 最接近 pandas,迁移成本最低
数据 > 100GB, 需要集群Spark SQL成熟稳定,资源管理完善
流式数据Spark Streaming / Flink延时敏感场景

性能优化的普适原则

无论你用哪种工具,这些原则通用:

  1. 尽早过滤。在 groupby 之前做 filter,减少下游处理的数据量。
  2. 减少 Shufflegroupby 按非常见键会触发全量数据重排。尽量按分区键做分组。
  3. 只读需要的列read_csv(usecols=[...]) 比读完再 drop 快得多。
  4. 选择合适的文件格式。Parquet 比 CSV 快 5-10 倍,因为列式存储只读取需要的列。
python
# pandas 读 parquet
df = pd.read_parquet("missions.parquet")

# Dask 读 parquet
ddf = dd.read_parquet("missions_*.parquet")

# Spark 读 parquet
df_spark = spark.read.parquet("missions/")

常见陷阱

  • 在 Dask 中频繁调用 compute()。每次 compute() 触发全量计算。你应该构建整个计算图,最后只调一次 compute()
  • 用默认分区数。不够时每个分区数据量太大→OOM,太多时每个分区太小→调度开销大于计算。根据数据量和集群资源调整 npartitions
  • 忘记数据本地性。如果计算节点需要从远程读取数据,网络会成为瓶颈。把数据放在计算节点所在集群的存储上。

通关挑战

  • 热身:用 pandas 的分块读取功能处理一个超过内存的大文件。
  • 挑战:把你的一个 pandas 分析脚本迁移到 Dask,记录代码改动点。需要改多少行?哪些操作不能直接迁移?
  • 观察:比较同一数据集上用 pandas(分块读取)、Dask、Spark SQL 的执行时间。

验收标准

  • 知道什么时候该从 pandas 切换到分布式工具
  • 能用 Dask 做 DataFrame 级别的分布式计算
  • 理解延迟计算和触发计算的区别
  • 会在 Spark SQL 中写分析查询

旅人笔记

分布式不是魔法——它用多台机器分担计算,但也带来了网络开销和调度复杂度。先评估数据量,再选工具。


下一站预告

技术手段讲了很多。但数据科学不只是技术问题——下一章,我们停下来思考数据伦理。

Built with VitePress | Software Systems Atlas