元数据卡
- 前置知识:第13章(微服务迁移);了解 ACID 事务
- 预计时间:50 分钟
- 核心难度:深入
- 完成标志:能设计 Saga(协调/编排)、理解 CQRS 和事件溯源的适用场景
你的进度
微引擎跑起来了。每台引擎有自己的数据仓库——积分引擎有自己的积分账本,比赛引擎有比赛记录。
现在你的“创建比赛”流程需要:在比赛引擎写入一条记录,通知积分引擎为参赛者初始化积分。一台机器时代加一把锁就解决的事——现在跨越了两台独立引擎、两本账本。 你的任务
分布式数据一致性不是"要不要 ACID"的问题——在分布式环境下,传统 ACID 事务要么不可用,要么代价太高。你需要新的模式来保证数据在多个服务之间最终保持一致。这一章覆盖四种常用的模式:Saga 处理长事务流程,CQRS 分离读写路径,事件溯源以事件作为一手数据,事务发件箱确保消息的可靠发送。
本章分层
- 必读:Saga(编排模式)、事务发件箱
- 选读:CQRS
- 进阶:事件溯源
本章不会要求你掌握
- 分布式事务协议(2PC/3PC)
- 最终一致性的形式化证明
破局 · 溯源
你的"创建比赛"流程单体时代 3 行代码:
@Transactional
public void createTournament(Tournament t) {
tournamentRepo.save(t); // 表 1
scoreRepo.initializeScores(t); // 表 2
notificationRepo.createNotification(t); // 表 3
}拆了微服务后,scoreRepo 和 notificationRepo 不在同一个数据库里——没有 @Transactional 能跨越 MySQL 和 PostgreSQL。
你的比赛服务调用了积分服务 API 来初始化分数——调用成功。然后比赛服务自己挂了(OOM),通知没发成。积分已经初始化了,但是比赛记录丢了——用户看到积分被扣了但没有比赛。
第一层:Saga——跨服务的长事务
Saga 把一个大事务拆成一系列本地事务,每个本地事务在其所属服务内运行,完成后触发下一个。如果中间某一步失败,Saga 执行补偿操作回退。
两种实现方式:
编排式 Saga(Choreography):每个服务在完成本地事务后,发布事件触发下一步。没有中央协调器。
创建比赛服务: save tournament → publish "TournamentCreated"
↓
积分服务: 收到 "TournamentCreated" → init scores → publish "ScoresInitialized"
↓
通知服务: 收到 "ScoresInitialized" → send notification如果积分初始化失败:
创建比赛服务: save tournament → publish "TournamentCreated"
↓
积分服务: 收到 → init scores → 失败 → publish "ScoresInitFailed"
↓
创建比赛服务: 收到 "ScoresInitFailed" → delete tournament (补偿事务)用事件驱动的方式实现:
// 比赛服务——创建比赛
@Service
public class TournamentSagaOrchestrator {
@EventListener
public void on(TournamentCreatedEvent event) {
tournamentRepository.save(event.toTournament());
// 事件总线会发布 TournamentCreatedEvent
// 积分服务监听这个事件
}
@EventListener
public void on(ScoresInitFailedEvent event) {
// 补偿:删除比赛
tournamentRepository.deleteById(event.getTournamentId());
notificationService.warn("比赛创建失败,已回滚");
}
}
// 积分服务——积分初始化
@EventListener
public void on(TournamentCreatedEvent event) {
try {
scoreService.initialize(event.getPlayerIds());
eventPublisher.publish(new ScoresInitializedEvent(event.getTournamentId()));
} catch (Exception e) {
eventPublisher.publish(new ScoresInitFailedEvent(event.getTournamentId()));
}
}协调式 Saga(Orchestration):一个协调器(Orchestrator)告诉每个服务做什么。
// ch15/saga/orchestrator/CreateTournamentSaga.java
// 协调器——中央控制
public class CreateTournamentSaga {
private final TournamentService tournaments;
private final ScoreService scores;
private final NotificationService notifications;
// Saga 状态机
public SagaResult execute(CreateTournamentRequest request) {
try {
// Step 1: 创建比赛
String tournamentId = tournaments.create(request);
// Step 2: 初始化积分
scores.initialize(tournamentId, request.getPlayerIds());
// Step 3: 发送通知
notifications.sendCreated(tournamentId, request.getPlayerIds());
return SagaResult.success(tournamentId);
} catch (ScoresInitException e) {
// 补偿:删除比赛
tournaments.delete(request.getTournamentId());
return SagaResult.failure("积分初始化失败");
} catch (NotificationException e) {
// 补偿:删除比赛 + 清理积分
scores.cleanup(request.getTournamentId());
tournaments.delete(request.getTournamentId());
return SagaResult.failure("通知发送失败");
}
}
}编排式更适合简单线性流程,协调式适合复杂的、有分支和条件的流程。编排式耦合度更低(没有中央协调器),但调试时需要跟踪所有事件流。
第二层:CQRS——命令查询职责分离
你的比赛查询服务需要展示:参赛者信息、比赛战绩、积分排名、历史记录。你一个查询联了 5 张表、2 个微服务 API——响应时间 3 秒。
问题分析:读和写是不同形态的负载。 写的模式是少量的、针对特定聚合的更新。读的模式是大量的、跨多个聚合的聚合查询。
CQRS 把"写"的模型和"读"的模型分开:
+----------+ +----------+
| Command | | Query |
| (写) | | (读) |
+----+-----+ +----+-----+
| |
v v
写入数据库 读取专用视图
写入时发布事件 → 事件同步到读模型// 命令模型(写)
@RestController
public class MatchCommandController {
private final MatchCommandService commandService;
@PostMapping("/matches/{id}/finish")
public void finishMatch(@PathVariable String id,
@RequestBody FinishRequest req) {
commandService.finishMatch(id, req.winnerId());
}
}
// 查询模型(读)—— 使用了专门的读数据库
@RestController
public class MatchQueryController {
private final MatchReadRepository readRepo;
@GetMapping("/matches/{id}/details")
public MatchDetails getDetails(@PathVariable String id) {
// 读模型数据可能已经提前聚合好了
return readRepo.findDetailsById(id);
}
}查询模型的数据不是直接来自同一张表——它可能来自一个专用的查询视图(materialized view)、一个独立的查询数据库或一个缓存。数据通过事件保持同步。
何时用 CQRS:读写负载差异显著时(写少读多或写多读少)、查询需要跨多个聚合的聚合数据时。不要为了用 CQRS 而用——对于 CRUD 应用,CQRS 增加了不必要的复杂度。
第三层:事件溯源——以事件作为真相
你有一个积分服务,记录了玩家的积分变化。但你发现一个问题:有时候需要知道"为什么积分变了"——被扣分了,是被对手打败了?还是系统管理员调整的?但数据库只存了最终分数,看不到历史。
事件溯源:不存当前状态,只存发生的事件。当前状态由事件按顺序重放(replay)计算出来。
// 事件
public interface Event {}
public record ScoreIncreased(String playerId, int amount, String reason) implements Event {}
public record ScoreDecreased(String playerId, int amount, String reason) implements Event {}
// 事件存储
public class EventStore {
private final List<Event> store = new ArrayList<>();
public void append(Event event) {
store.add(event);
}
public List<Event> getEvents(String playerId) {
return store.stream()
.filter(e -> e instanceof ScoreIncreased si && si.playerId().equals(playerId)
|| e instanceof ScoreDecreased sd && sd.playerId().equals(playerId))
.collect(Collectors.toList());
}
}
// 通过重放事件计算当前积分
public class PlayerScoreProjection {
private int score = 0;
public void apply(Event event) {
switch (event) {
case ScoreIncreased e -> score += e.amount();
case ScoreDecreased e -> score -= e.amount();
}
}
public int getScore(String playerId, EventStore store) {
score = 0;
store.getEvents(playerId).forEach(this::apply);
return score;
}
}事件溯源的优势:
- 完整的审计日志——每个积分变化都有原因
- 时间旅行——你能回到任意时间点的状态
- 问题排查——"为什么分数是这个值?"通过重放事件找到原因
代价:事件模式会持续演进(旧事件的格式和新版本不兼容),CQRS 的读模型需要同步事件流。
第四层:事务发件箱——可靠的消息发送
当 Sagas 依赖事件驱动时,一个关键问题出现了:你的服务保存了数据库记录,然后发送了事件——但数据库保存成功了,消息队列发送失败了。或者反过来——消息发送成功但数据库保存失败了。
本质上是"对两个系统(数据库 + 消息队列)的原子写入"的问题。
事务发件箱模式:不是直接发消息,而是先把消息写入数据库的发件箱表(和业务数据在同一个事务中),然后有一个独立的进程/线程把发件箱的消息发送到消息队列。
// 第1步:业务操作 + 发件箱记录在同一个事务中
@Transactional
public void finishMatch(String matchId, String winnerId) {
// 核心业务逻辑
Match match = matchRepository.findById(matchId).orElseThrow();
match.finish(winnerId);
matchRepository.save(match);
// 在同一个事务中写入发件箱
outboxRepository.save(new OutboxMessage(
EventType.MATCH_FINISHED,
new MatchFinishedEvent(matchId, winnerId)
));
}
// 第2步:独立进程轮询发件箱并发送
@Component
public class OutboxRelay {
private final OutboxRepository outbox;
private final MessageQueue queue;
@Scheduled(fixedDelay = 1000)
public void relay() {
List<OutboxMessage> pending = outbox.findTop10ByStatus(MessageStatus.PENDING);
for (OutboxMessage msg : pending) {
try {
queue.send(msg.getTopic(), msg.getPayload());
outbox.markSent(msg.getId());
} catch (Exception e) {
// 记录重试次数,超过阈值后标记为 DEAD_LETTER
outbox.incrementRetry(msg.getId());
}
}
}
}这个模式保证了"至少一次"的语义——消息可能重复发送,但不会丢失。接收方需要幂等处理。
发件箱表结构:
CREATE TABLE outbox_messages (
id UUID PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
last_attempt_at TIMESTAMP
);常见陷阱
陷阱一:Saga 补偿事务不完善。 你为 Step 1 写了补偿,但在 Step 3 失败时忘了补偿 Step 2。每个补偿必须覆盖到它之前的所有步骤。补偿事务本身也必须是幂等的——补偿可能会重复执行。
陷阱二:CQRS 的读模型数据滞后。 用户创建了一个比赛,刷新页面没看到——因为事件还没同步到读模型。读模型是最终一致性的。如果在 UI 中要求强一致性("创建后立即显示"),可以考虑写后直接返回读模型的数据。
陷阱三:事件溯源的事件模式变更。 Day 1 的 ScoreIncreased(playerId, amount),Day 100 改成了 ScoreIncreased(playerId, amount, reason)。旧事件没有 reason。处理方式:重放时用一个兼容的版本适配器。
陷阱四:发件箱不处理死信。 发件箱里的消息重试了 20 次仍然失败——它会一直重试下去,阻塞新消息的处理。设置最大重试次数,超过后标记为 DEAD_LETTER,由人工或自动报警处理。
通关挑战
- 热身:为你的系统设计一个 Saga——"玩家报名比赛"需要:报名服务创建记录、积分服务冻结参赛费、通知服务发确认消息。画出流程图,标出每一步的补偿。
- 挑战:实现一个事务发件箱——在你的项目中选择一个"操作后发通知"的场景,改用发件箱模式。
- 进阶:选一个适合 CQRS 的查询场景(比如"玩家主页"需要聚合玩家信息、比赛记录、积分),实现一个读模型视图。
旅人笔记
跨服务的数据一致性是一场持久战——Saga 用补偿代替回滚,CQRS 给读写不同的武器,事件溯源以事件为时间胶囊,发件箱确保消息不在半路丢失。没有银弹,但有模式可循。
下一站预告
数据一致性解决后,你的微服务系统基本具备运行条件。但下一个实际问题来了:你怎么把新版本部署上去而不让用户感觉到?下一章:部署与运维模式。