Skip to content

元数据卡

  • 前置知识:第15章(数据一致性模式);第10章(Observer 模式)
  • 预计时间:45 分钟
  • 核心难度:深入
  • 完成标志:能设计事件驱动的架构并选择编排/协调模式

你的进度

你的微引擎架构跑了大半年。引擎之间的通信都是同步魔力管道——玩家引擎呼叫比赛引擎、比赛引擎等待积分引擎的响应。积分引擎一慢,所有依赖它的引擎全部排队等着。

工匠之都的通信站里有一种新的消息板:你贴一张公告在上面,谁需要谁来看。不需要等待回复。 你的任务

在同步请求-响应之外,还有一种更松耦合的通信范式——事件驱动。你不需要直接告诉积分服务"积分更新了"——你只是发布一个事件"比赛结束了"。谁需要知道这个事件?积分服务、排行榜服务、通知服务——它们各自订阅。发事件的不知道谁在听,听事件的不需要知道谁发了。这章讲事件驱动架构的核心概念。

本章分层

  • 必读:事件网格基础、Choreography vs Orchestration
  • 选读:Stream Processing 概念
  • 进阶:死信与重试策略

本章不会要求你掌握

  • Kafka Streams / Flink 的深入开发
  • 事件溯源(第15章已覆盖)

破局 · 溯源

你的 "match-finish" 同步调用链:

java
// 比赛结束 → 同步调用 4 个服务
matchService.finishMatch(matchId, winnerId);
scoreService.updateScore(matchId, winnerId);           // 300ms
leaderboardService.refresh(matchId);                   // 200ms
notificationService.sendWinnerNotification(winnerId);  // 500ms
achievementService.check(winnerId);                    // 400ms
// 总耗时 ~1.4 秒

这 1.4 秒里,用户等待的只是一个简单的 API 响应。1.4 秒的 API 响应时间——不算慢,但也没多好。而且其中任何一个服务挂了,整个接口报错。

换成事件驱动:

比赛结束 → matchService 发布 "MatchFinishedEvent" 到事件总线 → 立即返回

                    积分服务(订阅 MatchFinishedEvent)→ 异步更新积分
                    排行榜服务(订阅)→ 异步刷新
                    通知服务(订阅)→ 异步发通知
                    成就服务(订阅)→ 异步检查成就

用户马上得到响应(< 50ms),四个异步任务在后台执行。

第一层:事件网格——不只是消息队列

事件驱动的基础设施是事件网格(Event Mesh)。它比简单的消息队列(RabbitMQ)或流平台(Kafka)更灵活——允许动态路由、过滤、转换。

最简实现:Spring 的 ApplicationEventPublisher

java
// ch17/event/MatchFinishedEvent.java
public record MatchFinishedEvent(String matchId, String winnerId, int score) {}

// 发布者
@Service
public class MatchService {
    private final ApplicationEventPublisher publisher;

    public MatchService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void finishMatch(String matchId, String winnerId) {
        // ... 核心业务代码 ...
        // 发布事件——同步或异步取决于配置
        publisher.publishEvent(new MatchFinishedEvent(matchId, winnerId, 100));
    }
}

// 订阅者(异步处理)
@Component
public class ScoreEventHandler {
    @Async
    @EventListener
    public void handleMatchFinished(MatchFinishedEvent event) {
        // 异步更新积分
        scoreService.updateScore(event.winnerId(), event.score());
    }
}

@Component
public class NotificationEventHandler {
    @Async
    @EventListener
    public void handleMatchFinished(MatchFinishedEvent event) {
        notificationService.send(event.winnerId(), "你赢了比赛!");
    }
}

这只是一个 Spring 应用内的"事件网格"。在生产中,通常是 Kafka + Schema Registry + 事件存储:

HTTP API → 发布事件到 Kafka Topic

Kafka Partition 1: MatchFinishedEvent   ← 排序保证

Consumer Group "score-service" → 处理积分
Consumer Group "notification" → 发通知

第二层:Choreography vs Orchestration

第 15 章你看到 Saga 有两种实现方式。在第 17 章加深——这两种方式在更通用的事件驱动场景中对应两种设计哲学。

Choreography(编排式):没有中央协调器。每个服务独立监听事件并响应。服务之间通过事件松耦合。

服务 A 发布 "OrderCreated"
    → 服务 B 监听 "OrderCreated",处理后发布 "PaymentProcessed"
        → 服务 C 监听 "PaymentProcessed",处理后发布 "ShipmentArranged"
            → 服务 A 监听 "ShipmentArranged" → 更新订单状态

优势:高度松耦合(添加新服务只需监听已有事件) 劣势:流程隐式(要理解完整流程需要跟踪所有事件),循环依赖(A 发事件 → B 发事件 → A 又收到)

Orchestration(协调式):一个 Orchestrator 知道整个流程,告诉每个服务做什么。

Orchestrator:
    1. 告诉 Service A "创建订单"
    2. 等待 A 响应
    3. 告诉 Service B "处理支付"
    4. 等待 B 响应
    5. 告诉 Service C "安排发货"
java
// 简化版 Orcherstrator
@Service
public class MatchLifecycleOrchestrator {
    public SagaResult runMatch(MatchRequest request) {
        MatchId matchId = matchService.create(request);
        if (matchId == null) return SagaResult.failure("创建失败");

        boolean paymentOk = paymentService.collectFee(matchId, request.playerIds());
        if (!paymentOk) {
            matchService.cancel(matchId); // 补偿
            return SagaResult.failure("收费失败");
        }

        MatchResult result = matchEngine.run(matchId);
        if (result.isSuccess()) {
            notificationService.sendResults(result);
        }
        return SagaResult.success("比赛完成");
    }
}

Orchestrator 的流程是显式的——看一个类就知道整个流程。缺点是 Orchestrator 变成了"上帝微服务"——流程修改需要改它。

这没有对错之分。小型流程(4-5 步以内)Choreography 更简洁。复杂长流程需要显式控制——用 Orchestration。

第三层:Stream Processing——持续处理事件流

你有一个场景:"每场比赛结束后,计算近 100 场的胜率排名并实时更新排行榜。"

一次性任务是简单的 SQL。持续计算每一场比赛的影响——你需要 Stream Processing。

Stream Processing 是写一个持续运行的查询(不是一次性的 SQL,是持续的 SQL/函数):

java
// Kafka Streams 示例
public class WinRateCalculator {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // 从 Kafka topic 读取比赛结果流
        KStream<String, MatchResult> matches = builder
            .stream("match-results",
                Consumed.with(Serdes.String(), matchResultSerde));

        // 按玩家分组,计算获胜率
        KTable<String, Double> winRates = matches
            .groupBy((key, result) -> result.getWinnerId())
            .aggregate(
                () -> new WinRateState(0, 0),
                (player, result, state) -> {
                    state.incrementWins();
                    state.incrementTotal();
                    return state;
                },
                Materialized.with(Serdes.String(), winRateStateSerde)
            )
            .mapValues(state -> (double) state.getWins() / state.getTotal());

        // 写入排行榜 topic
        winRates.toStream().to("leaderboard-updates");
    }
}

Stream Processing 与普通的"定时跑 SQL 刷新"的区别:

  • 实时:每来一条事件立即更新(不是每分钟刷一次)
  • 有状态:累计计算需要维护状态(如窗口内的计数)
  • Exactly-once:Kafka Streams 保证每条消息只处理一次

第四层:死信与重试——处理失败的事件

你的积分服务在处理 MatchFinishedEvent 时挂了。事件已经消费了——永远不会再重试。积分更新丢失了。

死信队列模式:消费失败的事件先重试几次,最终放到"死信队列"(Dead Letter Queue),由人工或其他进程处理。

java
@Component
public class ReliableEventHandler {

    private static final int MAX_RETRIES = 3;

    @Async
    @EventListener
    public void handle(MatchFinishedEvent event) {
        try {
            scoreService.updateScore(event.winnerId(), event.score());
        } catch (Exception e) {
            // 重试
            retryLater(event, e);
        }
    }

    private void retryLater(MatchFinishedEvent event, Exception cause) {
        int retryCount = event.getRetryCount() == null ? 0 : event.getRetryCount();
        if (retryCount < MAX_RETRIES) {
            // 重新发送到延迟队列(Kafka 不支持延迟,需要单独实现)
            delayedQueue.send(event.incrementRetry(), 5_000 * (retryCount + 1));
            log.warn("Retry {} for event {}", retryCount + 1, event.matchId());
        } else {
            // 超过重试次数 → 死信队列
            deadLetterQueue.send(event);
            log.error("Event {} failed after {} retries", event.matchId(), MAX_RETRIES);
        }
    }
}

死信队列的监控是最重要的——如果一个事件进了死信队列 30 分钟还没处理,就要自动报警。死信 "死"了太久说明系统有问题需要立即处理。


常见陷阱

第五层:事件版本管理与兼容性

随着时间推移,你的事件类型会经历版本变化。

java
// v1: 原始版本
public record PlayerRegisteredV1(String playerId, String name) {}

// v2: 加了 level 字段
public record PlayerRegisteredV2(String playerId, String name, int level) {}

新版本的事件发布后,旧订阅者如果只认识 V1 格式,无法解析 V2 的 payload。标准做法:

Schema Registry。每个事件类型注册一个 schema(Avro/Protobuf/JSON Schema),发布时带 schema ID:

json
// Schema Registry 中的 match-finished 事件 schema v1
{
  "type": "record",
  "name": "MatchFinishedEvent",
  "fields": [
    {"name": "matchId", "type": "string"},
    {"name": "winnerId", "type": "string"},
    {"name": "score", "type": "int"}
  ]
}

当某个消费端还不支持新字段时,向后兼容的策略是:新字段设为 optional / 带默认值。消费端读到不认识的事件,会忽略新字段但不会崩溃。

处理事件版本的黄金规则:

  1. 生产者应该只添加字段(不要重命名、删除或改变类型)
  2. 新字段必须是 optional 或带默认值
  3. 为每个事件类型维护按时间轴排序的所有版本
  4. 弃用过时的事件类型时,先确认所有消费端已迁移超过一个完整 TTL 周期

第六层:事件驱动的最终一致性挑战

采用事件驱动后,你会发现一个令人不安的事实:读模型可能不一致。积分服务异步处理了 ScoreUpdatedEvent,但排行榜已经更新了——用户看到新排名但是积分数还没更新。

处理方法:

  1. UI 层明确展示"最近更新"标签——让用户知道数据可能有短暂延迟,而不是错误地认为数据丢失了。

  2. 关键路径采用"写后读"一致性——用户在创建操作后立即查询时,如果数据尚未同步完成,从生产者直接查询最新数据作为临时保证。

  3. 状态跟踪表——在消费端维护一个"最后处理的事件 ID",当用户触发查询时,如果发现某个事件尚未处理完毕,可以返回"处理中"状态。

  4. 超时补偿——如果某个事件超过 30 秒未被成功处理,触发告警并按死信流程重试。

第七层:在项目中落地事件驱动——不需要 Kafka 也行

在 Spring 应用中用最轻量的方式落地事件驱动——使用 Spring 内置的事件机制:

yaml
# application.yml
spring:
  task:
    execution:
      pool:
        core-size: 4
        max-size: 10
java
@SpringBootApplication
@EnableAsync
public class TournamentApplication {
    public static void main(String[] args) {
        SpringApplication.run(TournamentApplication.class, args);
    }
}

public record MatchFinishedEvent(String matchId, String winnerId, int score) {}

@Component
public class MatchEventProcessor {

    @Async
    @EventListener
    public void onMatchFinished(MatchFinishedEvent event) {
        // 在独立线程池中处理,不影响主流程
        long start = System.currentTimeMillis();
        try {
            updateLeaderboard(event);
            checkAchievements(event);
            sendNotifications(event);
        } finally {
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed > 5000) {
                log.warn("Match event {} processed slowly: {}ms",
                         event.matchId(), elapsed);
            }
        }
    }
}

原则:不要为了将来的需求引入现在的复杂度。先用 Spring @EventListener,当需要持久化、重试、排序时,再迁移到消息队列(RabbitMQ/Kafka)。

第八层:事件驱动的监控与可观测性

事件驱动架构有一个隐藏的成本:调用链变模糊了。在同步架构中,A 调 B 调 C 一清二楚。在事件驱动中,A 发布事件后,谁处理了它?花了多久?失败了没有?核心工具:

分布式追踪(OpenTelemetry)。每个事件携带一个 traceId

java
public record MatchFinishedEvent(
    String traceId,
    String matchId, 
    String winnerId, 
    int score
) {}

// 处理时传递 traceId
@Async
@EventListener
public void onMatchFinished(MatchFinishedEvent event) {
    try (var span = tracer.spanBuilder("process.match.finished")
             .setAttribute("matchId", event.matchId())
             .startSpan()) {
        span.setAttribute("traceId", event.traceId());
        // ... 处理逻辑 ...
    }
}

关键指标监控:每个事件的延迟(从发布到消费)、处理时间、失败率、队列积压深度。设置告警:队列积压 > 1000 条持续 5 分钟 → 人工介入。

有了这些,你的事件驱动系统不再是"发了就忘"的黑箱——每个事件的完整生命周期都是可追踪的。

陷阱二:乱序事件。 比赛结束事件先发,选手报名事件后发(因为网络延迟)。积分服务收不到"报名"却收到了"结束"。按事件顺序的天然假设在分布式环境下不成立——使用事件的时间戳和序列号排序,或者在每个聚合内做事件顺序保证。

陷阱三:循环事件。 服务 A 发布事件 "X",服务 B 处理事件 X 后发布 "Y",服务 A 又监听 Y 并发布 "X"——死循环。确保每个服务的事件处理是幂等的,并且在发布事件时避免触发自己的回调。

陷阱四:事件太多。 每个状态变更都发布事件——系统增加了 50 个事件类型。团队成员记不住哪个事件是干什么的。事件应该是"有业务意义的变化"——"库存减 1"这种内部操作可以不发事件,"订单已发货"这种业务意义的事件才发。


通关挑战

  • 热身:画一个你系统中的一个业务流程的 Choreography 版本和 Orchestration 版本。对比两个方案的优缺点。
  • 挑战:将你的项目中一个同步 RPC 调用链改为事件驱动(至少 2 个异步处理者)。用 Spring @Async + @EventListener 实现。
  • 进阶:在有条件的情况下,搭建一个 Kafka 主题,写一个 Stream Processing 小程序(统计某个事件在一定时间窗口内的频率)。

旅人笔记

事件驱动把"同步等待"变成"异步通知",把"你调用我"变成"你发布我订阅"。Choreography 松耦合但流程隐式,Orchestration 流程显式但需要协调器。配上死信、重试和 Stream Processing——你的服务的通信方式从"打电话"变成了"驿站传信"。


下一站预告

事件驱动架构是软件工程的顶峰之一。但有一个新兴的领域正在快速改变软件的构建方式——AI 系统。这些系统有自己的一套模式:RAG、Agent、Cache-Augmented、评估。最后一章,也是最前沿的一章——AI 系统模式。

Built with VitePress | Software Systems Atlas