Skip to content

元数据卡

  • 前置知识:第13章(微服务迁移);了解 ACID 事务
  • 预计时间:50 分钟
  • 核心难度:深入
  • 完成标志:能设计 Saga(协调/编排)、理解 CQRS 和事件溯源的适用场景

你的进度

微引擎跑起来了。每台引擎有自己的数据仓库——积分引擎有自己的积分账本,比赛引擎有比赛记录。

现在你的“创建比赛”流程需要:在比赛引擎写入一条记录,通知积分引擎为参赛者初始化积分。一台机器时代加一把锁就解决的事——现在跨越了两台独立引擎、两本账本。 你的任务

分布式数据一致性不是"要不要 ACID"的问题——在分布式环境下,传统 ACID 事务要么不可用,要么代价太高。你需要新的模式来保证数据在多个服务之间最终保持一致。这一章覆盖四种常用的模式:Saga 处理长事务流程,CQRS 分离读写路径,事件溯源以事件作为一手数据,事务发件箱确保消息的可靠发送。

本章分层

  • 必读:Saga(编排模式)、事务发件箱
  • 选读:CQRS
  • 进阶:事件溯源

本章不会要求你掌握

  • 分布式事务协议(2PC/3PC)
  • 最终一致性的形式化证明

破局 · 溯源

你的"创建比赛"流程单体时代 3 行代码:

java
@Transactional
public void createTournament(Tournament t) {
    tournamentRepo.save(t);                // 表 1
    scoreRepo.initializeScores(t);         // 表 2
    notificationRepo.createNotification(t); // 表 3
}

拆了微服务后,scoreReponotificationRepo 不在同一个数据库里——没有 @Transactional 能跨越 MySQL 和 PostgreSQL。

你的比赛服务调用了积分服务 API 来初始化分数——调用成功。然后比赛服务自己挂了(OOM),通知没发成。积分已经初始化了,但是比赛记录丢了——用户看到积分被扣了但没有比赛。

第一层:Saga——跨服务的长事务

Saga 把一个大事务拆成一系列本地事务,每个本地事务在其所属服务内运行,完成后触发下一个。如果中间某一步失败,Saga 执行补偿操作回退。

两种实现方式:

编排式 Saga(Choreography):每个服务在完成本地事务后,发布事件触发下一步。没有中央协调器。

创建比赛服务: save tournament → publish "TournamentCreated"

积分服务: 收到 "TournamentCreated" → init scores → publish "ScoresInitialized"

通知服务: 收到 "ScoresInitialized" → send notification

如果积分初始化失败:

创建比赛服务: save tournament → publish "TournamentCreated"

积分服务: 收到 → init scores → 失败 → publish "ScoresInitFailed"

创建比赛服务: 收到 "ScoresInitFailed" → delete tournament  (补偿事务)

用事件驱动的方式实现:

java
// 比赛服务——创建比赛
@Service
public class TournamentSagaOrchestrator {
    @EventListener
    public void on(TournamentCreatedEvent event) {
        tournamentRepository.save(event.toTournament());
        // 事件总线会发布 TournamentCreatedEvent
        // 积分服务监听这个事件
    }

    @EventListener
    public void on(ScoresInitFailedEvent event) {
        // 补偿:删除比赛
        tournamentRepository.deleteById(event.getTournamentId());
        notificationService.warn("比赛创建失败,已回滚");
    }
}

// 积分服务——积分初始化
@EventListener
public void on(TournamentCreatedEvent event) {
    try {
        scoreService.initialize(event.getPlayerIds());
        eventPublisher.publish(new ScoresInitializedEvent(event.getTournamentId()));
    } catch (Exception e) {
        eventPublisher.publish(new ScoresInitFailedEvent(event.getTournamentId()));
    }
}

协调式 Saga(Orchestration):一个协调器(Orchestrator)告诉每个服务做什么。

java
// ch15/saga/orchestrator/CreateTournamentSaga.java
// 协调器——中央控制
public class CreateTournamentSaga {

    private final TournamentService tournaments;
    private final ScoreService scores;
    private final NotificationService notifications;

    // Saga 状态机
    public SagaResult execute(CreateTournamentRequest request) {
        try {
            // Step 1: 创建比赛
            String tournamentId = tournaments.create(request);
            
            // Step 2: 初始化积分
            scores.initialize(tournamentId, request.getPlayerIds());
            
            // Step 3: 发送通知
            notifications.sendCreated(tournamentId, request.getPlayerIds());
            
            return SagaResult.success(tournamentId);
            
        } catch (ScoresInitException e) {
            // 补偿:删除比赛
            tournaments.delete(request.getTournamentId());
            return SagaResult.failure("积分初始化失败");
            
        } catch (NotificationException e) {
            // 补偿:删除比赛 + 清理积分
            scores.cleanup(request.getTournamentId());
            tournaments.delete(request.getTournamentId());
            return SagaResult.failure("通知发送失败");
        }
    }
}

编排式更适合简单线性流程,协调式适合复杂的、有分支和条件的流程。编排式耦合度更低(没有中央协调器),但调试时需要跟踪所有事件流。

第二层:CQRS——命令查询职责分离

你的比赛查询服务需要展示:参赛者信息、比赛战绩、积分排名、历史记录。你一个查询联了 5 张表、2 个微服务 API——响应时间 3 秒。

问题分析:读和写是不同形态的负载。 写的模式是少量的、针对特定聚合的更新。读的模式是大量的、跨多个聚合的聚合查询。

CQRS 把"写"的模型和"读"的模型分开:

+----------+          +----------+
|  Command  |          |  Query   |
|  (写)     |          |  (读)    |
+----+-----+          +----+-----+
     |                     |
     v                     v
 写入数据库           读取专用视图

写入时发布事件 → 事件同步到读模型
java
// 命令模型(写)
@RestController
public class MatchCommandController {
    private final MatchCommandService commandService;

    @PostMapping("/matches/{id}/finish")
    public void finishMatch(@PathVariable String id,
                            @RequestBody FinishRequest req) {
        commandService.finishMatch(id, req.winnerId());
    }
}

// 查询模型(读)—— 使用了专门的读数据库
@RestController
public class MatchQueryController {
    private final MatchReadRepository readRepo;

    @GetMapping("/matches/{id}/details")
    public MatchDetails getDetails(@PathVariable String id) {
        // 读模型数据可能已经提前聚合好了
        return readRepo.findDetailsById(id);
    }
}

查询模型的数据不是直接来自同一张表——它可能来自一个专用的查询视图(materialized view)、一个独立的查询数据库或一个缓存。数据通过事件保持同步。

何时用 CQRS:读写负载差异显著时(写少读多或写多读少)、查询需要跨多个聚合的聚合数据时。不要为了用 CQRS 而用——对于 CRUD 应用,CQRS 增加了不必要的复杂度。

第三层:事件溯源——以事件作为真相

你有一个积分服务,记录了玩家的积分变化。但你发现一个问题:有时候需要知道"为什么积分变了"——被扣分了,是被对手打败了?还是系统管理员调整的?但数据库只存了最终分数,看不到历史。

事件溯源:不存当前状态,只存发生的事件。当前状态由事件按顺序重放(replay)计算出来。

java
// 事件
public interface Event {}
public record ScoreIncreased(String playerId, int amount, String reason) implements Event {}
public record ScoreDecreased(String playerId, int amount, String reason) implements Event {}

// 事件存储
public class EventStore {
    private final List<Event> store = new ArrayList<>();

    public void append(Event event) {
        store.add(event);
    }

    public List<Event> getEvents(String playerId) {
        return store.stream()
            .filter(e -> e instanceof ScoreIncreased si && si.playerId().equals(playerId)
                      || e instanceof ScoreDecreased sd && sd.playerId().equals(playerId))
            .collect(Collectors.toList());
    }
}

// 通过重放事件计算当前积分
public class PlayerScoreProjection {
    private int score = 0;

    public void apply(Event event) {
        switch (event) {
            case ScoreIncreased e -> score += e.amount();
            case ScoreDecreased e -> score -= e.amount();
        }
    }

    public int getScore(String playerId, EventStore store) {
        score = 0;
        store.getEvents(playerId).forEach(this::apply);
        return score;
    }
}

事件溯源的优势:

  • 完整的审计日志——每个积分变化都有原因
  • 时间旅行——你能回到任意时间点的状态
  • 问题排查——"为什么分数是这个值?"通过重放事件找到原因

代价:事件模式会持续演进(旧事件的格式和新版本不兼容),CQRS 的读模型需要同步事件流。

第四层:事务发件箱——可靠的消息发送

当 Sagas 依赖事件驱动时,一个关键问题出现了:你的服务保存了数据库记录,然后发送了事件——但数据库保存成功了,消息队列发送失败了。或者反过来——消息发送成功但数据库保存失败了。

本质上是"对两个系统(数据库 + 消息队列)的原子写入"的问题。

事务发件箱模式:不是直接发消息,而是先把消息写入数据库的发件箱表(和业务数据在同一个事务中),然后有一个独立的进程/线程把发件箱的消息发送到消息队列。

java
// 第1步:业务操作 + 发件箱记录在同一个事务中
@Transactional
public void finishMatch(String matchId, String winnerId) {
    // 核心业务逻辑
    Match match = matchRepository.findById(matchId).orElseThrow();
    match.finish(winnerId);
    matchRepository.save(match);

    // 在同一个事务中写入发件箱
    outboxRepository.save(new OutboxMessage(
        EventType.MATCH_FINISHED,
        new MatchFinishedEvent(matchId, winnerId)
    ));
}

// 第2步:独立进程轮询发件箱并发送
@Component
public class OutboxRelay {
    private final OutboxRepository outbox;
    private final MessageQueue queue;

    @Scheduled(fixedDelay = 1000)
    public void relay() {
        List<OutboxMessage> pending = outbox.findTop10ByStatus(MessageStatus.PENDING);
        for (OutboxMessage msg : pending) {
            try {
                queue.send(msg.getTopic(), msg.getPayload());
                outbox.markSent(msg.getId());
            } catch (Exception e) {
                // 记录重试次数,超过阈值后标记为 DEAD_LETTER
                outbox.incrementRetry(msg.getId());
            }
        }
    }
}

这个模式保证了"至少一次"的语义——消息可能重复发送,但不会丢失。接收方需要幂等处理。

发件箱表结构:

sql
CREATE TABLE outbox_messages (
    id UUID PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(20) DEFAULT 'PENDING',
    retry_count INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW(),
    last_attempt_at TIMESTAMP
);

常见陷阱

陷阱一:Saga 补偿事务不完善。 你为 Step 1 写了补偿,但在 Step 3 失败时忘了补偿 Step 2。每个补偿必须覆盖到它之前的所有步骤。补偿事务本身也必须是幂等的——补偿可能会重复执行。

陷阱二:CQRS 的读模型数据滞后。 用户创建了一个比赛,刷新页面没看到——因为事件还没同步到读模型。读模型是最终一致性的。如果在 UI 中要求强一致性("创建后立即显示"),可以考虑写后直接返回读模型的数据。

陷阱三:事件溯源的事件模式变更。 Day 1 的 ScoreIncreased(playerId, amount),Day 100 改成了 ScoreIncreased(playerId, amount, reason)。旧事件没有 reason。处理方式:重放时用一个兼容的版本适配器。

陷阱四:发件箱不处理死信。 发件箱里的消息重试了 20 次仍然失败——它会一直重试下去,阻塞新消息的处理。设置最大重试次数,超过后标记为 DEAD_LETTER,由人工或自动报警处理。


通关挑战

  • 热身:为你的系统设计一个 Saga——"玩家报名比赛"需要:报名服务创建记录、积分服务冻结参赛费、通知服务发确认消息。画出流程图,标出每一步的补偿。
  • 挑战:实现一个事务发件箱——在你的项目中选择一个"操作后发通知"的场景,改用发件箱模式。
  • 进阶:选一个适合 CQRS 的查询场景(比如"玩家主页"需要聚合玩家信息、比赛记录、积分),实现一个读模型视图。

旅人笔记

跨服务的数据一致性是一场持久战——Saga 用补偿代替回滚,CQRS 给读写不同的武器,事件溯源以事件为时间胶囊,发件箱确保消息不在半路丢失。没有银弹,但有模式可循。


下一站预告

数据一致性解决后,你的微服务系统基本具备运行条件。但下一个实际问题来了:你怎么把新版本部署上去而不让用户感觉到?下一章:部署与运维模式。

Built with VitePress | Software Systems Atlas