Skip to content

元数据卡

  • 前置知识:第4章(HDFS 分布式存储)、Vol 5(SQL 查询)
  • 预计时间:45 分钟
  • 核心难度:进阶
  • 阅读模式:高度专注
  • 完成标志:能说清 MapReduce 的计算流程,理解 Spark RDD 的 lineage 和容错机制,知道 DataFrame 对比 RDD 的优点,能在本地用 PySpark 运行一个简单计算任务

你的进度

100 个前哨站每天通过消息队列发送战报给数据仓库。一年下来,你的 HDFS 集群里存了 3PB 的日志。

林将军说:"这些数据不是用来存着的。明天之前你给我一个报告:过去一个月,哪个战区的战斗次数增长最快、补给消耗趋势、以及各前哨站的兵力波动的相关性。"

你不能在小笔记本上跑一个 SELECT ... GROUP BY——3PB 的数据,单机 SQL 跑一个月都算不完。你需要把计算也分到多台机器上,每台机器只处理它手上那部分数据,然后让你汇总。

这就是分布式计算。


你的任务

掌握分布式计算的两个里程碑:MapReduce(第一个通用的分布式计算模型)和 Spark(内存计算、DAG 执行引擎)。理解 RDD 的核心抽象——分区、依赖、血统——以及 DataFrame/DataSet 如何优化执行计划。


破局 · 溯源


从"搬数据"到"搬计算"

处理 TB 级数据的传统方案不是做不到,是做法不对。在分布式存储之前,做法是:

1. 把数据从远程机器通过网络搬到一台超强机器上
2. 在这台机器上执行计算
3. 拿到结果

问题:网络带宽是瓶颈。传输 10TB 数据到一台机器,在千兆网络中需要约 23 小时——而且这只算一次传输。

2014 年 Google 发表了 MapReduce 论文。它的核心思想简单但有效:

把计算搬到数据所在的位置,而不是把数据搬到计算的位置。

你在 1000 台 DataNode 上各存了一些数据块。MapReduce 把计算程序派到每台 DataNode 上,每台 DataNode 只处理它本地的那部分数据。计算结果的中间数据以键值对形式输出,再经过 Shuffle(分组和排序),最终归约为最终结果。


MapReduce 模型

MapReduce 只有两个阶段:Map 和 Reduce。但这两个阶段之间有一个隐蔽但关键的步骤——Shuffle。

输入(大文件切成 M 个分片)
    |
    v
Map 阶段(在每个 DataNode 上并行执行)
    输入: (key1, value1)
    输出: list(key2, value2)
    |
    v
Shuffle 阶段(框架自动完成)
    按 key 分组: (key2, list(value2))
    |
    v
Reduce 阶段
    输入: (key2, list(value2))
    输出: list(key3, value3)
    |
    v
输出写入 HDFS

实战例子:统计每个前哨站的警戒日志数

java
// MapReduce Word Count(统计日志条目数)
// Hadoop MapReduce 程序
// 编译打包: mvn package -f pom.xml
// 运行: hadoop jar target/atlas-job.jar /input/scout-logs /output/result

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class ScoutLogCount {

    // Mapper: 每条日志对应一个前哨站 ID -> 1
    public static class LogMapper
        extends Mapper<Object, Text, Text, IntWritable> {
        
        private final static IntWritable one = new IntWritable(1);
        private Text outpostId = new Text();
        
        @Override
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            
            // 假设日志格式: "outpost_007|2026-06-24|ALERT|torch_out"
            String[] fields = value.toString().split("\\|");
            if (fields.length >= 1) {
                outpostId.set(fields[0]);
                context.write(outpostId, one);  // 输出 (outpost_007, 1)
            }
        }
    }

    // Reducer: 对同一个前哨站的计数累加
    public static class LogReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
        
        private IntWritable result = new IntWritable();
        
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context)
            throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);  // 输出 (outpost_007, 12903)
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "scout log count");
        job.setJarByClass(ScoutLogCount.class);
        job.setMapperClass(LogMapper.class);
        job.setCombinerClass(LogReducer.class);  // 本地聚合优化
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

在这个例子中:

  • Map 阶段在每台 DataNode 上读取本地 HDFS 块,把每条日志转换为 (outpost_id, 1) 键值对
  • Shuffle 阶段自动把相同的 outpost_id 分到同一个 Reducer
  • Reduce 阶段对同一个前哨站的计数求和

Combiner 优化

注意到代码中的 setCombinerClass(LogReducer.class)——这是一个本地聚合。在 Map 阶段输出后立即在本地做一次求和,减少网络传输量。这是 MapReduce 中最重要的性能优化之一。

MapReduce 的问题

MapReduce 虽然革命性,但有几个明显的缺陷:

  1. 中间结果必须写到磁盘。 每次 Map 和 Reduce 之间的数据都要写入 HDFS 或本地磁盘。如果任务有多个 MapReduce 串联,每一级都产生磁盘 IO。

  2. 编程模型有限。 复杂的数据处理(多步关联、迭代计算)需要连续串联多个 MapReduce 任务,每个任务之间写磁盘,性能灾难。

  3. 批处理模式,不适合交互式查询。提交一个任务,等 30 分钟出结果——你不能在等结果的过程中追加新的过滤条件。

Spark 解决了这些问题。


Spark:内存分布式计算引擎

Spark 的核心差异在于两点:

  1. 中间数据优先留在内存中,不到万不得已不写磁盘
  2. 执行计划是一个 DAG(有向无环图),不是命名的 Map/Reduce 阶段

RDD:弹性分布式数据集

RDD(Resilient Distributed Dataset)是 Spark 的核心抽象。它是对分布式数据集合的不可变、可分区、可并行操作的抽象。

RDD 的"弹性"体现在容错机制上——不复制数据,而是记录数据从何而来(Lineage)。

RDD 的血统(Lineage)链:

HDFS 文件 ──map()──→ RDD[Log] ──filter()──→ RDD[警戒日志] ──reduceByKey()──→ RDD[(outpost, count)]
                      (分区1..N)     (分区1..N)                  (分区1..M)

如果 RDD[警戒日志] 的分区 2 丢失,Spark 不需要重新计算全部分区——它从 Lineage 追溯,只重新执行 filter() 在原始分区 2 的输入数据上。

java
// Spark RDD 示例(Java)
// 编译: mvn package
// 运行: spark-submit --class ScoutAnalysis target/atlas-job.jar

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ScoutAnalysis {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("Scout Log Analysis");
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 从 HDFS 读取日志文件
        // RDD 中的每个元素是一行文本
        JavaRDD<String> lines = sc.textFile(
            "hdfs://namenode:9000/logs/scout-*.log"
        );
        
        // 提取前哨站 ID 和日志级别
        JavaPairRDD<String, String> outpostLevels = lines
            .mapToPair(line -> {
                String[] parts = line.split("\\|");
                // 假设格式: "outpost_007|2026-06-24|ALERT|内容"
                return new Tuple2<>(parts[0], parts[2]); // (outpost_id, level)
            });
        
        // 只保留 ALERT 级别(警戒)
        JavaPairRDD<String, String> alerts = outpostLevels
            .filter(pair -> pair._2().equals("ALERT"));
        
        // 按前哨站分组统计警戒次数
        JavaPairRDD<String, Integer> alertCounts = alerts
            .mapToPair(pair -> new Tuple2<>(pair._1(), 1))
            .reduceByKey(Integer::sum);
        
        // 按计数降序排列
        JavaPairRDD<Integer, String> sorted = alertCounts
            .mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()))
            .sortByKey(false);
        
        // 输出前 10
        sorted.take(10).forEach(System.out::println);
        
        sc.close();
    }
}

这段代码和 MapReduce 版本在逻辑上等价,但:

  1. 数据在处理过程中留在内存中(除非内存不够才写磁盘)
  2. 如果没有 take(10) 的最终操作,前面所有转换都不执行——惰性求值
  3. 如果某个分区在 filter 后丢失,Spark 只需要重新执行 mapToPair.filter 在丢失的分区上,不是整个任务

Spark 算子分类

Spark 的操作分为两类:

类型操作行为
Transformations(转换)map, filter, flatMap, join, groupByKey惰性求值,构建 DAG
Actions(动作)reduce, collect, count, take, saveAsTextFile触发实际计算

构建 DAG 然后惰性求值是 Spark 性能的关键:Spark 可以优化整个执行计划,比如把连续的 map().filter().map() 合并为一次遍历。

对比窗口:MapReduce vs Spark

维度MapReduceSpark
执行模型命名的 Map & Reduce 阶段任意 DAG
中间数据写磁盘(HDFS 或本地)优先内存
迭代支持每轮写磁盘内存中循环
延迟分钟级(启动 + 磁盘)秒级(内存)
交互式查询不支持(提交-等待)支持(Spark Shell)
容错重算整个任务基于 Lineage 重算丢失分区
API只有 Java MapReduce APIRDD/DataFrame/SQL/MLlib
语言支持Java 为主Java/Scala/Python/R

DataFrame:更高层次的抽象

RDD 是"数据分布在 N 个分区"的抽象,但你不关心分区——你关心"按前哨站分组统计警戒次数"。DataFrame 提供了关系型操作的抽象:

java
// Spark DataFrame 示例
// 比 RDD API 更简洁,且有 Catalyst 查询优化

import org.apache.spark.sql.*;

public class ScoutDataFrameAnalysis {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("Scout Analysis SQL")
            .getOrCreate();
        
        // 从 JSON 或 Parquet 读取结构化数据
        Dataset<Row> logs = spark.read()
            .option("header", "true")
            .parquet("hdfs:///logs/scout-parquet/");
        
        // 注册为临时表,直接用 SQL 查询
        logs.createOrReplaceTempView("logs");
        
        // 按前哨站分组统计
        Dataset<Row> result = spark.sql(
            "SELECT outpost_id, level, COUNT(*) as cnt " +
            "FROM logs " +
            "WHERE level = 'ALERT' " +
            "AND timestamp >= '2026-06-01' " +
            "GROUP BY outpost_id, level " +
            "ORDER BY cnt DESC"
        );
        
        result.show(10);
        spark.stop();
    }
}

DataFrame 对比 RDD 的优势:

  1. Catalyst 优化器:SQL 或 DataFrame 操作被自动优化——谓词下推、列剪枝、连接重排。你不用手动优化算子顺序。
  2. Tungsten 引擎:数据以列式格式存储在内存中,缓存友好,避免 Java 对象开销。
  3. Parquet + 列裁剪:如果只需要 100 列中的 3 列,Spark 只读取那 3 列的数据。

在大多数生产场景中,你应该使用 DataFrame/SQL 而不是 RDD。RDD 适用于底层数据处理逻辑非常特殊的场景——比如处理非结构化数据或需要精细控制分区策略。


Spark 执行模型

当你调用 action 时,Spark 做什么:

你的代码 (Driver 进程)
    |
    v
DAG Scheduler: 将 RDD 的 Lineage 转化为 Stage 的 DAG
    |--- 宽依赖(shuffle)处划分 Stage
    |
    v
Task Scheduler: 每个 Stage 转化为 Tasks,分发到 Executor
    |--- 数据本地性优先(尽量在数据所在的机器上执行)
    |
    v
Executor (Worker): 执行 Task,结果返回 Driver
    |
    v
下一个 Stage ... 直到所有 Stage 完成

宽依赖 vs 窄依赖:

窄依赖(不需要 shuffle):map, filter, union
  父 RDD 的每个分区只被子 RDD 的一个分区使用
  故障时只需要重算丢失的分区

宽依赖(需要 shuffle):groupByKey, reduceByKey, join
  父 RDD 的每个分区被子 RDD 的多个分区使用
  故障时需要重算父 RDD 的全部相关分区

宽依赖是 Spark 性能瓶颈的主要来源。减少 Shuffle 是 Spark 调优的第一原则。


常见陷阱

  1. 在 Spark 中写大数据量的 groupByKey。 groupByKey 把所有 key 的 value 都 shuffle 到同一个分区,如果某个 key 的数据量超出单个 executor 的内存——OOM。使用 reduceByKeyaggregateByKey,它们在 shuffle 前做本地聚合。

  2. 忽略数据倾斜。 一个 key 的数据量是其他 key 的 1000 倍,导致所有数据聚到一个分区。解决方案:加盐(salted key)、扩大分区数、使用广播 join 代替 shuffle join。

  3. 在每个 operator 中使用 Java 对象的闭包。 Spark 会序列化闭包中的对象。如果你在 map 中引用了大对象(比如一个 100MB 的模型),这个对象会被序列化到每个 task——内存爆炸。使用 broadcast variable。

  4. 用 RDD 而不是 DataFrame。 DataFrame 有 Catalyst 优化器,自动做列裁剪、谓词下推。RDD 什么优化都没有。90% 的场景用 DataFrame。


通关挑战

  • 热身:在本地运行一个 Spark Shell,加载一个文本文件,用 RDD 和 DataFrame 两种方式统计单词出现次数。比较代码量和执行计划(用 .explain() 查看)。

  • 挑战:模拟 1GB 日志数据(Python 生成器循环写入),用 Spark 分析:TOP 10 的警戒频次前哨站、每小时警戒次数趋势、每条警戒日志的平均延迟(日志时间 vs 上报时间)。

  • 观察:在 spark-submit 的 Web UI(默认 http://driver:4040)上查看 Stage 分布、Shuffle 读写量、任务执行时间。找到慢的任务,分析原因。

# 启动 Spark Shell
spark-shell --master local[*]

# 提交任务
spark-submit --class ScoutAnalysis --master yarn \
  --num-executors 10 --executor-cores 2 --executor-memory 4g \
  target/atlas-job.jar

旅人笔记

分布式计算不是"写复杂 SQL"——它是"把计算送到数据所在的机器上"。MapReduce 开创了这个模式但受制于磁盘 IO。Spark 用内存 DAG + 惰性求值 + Lineage 容错把它推到极致。DataFrame 和 SQL 让你用声明式的写法释放分布式计算能力——但理解 RDD 的分区、宽窄依赖、血统,是你在调优时不会盲动的底气。


下一站预告

数据存好了,计算分出去了——整个系统看起来像一台"超级计算机"。但你很快发现,按照功能拆分的服务越来越多,部署、发现、熔断、配置——每一个都是单独的难题。下一章,我们从"一个大系统"走向"一组小服务"的微服务架构。

Built with VitePress | Software Systems Atlas