元数据卡
- 前置知识:第15章(数据一致性模式);第10章(Observer 模式)
- 预计时间:45 分钟
- 核心难度:深入
- 完成标志:能设计事件驱动的架构并选择编排/协调模式
你的进度
你的微引擎架构跑了大半年。引擎之间的通信都是同步魔力管道——玩家引擎呼叫比赛引擎、比赛引擎等待积分引擎的响应。积分引擎一慢,所有依赖它的引擎全部排队等着。
工匠之都的通信站里有一种新的消息板:你贴一张公告在上面,谁需要谁来看。不需要等待回复。 你的任务
在同步请求-响应之外,还有一种更松耦合的通信范式——事件驱动。你不需要直接告诉积分服务"积分更新了"——你只是发布一个事件"比赛结束了"。谁需要知道这个事件?积分服务、排行榜服务、通知服务——它们各自订阅。发事件的不知道谁在听,听事件的不需要知道谁发了。这章讲事件驱动架构的核心概念。
本章分层
- 必读:事件网格基础、Choreography vs Orchestration
- 选读:Stream Processing 概念
- 进阶:死信与重试策略
本章不会要求你掌握
- Kafka Streams / Flink 的深入开发
- 事件溯源(第15章已覆盖)
破局 · 溯源
你的 "match-finish" 同步调用链:
// 比赛结束 → 同步调用 4 个服务
matchService.finishMatch(matchId, winnerId);
scoreService.updateScore(matchId, winnerId); // 300ms
leaderboardService.refresh(matchId); // 200ms
notificationService.sendWinnerNotification(winnerId); // 500ms
achievementService.check(winnerId); // 400ms
// 总耗时 ~1.4 秒这 1.4 秒里,用户等待的只是一个简单的 API 响应。1.4 秒的 API 响应时间——不算慢,但也没多好。而且其中任何一个服务挂了,整个接口报错。
换成事件驱动:
比赛结束 → matchService 发布 "MatchFinishedEvent" 到事件总线 → 立即返回
↓
积分服务(订阅 MatchFinishedEvent)→ 异步更新积分
排行榜服务(订阅)→ 异步刷新
通知服务(订阅)→ 异步发通知
成就服务(订阅)→ 异步检查成就用户马上得到响应(< 50ms),四个异步任务在后台执行。
第一层:事件网格——不只是消息队列
事件驱动的基础设施是事件网格(Event Mesh)。它比简单的消息队列(RabbitMQ)或流平台(Kafka)更灵活——允许动态路由、过滤、转换。
最简实现:Spring 的 ApplicationEventPublisher:
// ch17/event/MatchFinishedEvent.java
public record MatchFinishedEvent(String matchId, String winnerId, int score) {}
// 发布者
@Service
public class MatchService {
private final ApplicationEventPublisher publisher;
public MatchService(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void finishMatch(String matchId, String winnerId) {
// ... 核心业务代码 ...
// 发布事件——同步或异步取决于配置
publisher.publishEvent(new MatchFinishedEvent(matchId, winnerId, 100));
}
}
// 订阅者(异步处理)
@Component
public class ScoreEventHandler {
@Async
@EventListener
public void handleMatchFinished(MatchFinishedEvent event) {
// 异步更新积分
scoreService.updateScore(event.winnerId(), event.score());
}
}
@Component
public class NotificationEventHandler {
@Async
@EventListener
public void handleMatchFinished(MatchFinishedEvent event) {
notificationService.send(event.winnerId(), "你赢了比赛!");
}
}这只是一个 Spring 应用内的"事件网格"。在生产中,通常是 Kafka + Schema Registry + 事件存储:
HTTP API → 发布事件到 Kafka Topic
↓
Kafka Partition 1: MatchFinishedEvent ← 排序保证
↓
Consumer Group "score-service" → 处理积分
Consumer Group "notification" → 发通知第二层:Choreography vs Orchestration
第 15 章你看到 Saga 有两种实现方式。在第 17 章加深——这两种方式在更通用的事件驱动场景中对应两种设计哲学。
Choreography(编排式):没有中央协调器。每个服务独立监听事件并响应。服务之间通过事件松耦合。
服务 A 发布 "OrderCreated"
→ 服务 B 监听 "OrderCreated",处理后发布 "PaymentProcessed"
→ 服务 C 监听 "PaymentProcessed",处理后发布 "ShipmentArranged"
→ 服务 A 监听 "ShipmentArranged" → 更新订单状态优势:高度松耦合(添加新服务只需监听已有事件) 劣势:流程隐式(要理解完整流程需要跟踪所有事件),循环依赖(A 发事件 → B 发事件 → A 又收到)
Orchestration(协调式):一个 Orchestrator 知道整个流程,告诉每个服务做什么。
Orchestrator:
1. 告诉 Service A "创建订单"
2. 等待 A 响应
3. 告诉 Service B "处理支付"
4. 等待 B 响应
5. 告诉 Service C "安排发货"// 简化版 Orcherstrator
@Service
public class MatchLifecycleOrchestrator {
public SagaResult runMatch(MatchRequest request) {
MatchId matchId = matchService.create(request);
if (matchId == null) return SagaResult.failure("创建失败");
boolean paymentOk = paymentService.collectFee(matchId, request.playerIds());
if (!paymentOk) {
matchService.cancel(matchId); // 补偿
return SagaResult.failure("收费失败");
}
MatchResult result = matchEngine.run(matchId);
if (result.isSuccess()) {
notificationService.sendResults(result);
}
return SagaResult.success("比赛完成");
}
}Orchestrator 的流程是显式的——看一个类就知道整个流程。缺点是 Orchestrator 变成了"上帝微服务"——流程修改需要改它。
这没有对错之分。小型流程(4-5 步以内)Choreography 更简洁。复杂长流程需要显式控制——用 Orchestration。
第三层:Stream Processing——持续处理事件流
你有一个场景:"每场比赛结束后,计算近 100 场的胜率排名并实时更新排行榜。"
一次性任务是简单的 SQL。持续计算每一场比赛的影响——你需要 Stream Processing。
Stream Processing 是写一个持续运行的查询(不是一次性的 SQL,是持续的 SQL/函数):
// Kafka Streams 示例
public class WinRateCalculator {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
// 从 Kafka topic 读取比赛结果流
KStream<String, MatchResult> matches = builder
.stream("match-results",
Consumed.with(Serdes.String(), matchResultSerde));
// 按玩家分组,计算获胜率
KTable<String, Double> winRates = matches
.groupBy((key, result) -> result.getWinnerId())
.aggregate(
() -> new WinRateState(0, 0),
(player, result, state) -> {
state.incrementWins();
state.incrementTotal();
return state;
},
Materialized.with(Serdes.String(), winRateStateSerde)
)
.mapValues(state -> (double) state.getWins() / state.getTotal());
// 写入排行榜 topic
winRates.toStream().to("leaderboard-updates");
}
}Stream Processing 与普通的"定时跑 SQL 刷新"的区别:
- 实时:每来一条事件立即更新(不是每分钟刷一次)
- 有状态:累计计算需要维护状态(如窗口内的计数)
- Exactly-once:Kafka Streams 保证每条消息只处理一次
第四层:死信与重试——处理失败的事件
你的积分服务在处理 MatchFinishedEvent 时挂了。事件已经消费了——永远不会再重试。积分更新丢失了。
死信队列模式:消费失败的事件先重试几次,最终放到"死信队列"(Dead Letter Queue),由人工或其他进程处理。
@Component
public class ReliableEventHandler {
private static final int MAX_RETRIES = 3;
@Async
@EventListener
public void handle(MatchFinishedEvent event) {
try {
scoreService.updateScore(event.winnerId(), event.score());
} catch (Exception e) {
// 重试
retryLater(event, e);
}
}
private void retryLater(MatchFinishedEvent event, Exception cause) {
int retryCount = event.getRetryCount() == null ? 0 : event.getRetryCount();
if (retryCount < MAX_RETRIES) {
// 重新发送到延迟队列(Kafka 不支持延迟,需要单独实现)
delayedQueue.send(event.incrementRetry(), 5_000 * (retryCount + 1));
log.warn("Retry {} for event {}", retryCount + 1, event.matchId());
} else {
// 超过重试次数 → 死信队列
deadLetterQueue.send(event);
log.error("Event {} failed after {} retries", event.matchId(), MAX_RETRIES);
}
}
}死信队列的监控是最重要的——如果一个事件进了死信队列 30 分钟还没处理,就要自动报警。死信 "死"了太久说明系统有问题需要立即处理。
常见陷阱
第五层:事件版本管理与兼容性
随着时间推移,你的事件类型会经历版本变化。
// v1: 原始版本
public record PlayerRegisteredV1(String playerId, String name) {}
// v2: 加了 level 字段
public record PlayerRegisteredV2(String playerId, String name, int level) {}新版本的事件发布后,旧订阅者如果只认识 V1 格式,无法解析 V2 的 payload。标准做法:
Schema Registry。每个事件类型注册一个 schema(Avro/Protobuf/JSON Schema),发布时带 schema ID:
// Schema Registry 中的 match-finished 事件 schema v1
{
"type": "record",
"name": "MatchFinishedEvent",
"fields": [
{"name": "matchId", "type": "string"},
{"name": "winnerId", "type": "string"},
{"name": "score", "type": "int"}
]
}当某个消费端还不支持新字段时,向后兼容的策略是:新字段设为 optional / 带默认值。消费端读到不认识的事件,会忽略新字段但不会崩溃。
处理事件版本的黄金规则:
- 生产者应该只添加字段(不要重命名、删除或改变类型)
- 新字段必须是 optional 或带默认值
- 为每个事件类型维护按时间轴排序的所有版本
- 弃用过时的事件类型时,先确认所有消费端已迁移超过一个完整 TTL 周期
第六层:事件驱动的最终一致性挑战
采用事件驱动后,你会发现一个令人不安的事实:读模型可能不一致。积分服务异步处理了 ScoreUpdatedEvent,但排行榜已经更新了——用户看到新排名但是积分数还没更新。
处理方法:
UI 层明确展示"最近更新"标签——让用户知道数据可能有短暂延迟,而不是错误地认为数据丢失了。
关键路径采用"写后读"一致性——用户在创建操作后立即查询时,如果数据尚未同步完成,从生产者直接查询最新数据作为临时保证。
状态跟踪表——在消费端维护一个"最后处理的事件 ID",当用户触发查询时,如果发现某个事件尚未处理完毕,可以返回"处理中"状态。
超时补偿——如果某个事件超过 30 秒未被成功处理,触发告警并按死信流程重试。
第七层:在项目中落地事件驱动——不需要 Kafka 也行
在 Spring 应用中用最轻量的方式落地事件驱动——使用 Spring 内置的事件机制:
# application.yml
spring:
task:
execution:
pool:
core-size: 4
max-size: 10@SpringBootApplication
@EnableAsync
public class TournamentApplication {
public static void main(String[] args) {
SpringApplication.run(TournamentApplication.class, args);
}
}
public record MatchFinishedEvent(String matchId, String winnerId, int score) {}
@Component
public class MatchEventProcessor {
@Async
@EventListener
public void onMatchFinished(MatchFinishedEvent event) {
// 在独立线程池中处理,不影响主流程
long start = System.currentTimeMillis();
try {
updateLeaderboard(event);
checkAchievements(event);
sendNotifications(event);
} finally {
long elapsed = System.currentTimeMillis() - start;
if (elapsed > 5000) {
log.warn("Match event {} processed slowly: {}ms",
event.matchId(), elapsed);
}
}
}
}原则:不要为了将来的需求引入现在的复杂度。先用 Spring @EventListener,当需要持久化、重试、排序时,再迁移到消息队列(RabbitMQ/Kafka)。
第八层:事件驱动的监控与可观测性
事件驱动架构有一个隐藏的成本:调用链变模糊了。在同步架构中,A 调 B 调 C 一清二楚。在事件驱动中,A 发布事件后,谁处理了它?花了多久?失败了没有?核心工具:
分布式追踪(OpenTelemetry)。每个事件携带一个 traceId:
public record MatchFinishedEvent(
String traceId,
String matchId,
String winnerId,
int score
) {}
// 处理时传递 traceId
@Async
@EventListener
public void onMatchFinished(MatchFinishedEvent event) {
try (var span = tracer.spanBuilder("process.match.finished")
.setAttribute("matchId", event.matchId())
.startSpan()) {
span.setAttribute("traceId", event.traceId());
// ... 处理逻辑 ...
}
}关键指标监控:每个事件的延迟(从发布到消费)、处理时间、失败率、队列积压深度。设置告警:队列积压 > 1000 条持续 5 分钟 → 人工介入。
有了这些,你的事件驱动系统不再是"发了就忘"的黑箱——每个事件的完整生命周期都是可追踪的。
陷阱二:乱序事件。 比赛结束事件先发,选手报名事件后发(因为网络延迟)。积分服务收不到"报名"却收到了"结束"。按事件顺序的天然假设在分布式环境下不成立——使用事件的时间戳和序列号排序,或者在每个聚合内做事件顺序保证。
陷阱三:循环事件。 服务 A 发布事件 "X",服务 B 处理事件 X 后发布 "Y",服务 A 又监听 Y 并发布 "X"——死循环。确保每个服务的事件处理是幂等的,并且在发布事件时避免触发自己的回调。
陷阱四:事件太多。 每个状态变更都发布事件——系统增加了 50 个事件类型。团队成员记不住哪个事件是干什么的。事件应该是"有业务意义的变化"——"库存减 1"这种内部操作可以不发事件,"订单已发货"这种业务意义的事件才发。
通关挑战
- 热身:画一个你系统中的一个业务流程的 Choreography 版本和 Orchestration 版本。对比两个方案的优缺点。
- 挑战:将你的项目中一个同步 RPC 调用链改为事件驱动(至少 2 个异步处理者)。用 Spring
@Async+@EventListener实现。 - 进阶:在有条件的情况下,搭建一个 Kafka 主题,写一个 Stream Processing 小程序(统计某个事件在一定时间窗口内的频率)。
旅人笔记
事件驱动把"同步等待"变成"异步通知",把"你调用我"变成"你发布我订阅"。Choreography 松耦合但流程隐式,Orchestration 流程显式但需要协调器。配上死信、重试和 Stream Processing——你的服务的通信方式从"打电话"变成了"驿站传信"。
下一站预告
事件驱动架构是软件工程的顶峰之一。但有一个新兴的领域正在快速改变软件的构建方式——AI 系统。这些系统有自己的一套模式:RAG、Agent、Cache-Augmented、评估。最后一章,也是最前沿的一章——AI 系统模式。