元数据卡
- 前置知识:第4章(HDFS 分布式存储)、Vol 5(SQL 查询)
- 预计时间:45 分钟
- 核心难度:进阶
- 阅读模式:高度专注
- 完成标志:能说清 MapReduce 的计算流程,理解 Spark RDD 的 lineage 和容错机制,知道 DataFrame 对比 RDD 的优点,能在本地用 PySpark 运行一个简单计算任务
你的进度
100 个前哨站每天通过消息队列发送战报给数据仓库。一年下来,你的 HDFS 集群里存了 3PB 的日志。
林将军说:"这些数据不是用来存着的。明天之前你给我一个报告:过去一个月,哪个战区的战斗次数增长最快、补给消耗趋势、以及各前哨站的兵力波动的相关性。"
你不能在小笔记本上跑一个 SELECT ... GROUP BY——3PB 的数据,单机 SQL 跑一个月都算不完。你需要把计算也分到多台机器上,每台机器只处理它手上那部分数据,然后让你汇总。
这就是分布式计算。
你的任务
掌握分布式计算的两个里程碑:MapReduce(第一个通用的分布式计算模型)和 Spark(内存计算、DAG 执行引擎)。理解 RDD 的核心抽象——分区、依赖、血统——以及 DataFrame/DataSet 如何优化执行计划。
破局 · 溯源
从"搬数据"到"搬计算"
处理 TB 级数据的传统方案不是做不到,是做法不对。在分布式存储之前,做法是:
1. 把数据从远程机器通过网络搬到一台超强机器上
2. 在这台机器上执行计算
3. 拿到结果问题:网络带宽是瓶颈。传输 10TB 数据到一台机器,在千兆网络中需要约 23 小时——而且这只算一次传输。
2014 年 Google 发表了 MapReduce 论文。它的核心思想简单但有效:
把计算搬到数据所在的位置,而不是把数据搬到计算的位置。
你在 1000 台 DataNode 上各存了一些数据块。MapReduce 把计算程序派到每台 DataNode 上,每台 DataNode 只处理它本地的那部分数据。计算结果的中间数据以键值对形式输出,再经过 Shuffle(分组和排序),最终归约为最终结果。
MapReduce 模型
MapReduce 只有两个阶段:Map 和 Reduce。但这两个阶段之间有一个隐蔽但关键的步骤——Shuffle。
输入(大文件切成 M 个分片)
|
v
Map 阶段(在每个 DataNode 上并行执行)
输入: (key1, value1)
输出: list(key2, value2)
|
v
Shuffle 阶段(框架自动完成)
按 key 分组: (key2, list(value2))
|
v
Reduce 阶段
输入: (key2, list(value2))
输出: list(key3, value3)
|
v
输出写入 HDFS实战例子:统计每个前哨站的警戒日志数
// MapReduce Word Count(统计日志条目数)
// Hadoop MapReduce 程序
// 编译打包: mvn package -f pom.xml
// 运行: hadoop jar target/atlas-job.jar /input/scout-logs /output/result
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class ScoutLogCount {
// Mapper: 每条日志对应一个前哨站 ID -> 1
public static class LogMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text outpostId = new Text();
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 假设日志格式: "outpost_007|2026-06-24|ALERT|torch_out"
String[] fields = value.toString().split("\\|");
if (fields.length >= 1) {
outpostId.set(fields[0]);
context.write(outpostId, one); // 输出 (outpost_007, 1)
}
}
}
// Reducer: 对同一个前哨站的计数累加
public static class LogReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); // 输出 (outpost_007, 12903)
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "scout log count");
job.setJarByClass(ScoutLogCount.class);
job.setMapperClass(LogMapper.class);
job.setCombinerClass(LogReducer.class); // 本地聚合优化
job.setReducerClass(LogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}在这个例子中:
- Map 阶段在每台 DataNode 上读取本地 HDFS 块,把每条日志转换为
(outpost_id, 1)键值对 - Shuffle 阶段自动把相同的
outpost_id分到同一个 Reducer - Reduce 阶段对同一个前哨站的计数求和
Combiner 优化
注意到代码中的 setCombinerClass(LogReducer.class)——这是一个本地聚合。在 Map 阶段输出后立即在本地做一次求和,减少网络传输量。这是 MapReduce 中最重要的性能优化之一。
MapReduce 的问题
MapReduce 虽然革命性,但有几个明显的缺陷:
中间结果必须写到磁盘。 每次 Map 和 Reduce 之间的数据都要写入 HDFS 或本地磁盘。如果任务有多个 MapReduce 串联,每一级都产生磁盘 IO。
编程模型有限。 复杂的数据处理(多步关联、迭代计算)需要连续串联多个 MapReduce 任务,每个任务之间写磁盘,性能灾难。
批处理模式,不适合交互式查询。提交一个任务,等 30 分钟出结果——你不能在等结果的过程中追加新的过滤条件。
Spark 解决了这些问题。
Spark:内存分布式计算引擎
Spark 的核心差异在于两点:
- 中间数据优先留在内存中,不到万不得已不写磁盘
- 执行计划是一个 DAG(有向无环图),不是命名的 Map/Reduce 阶段
RDD:弹性分布式数据集
RDD(Resilient Distributed Dataset)是 Spark 的核心抽象。它是对分布式数据集合的不可变、可分区、可并行操作的抽象。
RDD 的"弹性"体现在容错机制上——不复制数据,而是记录数据从何而来(Lineage)。
RDD 的血统(Lineage)链:
HDFS 文件 ──map()──→ RDD[Log] ──filter()──→ RDD[警戒日志] ──reduceByKey()──→ RDD[(outpost, count)]
(分区1..N) (分区1..N) (分区1..M)如果 RDD[警戒日志] 的分区 2 丢失,Spark 不需要重新计算全部分区——它从 Lineage 追溯,只重新执行 filter() 在原始分区 2 的输入数据上。
// Spark RDD 示例(Java)
// 编译: mvn package
// 运行: spark-submit --class ScoutAnalysis target/atlas-job.jar
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class ScoutAnalysis {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Scout Log Analysis");
JavaSparkContext sc = new JavaSparkContext(conf);
// 从 HDFS 读取日志文件
// RDD 中的每个元素是一行文本
JavaRDD<String> lines = sc.textFile(
"hdfs://namenode:9000/logs/scout-*.log"
);
// 提取前哨站 ID 和日志级别
JavaPairRDD<String, String> outpostLevels = lines
.mapToPair(line -> {
String[] parts = line.split("\\|");
// 假设格式: "outpost_007|2026-06-24|ALERT|内容"
return new Tuple2<>(parts[0], parts[2]); // (outpost_id, level)
});
// 只保留 ALERT 级别(警戒)
JavaPairRDD<String, String> alerts = outpostLevels
.filter(pair -> pair._2().equals("ALERT"));
// 按前哨站分组统计警戒次数
JavaPairRDD<String, Integer> alertCounts = alerts
.mapToPair(pair -> new Tuple2<>(pair._1(), 1))
.reduceByKey(Integer::sum);
// 按计数降序排列
JavaPairRDD<Integer, String> sorted = alertCounts
.mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()))
.sortByKey(false);
// 输出前 10
sorted.take(10).forEach(System.out::println);
sc.close();
}
}这段代码和 MapReduce 版本在逻辑上等价,但:
- 数据在处理过程中留在内存中(除非内存不够才写磁盘)
- 如果没有
take(10)的最终操作,前面所有转换都不执行——惰性求值 - 如果某个分区在
filter后丢失,Spark 只需要重新执行mapToPair.filter在丢失的分区上,不是整个任务
Spark 算子分类
Spark 的操作分为两类:
| 类型 | 操作 | 行为 |
|---|---|---|
| Transformations(转换) | map, filter, flatMap, join, groupByKey | 惰性求值,构建 DAG |
| Actions(动作) | reduce, collect, count, take, saveAsTextFile | 触发实际计算 |
构建 DAG 然后惰性求值是 Spark 性能的关键:Spark 可以优化整个执行计划,比如把连续的 map().filter().map() 合并为一次遍历。
对比窗口:MapReduce vs Spark
| 维度 | MapReduce | Spark |
|---|---|---|
| 执行模型 | 命名的 Map & Reduce 阶段 | 任意 DAG |
| 中间数据 | 写磁盘(HDFS 或本地) | 优先内存 |
| 迭代支持 | 每轮写磁盘 | 内存中循环 |
| 延迟 | 分钟级(启动 + 磁盘) | 秒级(内存) |
| 交互式查询 | 不支持(提交-等待) | 支持(Spark Shell) |
| 容错 | 重算整个任务 | 基于 Lineage 重算丢失分区 |
| API | 只有 Java MapReduce API | RDD/DataFrame/SQL/MLlib |
| 语言支持 | Java 为主 | Java/Scala/Python/R |
DataFrame:更高层次的抽象
RDD 是"数据分布在 N 个分区"的抽象,但你不关心分区——你关心"按前哨站分组统计警戒次数"。DataFrame 提供了关系型操作的抽象:
// Spark DataFrame 示例
// 比 RDD API 更简洁,且有 Catalyst 查询优化
import org.apache.spark.sql.*;
public class ScoutDataFrameAnalysis {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Scout Analysis SQL")
.getOrCreate();
// 从 JSON 或 Parquet 读取结构化数据
Dataset<Row> logs = spark.read()
.option("header", "true")
.parquet("hdfs:///logs/scout-parquet/");
// 注册为临时表,直接用 SQL 查询
logs.createOrReplaceTempView("logs");
// 按前哨站分组统计
Dataset<Row> result = spark.sql(
"SELECT outpost_id, level, COUNT(*) as cnt " +
"FROM logs " +
"WHERE level = 'ALERT' " +
"AND timestamp >= '2026-06-01' " +
"GROUP BY outpost_id, level " +
"ORDER BY cnt DESC"
);
result.show(10);
spark.stop();
}
}DataFrame 对比 RDD 的优势:
- Catalyst 优化器:SQL 或 DataFrame 操作被自动优化——谓词下推、列剪枝、连接重排。你不用手动优化算子顺序。
- Tungsten 引擎:数据以列式格式存储在内存中,缓存友好,避免 Java 对象开销。
- Parquet + 列裁剪:如果只需要 100 列中的 3 列,Spark 只读取那 3 列的数据。
在大多数生产场景中,你应该使用 DataFrame/SQL 而不是 RDD。RDD 适用于底层数据处理逻辑非常特殊的场景——比如处理非结构化数据或需要精细控制分区策略。
Spark 执行模型
当你调用 action 时,Spark 做什么:
你的代码 (Driver 进程)
|
v
DAG Scheduler: 将 RDD 的 Lineage 转化为 Stage 的 DAG
|--- 宽依赖(shuffle)处划分 Stage
|
v
Task Scheduler: 每个 Stage 转化为 Tasks,分发到 Executor
|--- 数据本地性优先(尽量在数据所在的机器上执行)
|
v
Executor (Worker): 执行 Task,结果返回 Driver
|
v
下一个 Stage ... 直到所有 Stage 完成宽依赖 vs 窄依赖:
窄依赖(不需要 shuffle):map, filter, union
父 RDD 的每个分区只被子 RDD 的一个分区使用
故障时只需要重算丢失的分区
宽依赖(需要 shuffle):groupByKey, reduceByKey, join
父 RDD 的每个分区被子 RDD 的多个分区使用
故障时需要重算父 RDD 的全部相关分区宽依赖是 Spark 性能瓶颈的主要来源。减少 Shuffle 是 Spark 调优的第一原则。
常见陷阱
在 Spark 中写大数据量的 groupByKey。
groupByKey把所有 key 的 value 都 shuffle 到同一个分区,如果某个 key 的数据量超出单个 executor 的内存——OOM。使用reduceByKey或aggregateByKey,它们在 shuffle 前做本地聚合。忽略数据倾斜。 一个 key 的数据量是其他 key 的 1000 倍,导致所有数据聚到一个分区。解决方案:加盐(salted key)、扩大分区数、使用广播 join 代替 shuffle join。
在每个 operator 中使用 Java 对象的闭包。 Spark 会序列化闭包中的对象。如果你在
map中引用了大对象(比如一个 100MB 的模型),这个对象会被序列化到每个 task——内存爆炸。使用 broadcast variable。用 RDD 而不是 DataFrame。 DataFrame 有 Catalyst 优化器,自动做列裁剪、谓词下推。RDD 什么优化都没有。90% 的场景用 DataFrame。
通关挑战
热身:在本地运行一个 Spark Shell,加载一个文本文件,用 RDD 和 DataFrame 两种方式统计单词出现次数。比较代码量和执行计划(用
.explain()查看)。挑战:模拟 1GB 日志数据(Python 生成器循环写入),用 Spark 分析:TOP 10 的警戒频次前哨站、每小时警戒次数趋势、每条警戒日志的平均延迟(日志时间 vs 上报时间)。
观察:在
spark-submit的 Web UI(默认 http://driver:4040)上查看 Stage 分布、Shuffle 读写量、任务执行时间。找到慢的任务,分析原因。
# 启动 Spark Shell
spark-shell --master local[*]
# 提交任务
spark-submit --class ScoutAnalysis --master yarn \
--num-executors 10 --executor-cores 2 --executor-memory 4g \
target/atlas-job.jar旅人笔记
分布式计算不是"写复杂 SQL"——它是"把计算送到数据所在的机器上"。MapReduce 开创了这个模式但受制于磁盘 IO。Spark 用内存 DAG + 惰性求值 + Lineage 容错把它推到极致。DataFrame 和 SQL 让你用声明式的写法释放分布式计算能力——但理解 RDD 的分区、宽窄依赖、血统,是你在调优时不会盲动的底气。
下一站预告
数据存好了,计算分出去了——整个系统看起来像一台"超级计算机"。但你很快发现,按照功能拆分的服务越来越多,部署、发现、熔断、配置——每一个都是单独的难题。下一章,我们从"一个大系统"走向"一组小服务"的微服务架构。