编程 事件溯源与CQRS深度实战:从状态存储到事件日志的架构革命

2026-05-09 03:05:57 +0800 CST views 16

事件溯源与CQRS深度实战:从状态存储到事件日志的架构革命

引言:为什么传统CRUD不够用了?

你有没有遇到过这样的问题:用户说"昨天数据明明是对的,今天怎么就变了?"你打开数据库,只看到当前状态,却无法追溯状态是如何一步步变成现在这样的。又或者,业务方要求"给我一个历史报表,统计上个月某一天的库存快照",你只能在心里默默叹气——因为你的系统只存了"当前状态",历史数据早就被覆盖了。

这不仅是数据追溯的问题。在高并发场景下,多个用户同时修改同一条记录,乐观锁、悲观锁、数据库行锁……各种锁机制让系统复杂度直线上升。更糟糕的是,当业务越来越复杂,数据库表越来越大,读写性能开始下降,你开始考虑读写分离,却发现主从延迟、数据一致性等问题接踵而来。

事件溯源(Event Sourcing)和CQRS(Command Query Responsibility Segregation)就是为解决这些问题而生的架构模式。它们不是银弹,但在特定场景下,确实能带来传统CRUD无法比拟的优势。本文将深入探讨这两个模式的原理、实现方式、适用场景以及生产级实践。


第一部分:事件溯源的核心思想

1.1 从"状态存储"到"事件日志"

传统CRUD模式的核心是状态存储:数据库中存储的是实体的当前状态,每次更新操作都会覆盖之前的状态。例如,一个订单从"创建"到"已支付"再到"已发货",数据库中只保留"已发货"这个最终状态。

-- 传统CRUD:覆盖状态
UPDATE orders SET status = 'PAID' WHERE id = 123;
UPDATE orders SET status = 'SHIPPED' WHERE id = 123;
-- 历史状态丢失

事件溯源的核心思想是用事件序列代替当前状态:不存储实体的当前状态,而是存储导致状态变化的所有事件。任何时刻的状态都可以通过重放事件序列来重建。

事件序列:
1. OrderCreated(orderId=123, items=[...], total=99.9)
2. OrderPaid(orderId=123, paymentId=pay_456, paidAt=2026-05-08 10:30:00)
3. OrderShipped(orderId=123, trackingNo=SF123456, shippedAt=2026-05-08 14:00:00)

当前状态:通过重放上述事件计算得出

这种方式的本质转变带来了一系列优势:

完整的审计日志:每个事件都是不可变的事实,天然形成完整的操作历史。

时间旅行能力:可以重建任意历史时刻的系统状态,支持"回放"和"撤销"操作。

业务语义明确:事件名称直接反映业务含义(如OrderCreated、OrderPaid),比单纯的状态字段更有表达力。

并发友好:事件只追加不修改,天然避免了更新冲突。

1.2 事件的本质

在事件溯源中,事件是不可变的事实记录。一个事件通常包含以下信息:

public abstract class DomainEvent {
    private final String eventId;      // 全局唯一ID
    private final String aggregateId;   // 聚合根ID
    private final String eventType;     // 事件类型
    private final Instant timestamp;    // 发生时间
    private final int version;          // 版本号(用于乐观锁)
    private final Map<String, Object> metadata; // 元数据(如操作人、IP等)
}

// 具体事件
public class OrderCreatedEvent extends DomainEvent {
    private final String orderId;
    private final String customerId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    private final String shippingAddress;
}

事件的关键特性

  1. 不可变性:事件一旦发生,就不能被修改或删除。如果需要"撤销"某个事件,只能发布一个补偿事件。

  2. 时序性:事件按时间顺序存储,顺序决定了状态演化的路径。

  3. 原子性:每个事件代表一个原子操作,要么发生,要么不发生。

  4. 幂等性:相同事件被处理多次,应该产生相同的结果。

1.3 聚合根与事件流

聚合根(Aggregate Root)是领域驱动设计(DDD)中的核心概念,在事件溯源中扮演着事件管理者的角色。每个聚合根管理自己的一组事件,通过应用事件来维护内部状态。

public class OrderAggregate {
    private String orderId;
    private OrderStatus status;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private String paymentId;
    private String trackingNo;
    
    // 当前版本号,用于乐观并发控制
    private int version = 0;
    
    // 待发布的新事件列表
    private final List<DomainEvent> pendingEvents = new ArrayList<>();
    
    // 创建订单(命令处理)
    public void create(CreateOrderCommand command) {
        if (status != null) {
            throw new IllegalStateException("Order already exists");
        }
        
        // 验证业务规则
        if (command.getItems().isEmpty()) {
            throw new IllegalArgumentException("Order must have at least one item");
        }
        
        // 发布事件
        applyChange(new OrderCreatedEvent(
            command.getOrderId(),
            command.getCustomerId(),
            command.getItems(),
            calculateTotal(command.getItems()),
            command.getShippingAddress()
        ));
    }
    
    // 支付订单(命令处理)
    public void pay(PayOrderCommand command) {
        if (status != OrderStatus.CREATED) {
            throw new IllegalStateException("Only created order can be paid");
        }
        
        applyChange(new OrderPaidEvent(
            orderId,
            command.getPaymentId(),
            Instant.now()
        ));
    }
    
    // 发货(命令处理)
    public void ship(ShipOrderCommand command) {
        if (status != OrderStatus.PAID) {
            throw new IllegalStateException("Only paid order can be shipped");
        }
        
        applyChange(new OrderShippedEvent(
            orderId,
            command.getTrackingNo(),
            Instant.now()
        ));
    }
    
    // 应用事件(状态变更)
    private void applyChange(DomainEvent event) {
        apply(event);
        pendingEvents.add(event);
    }
    
    // 重放事件(状态重建)
    public void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent e) {
            this.orderId = e.getOrderId();
            this.items = e.getItems();
            this.totalAmount = e.getTotalAmount();
            this.status = OrderStatus.CREATED;
        } else if (event instanceof OrderPaidEvent e) {
            this.paymentId = e.getPaymentId();
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShippedEvent e) {
            this.trackingNo = e.getTrackingNo();
            this.status = OrderStatus.SHIPPED;
        }
        this.version++;
    }
    
    // 从历史事件重建状态
    public static OrderAggregate fromEvents(List<DomainEvent> events) {
        OrderAggregate aggregate = new OrderAggregate();
        events.forEach(aggregate::apply);
        aggregate.pendingEvents.clear(); // 重建时不产生新事件
        return aggregate;
    }
}

1.4 事件存储(Event Store)

事件存储是专门用于存储事件序列的数据库,与传统数据库有以下不同:

  1. 只追加写入:事件只追加,不更新、不删除。
  2. 乐观并发控制:使用版本号而非锁机制。
  3. 流式组织:事件按聚合根组织成流。
public interface EventStore {
    // 追加事件到指定流
    void append(String streamId, List<DomainEvent> events, int expectedVersion);
    
    // 加载指定流的所有事件
    List<DomainEvent> load(String streamId);
    
    // 从指定版本开始加载事件
    List<DomainEvent> loadFromVersion(String streamId, int version);
    
    // 订阅所有新事件
    void subscribe(Consumer<DomainEvent> subscriber);
}

// 基于数据库的实现
public class JdbcEventStore implements EventStore {
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    @Override
    public void append(String streamId, List<DomainEvent> events, int expectedVersion) {
        jdbcTemplate.execute((ConnectionCallback<Void>) connection -> {
            // 检查当前版本(乐观锁)
            Integer currentVersion = getCurrentVersion(connection, streamId);
            if (currentVersion != null && currentVersion != expectedVersion) {
                throw new OptimisticConcurrencyException(
                    "Expected version " + expectedVersion + 
                    " but was " + currentVersion
                );
            }
            
            // 插入事件
            PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO events (event_id, stream_id, event_type, event_data, version, timestamp) " +
                "VALUES (?, ?, ?, ?, ?, ?)"
            );
            
            int version = expectedVersion;
            for (DomainEvent event : events) {
                ps.setString(1, event.getEventId());
                ps.setString(2, streamId);
                ps.setString(3, event.getEventType());
                ps.setString(4, objectMapper.writeValueAsString(event));
                ps.setInt(5, ++version);
                ps.setTimestamp(6, Timestamp.from(event.getTimestamp()));
                ps.addBatch();
            }
            
            ps.executeBatch();
            return null;
        });
    }
    
    @Override
    public List<DomainEvent> load(String streamId) {
        return jdbcTemplate.query(
            "SELECT event_data, event_type FROM events WHERE stream_id = ? ORDER BY version",
            (rs, rowNum) -> deserializeEvent(rs.getString("event_type"), rs.getString("event_data")),
            streamId
        );
    }
}

事件表设计

CREATE TABLE events (
    event_id VARCHAR(36) PRIMARY KEY,
    stream_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    event_data JSON NOT NULL,
    version INT NOT NULL,
    timestamp TIMESTAMP(6) NOT NULL,
    metadata JSON,
    INDEX idx_stream_version (stream_id, version)
);

第二部分:CQRS——读写分离的架构模式

2.1 CQRS的核心思想

CQRS(Command Query Responsibility Segregation)的核心思想是将**命令(写操作)查询(读操作)**分离,使用不同的模型处理。

传统架构中,读和写共用同一个模型:

┌─────────────────────────────────────┐
│            同一个模型               │
│  ┌─────────────────────────────┐    │
│  │     Order Entity            │    │
│  │  - id, status, items...     │    │
│  │  - create() update() find() │    │
│  └─────────────────────────────┘    │
└─────────────────────────────────────┘
        ↑                   ↓
     写操作               读操作

CQRS架构将读写分离:

┌──────────────────────┐     ┌──────────────────────┐
│     写模型(命令端)   │     │     读模型(查询端)   │
│  ┌────────────────┐  │     │  ┌────────────────┐  │
│  │ OrderAggregate │  │────→│  │ OrderSummary   │  │
│  │ - 状态管理      │  │事件  │  │ - 查询优化      │  │
│  │ - 业务规则      │  │同步  │  │ - 投影视图      │  │
│  │ - 事件发布      │  │     │  │ - 缓存支持      │  │
│  └────────────────┘  │     │  └────────────────┘  │
└──────────────────────┘     └──────────────────────┘
        ↑                            ↓
     命令处理                      查询处理

2.2 为什么要分离读写模型?

性能优化的差异

写操作需要保证事务一致性、业务规则完整性,通常涉及复杂的验证逻辑。读操作则追求快速响应,可能需要不同的数据结构(如JOIN优化、预计算字段)。

// 写模型:保证业务一致性
public class OrderAggregate {
    public void create(CreateOrderCommand cmd) {
        // 验证库存
        validateInventory(cmd.getItems());
        // 验证价格
        validatePricing(cmd.getItems(), cmd.getCoupons());
        // 计算优惠
        BigDecimal discount = calculateDiscount(cmd);
        // 发布事件
        applyChange(new OrderCreatedEvent(...));
    }
}

// 读模型:优化查询性能
@Table("order_summaries")
public class OrderSummary {
    private String orderId;
    private String customerName;    // 冗余存储,避免JOIN
    private String status;
    private Integer itemCount;      // 预计算
    private BigDecimal totalAmount;
    private Instant createdAt;
    // 不需要复杂的业务规则
}

扩展性的差异

读操作的频率通常远高于写操作(读写比例可能达到10:1甚至更高)。分离后,可以独立扩展读写端:读端可以添加缓存、使用读副本、部署多个查询服务实例。

技术选型的灵活性

写端可能选择关系型数据库保证事务,读端可能选择NoSQL数据库优化查询。例如:

  • 写端:PostgreSQL + Event Store
  • 读端:Elasticsearch(全文搜索)、Redis(缓存)、MongoDB(文档查询)

2.3 命令(Command)与命令处理器

命令是表达"意图"的对象,它描述了"用户想要做什么",而非"系统状态如何变化"。

// 命令定义
public record CreateOrderCommand(
    String orderId,
    String customerId,
    List<OrderItem> items,
    String shippingAddress,
    String couponCode
) implements Command {}

public record PayOrderCommand(
    String orderId,
    String paymentId,
    BigDecimal amount
) implements Command {}

// 命令处理器
@Component
public class OrderCommandHandler {
    private final EventStore eventStore;
    private final OrderRepository orderRepository;
    
    @Transactional
    public void handle(CreateOrderCommand command) {
        // 加载聚合根(如果是新建,则为空)
        OrderAggregate order = orderRepository.load(command.orderId())
            .orElse(new OrderAggregate());
        
        // 执行命令
        order.create(command);
        
        // 持久化事件
        eventStore.append(
            "order-" + command.orderId(),
            order.getPendingEvents(),
            order.getVersion() - order.getPendingEvents().size()
        );
        
        // 发布事件到消息队列(异步处理)
        order.getPendingEvents().forEach(eventBus::publish);
    }
    
    @Transactional
    public void handle(PayOrderCommand command) {
        // 从事件存储重建聚合根
        List<DomainEvent> events = eventStore.load("order-" + command.orderId());
        OrderAggregate order = OrderAggregate.fromEvents(events);
        
        // 执行命令
        order.pay(command);
        
        // 持久化新事件
        eventStore.append(
            "order-" + command.orderId(),
            order.getPendingEvents(),
            order.getVersion() - order.getPendingEvents().size()
        );
    }
}

2.4 投影(Projection)与读模型

投影是将事件流转换为读模型的过程。每当有新事件产生,投影处理器就会更新对应的读模型。

@Component
public class OrderSummaryProjection {
    private final OrderSummaryRepository repository;
    
    @EventHandler
    public void on(OrderCreatedEvent event) {
        OrderSummary summary = new OrderSummary();
        summary.setOrderId(event.getOrderId());
        summary.setCustomerId(event.getCustomerId());
        summary.setStatus("CREATED");
        summary.setItemCount(event.getItems().size());
        summary.setTotalAmount(event.getTotalAmount());
        summary.setCreatedAt(event.getTimestamp());
        
        repository.save(summary);
    }
    
    @EventHandler
    public void on(OrderPaidEvent event) {
        repository.findByOrderId(event.getOrderId())
            .ifPresent(summary -> {
                summary.setStatus("PAID");
                summary.setPaidAt(event.getPaidAt());
                repository.save(summary);
            });
    }
    
    @EventHandler
    public void on(OrderShippedEvent event) {
        repository.findByOrderId(event.getOrderId())
            .ifPresent(summary -> {
                summary.setStatus("SHIPPED");
                summary.setTrackingNo(event.getTrackingNo());
                summary.setShippedAt(event.getShippedAt());
                repository.save(summary);
            });
    }
    
    @EventHandler
    public void on(OrderCancelledEvent event) {
        repository.findByOrderId(event.getOrderId())
            .ifPresent(summary -> {
                summary.setStatus("CANCELLED");
                summary.setCancelledAt(event.getCancelledAt());
                summary.setCancelReason(event.getReason());
                repository.save(summary);
            });
    }
}

2.5 最终一致性

CQRS的一个重要特点是读写模型的最终一致性。命令端发布事件后,读模型可能需要几毫秒到几秒的时间才能完成更新。这意味着:

  • 用户提交命令后立即查询,可能看不到最新状态。
  • 需要在UI层处理这种情况(如显示"处理中"状态,或使用WebSocket推送更新)。
// 处理最终一致性的策略

// 策略1:命令返回后等待读模型更新
@PostMapping("/orders")
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
    String orderId = UUID.randomUUID().toString();
    
    // 发送命令
    commandBus.send(new CreateOrderCommand(orderId, ...));
    
    // 等待读模型更新(最多等待5秒)
    Awaitility.await()
        .atMost(5, TimeUnit.SECONDS)
        .until(() -> orderQueryService.findById(orderId).isPresent());
    
    return ResponseEntity.ok(orderQueryService.findById(orderId).orElseThrow());
}

// 策略2:返回命令ID,客户端轮询
@PostMapping("/orders")
public ResponseEntity<CommandResult> createOrder(@RequestBody CreateOrderRequest request) {
    String orderId = UUID.randomUUID().toString();
    String commandId = UUID.randomUUID().toString();
    
    // 异步处理命令
    commandBus.sendAsync(new CreateOrderCommand(orderId, ..., commandId));
    
    return ResponseEntity.accepted()
        .body(new CommandResult(commandId, orderId, "PROCESSING"));
}

// 策略3:WebSocket推送更新
@EventHandler
public void on(OrderCreatedEvent event) {
    // 更新读模型
    updateReadModel(event);
    
    // 推送给客户端
    webSocketService.sendToUser(
        event.getCustomerId(),
        "order:created",
        new OrderNotification(event.getOrderId(), "CREATED")
    );
}

第三部分:事件溯源与CQRS的联合应用

3.1 为什么事件溯源常与CQRS结合?

事件溯源和CQRS是两个独立的模式,但它们经常一起使用,原因如下:

  1. 事件溯源产生事件流,CQRS需要事件流:事件溯源的核心输出是事件序列,而CQRS的投影处理器需要订阅事件来更新读模型。两者天然契合。

  2. 事件溯源优化写端,CQRS优化读端:事件溯源让写端通过追加事件获得高性能,CQRS让读端通过专用模型获得查询灵活性。

  3. 分离关注点,降低复杂度:写端专注于业务规则和状态一致性,读端专注于查询性能和用户体验。

3.2 完整架构设计

┌──────────────────────────────────────────────────────────────────────┐
│                            客户端层                                   │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────────────────────┐│
│  │  Web App    │   │  Mobile App │   │  第三方集成                  ││
│  └─────────────┘   └─────────────┘   └─────────────────────────────┘│
└───────────────────────────────┬──────────────────────────────────────┘
                                │
┌───────────────────────────────┴──────────────────────────────────────┐
│                            API网关层                                  │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  REST API: POST /commands, GET /queries                         ││
│  │  WebSocket: 订阅事件更新                                         ││
│  └─────────────────────────────────────────────────────────────────┘│
└───────────────┬─────────────────────────────────────┬────────────────┘
                │ 命令                                 │ 查询
                ↓                                     ↓
┌───────────────────────────────┐   ┌────────────────────────────────────┐
│       命令端(写模型)          │   │         查询端(读模型)            │
│  ┌─────────────────────────┐  │   │  ┌────────────────────────────┐   │
│  │   Command Handler       │  │   │  │   Query Service            │   │
│  │   - 接收命令             │  │   │  │   - 处理查询请求            │   │
│  │   - 加载聚合根           │  │   │  │   - 返回投影视图            │   │
│  │   - 执行业务规则         │  │   │  └────────────────────────────┘   │
│  │   - 发布事件             │  │   │                ↑                  │
│  └─────────────────────────┘  │   │                │                  │
│                ↓               │   │  ┌────────────────────────────┐   │
│  ┌─────────────────────────┐  │   │  │   Read Model Store         │   │
│  │   Event Store           │  │   │  │   - PostgreSQL (查询优化)   │   │
│  │   - 追加事件             │──┼──→│  │   - Redis (缓存)           │   │
│  │   - 乐观并发控制         │  │   │  │   - Elasticsearch (搜索)   │   │
│  │   - 事件流管理           │  │   │  └────────────────────────────┘   │
│  └─────────────────────────┘  │   │                                    │
│                ↓               │   │  ┌────────────────────────────┐   │
│  ┌─────────────────────────┐  │   │  │   Projection Handlers      │   │
│  │   Event Publisher       │  │   │  │   - 订阅事件                │   │
│  │   - 发布到消息队列       │  │   │  │   - 更新读模型              │   │
│  └─────────────────────────┘  │   │  └────────────────────────────┘   │
└───────────────────────────────┘   └────────────────────────────────────┘

3.3 Spring Boot实现示例

项目结构

src/main/java/com/example/ordersystem/
├── command/                          # 命令端
│   ├── api/
│   │   └── OrderCommandController.java
│   ├── application/
│   │   └── OrderCommandService.java
│   ├── domain/
│   │   ├── OrderAggregate.java
│   │   ├── events/
│   │   │   ├── OrderCreatedEvent.java
│   │   │   ├── OrderPaidEvent.java
│   │   │   └── OrderShippedEvent.java
│   │   └── commands/
│   │       ├── CreateOrderCommand.java
│   │       └── PayOrderCommand.java
│   └── infrastructure/
│       ├── EventStore.java
│       └── JdbcEventStore.java
├── query/                            # 查询端
│   ├── api/
│   │   └── OrderQueryController.java
│   ├── application/
│   │   └── OrderQueryService.java
│   ├── domain/
│   │   └── OrderSummary.java
│   └── infrastructure/
│       ├── OrderSummaryRepository.java
│       └── OrderSummaryProjection.java
└── shared/
    └── EventBus.java

命令端实现

// OrderCommandController.java
@RestController
@RequestMapping("/api/orders")
public class OrderCommandController {
    private final OrderCommandService commandService;
    
    @PostMapping
    public ResponseEntity<CommandResponse> createOrder(
            @RequestBody @Valid CreateOrderRequest request) {
        String orderId = UUID.randomUUID().toString();
        commandService.createOrder(new CreateOrderCommand(
            orderId,
            request.getCustomerId(),
            request.getItems(),
            request.getShippingAddress(),
            request.getCouponCode()
        ));
        return ResponseEntity.accepted()
            .body(new CommandResponse(orderId, "CREATED"));
    }
    
    @PostMapping("/{orderId}/pay")
    public ResponseEntity<CommandResponse> payOrder(
            @PathVariable String orderId,
            @RequestBody @Valid PayOrderRequest request) {
        commandService.payOrder(new PayOrderCommand(
            orderId,
            request.getPaymentId(),
            request.getAmount()
        ));
        return ResponseEntity.accepted()
            .body(new CommandResponse(orderId, "PAID"));
    }
}

// OrderCommandService.java
@Service
@Transactional
public class OrderCommandService {
    private final EventStore eventStore;
    private final EventBus eventBus;
    
    public void createOrder(CreateOrderCommand command) {
        OrderAggregate order = new OrderAggregate();
        order.create(command);
        
        String streamId = "order-" + command.getOrderId();
        eventStore.append(streamId, order.getPendingEvents(), 0);
        
        order.getPendingEvents().forEach(eventBus::publish);
    }
    
    public void payOrder(PayOrderCommand command) {
        String streamId = "order-" + command.getOrderId();
        List<DomainEvent> events = eventStore.load(streamId);
        
        OrderAggregate order = OrderAggregate.fromEvents(events);
        order.pay(command);
        
        eventStore.append(streamId, order.getPendingEvents(), order.getVersion() - order.getPendingEvents().size());
        order.getPendingEvents().forEach(eventBus::publish);
    }
}

查询端实现

// OrderQueryController.java
@RestController
@RequestMapping("/api/orders")
public class OrderQueryController {
    private final OrderQueryService queryService;
    
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderSummary> getOrder(@PathVariable String orderId) {
        return queryService.findById(orderId)
            .map(ResponseEntity::ok)
            .orElse(ResponseEntity.notFound().build());
    }
    
    @GetMapping
    public Page<OrderSummary> listOrders(
            @RequestParam(required = false) String customerId,
            @RequestParam(required = false) String status,
            Pageable pageable) {
        return queryService.findByCriteria(customerId, status, pageable);
    }
    
    @GetMapping("/search")
    public List<OrderSummary> searchOrders(@RequestParam String keyword) {
        return queryService.search(keyword);
    }
}

// OrderSummaryProjection.java
@Component
public class OrderSummaryProjection {
    private final OrderSummaryRepository repository;
    private final ElasticsearchTemplate elasticsearchTemplate;
    
    @EventListener
    @Async
    public void on(OrderCreatedEvent event) {
        // 保存到PostgreSQL
        OrderSummary summary = OrderSummary.builder()
            .orderId(event.getOrderId())
            .customerId(event.getCustomerId())
            .status("CREATED")
            .itemCount(event.getItems().size())
            .totalAmount(event.getTotalAmount())
            .createdAt(event.getTimestamp())
            .build();
        
        repository.save(summary);
        
        // 同步到Elasticsearch(用于搜索)
        elasticsearchTemplate.save(buildSearchDocument(event));
    }
    
    @EventListener
    @Async
    public void on(OrderPaidEvent event) {
        repository.findByOrderId(event.getOrderId())
            .ifPresent(summary -> {
                summary.setStatus("PAID");
                summary.setPaidAt(event.getPaidAt());
                summary.setPaymentId(event.getPaymentId());
                repository.save(summary);
            });
    }
    
    @EventListener
    @Async
    public void on(OrderShippedEvent event) {
        repository.findByOrderId(event.getOrderId())
            .ifPresent(summary -> {
                summary.setStatus("SHIPPED");
                summary.setTrackingNo(event.getTrackingNo());
                summary.setShippedAt(event.getShippedAt());
                repository.save(summary);
            });
    }
}

第四部分:高级话题与生产实践

4.1 快照(Snapshot)优化

当聚合根的事件数量非常大时,每次重建状态都需要重放所有事件,性能会下降。快照是一种优化手段:定期保存聚合根的完整状态,重建时只需加载最近的快照,然后重放快照之后的事件。

public class Snapshot {
    private String aggregateId;
    private String aggregateType;
    private int version;
    private String state;  // JSON序列化的状态
    private Instant timestamp;
}

public class OrderAggregate {
    // 从快照+增量事件重建
    public static OrderAggregate fromSnapshotAndEvents(
            Snapshot snapshot, 
            List<DomainEvent> events) {
        // 反序列化快照状态
        OrderAggregate aggregate = objectMapper.readValue(
            snapshot.getState(), 
            OrderAggregate.class
        );
        // 应用增量事件
        events.forEach(aggregate::apply);
        return aggregate;
    }
}

// 快照策略
@Component
public class SnapshotStrategy {
    private static final int SNAPSHOT_THRESHOLD = 100; // 每100个事件生成一次快照
    
    public boolean shouldCreateSnapshot(List<DomainEvent> events, int currentVersion) {
        return events.size() >= SNAPSHOT_THRESHOLD;
    }
    
    public Snapshot createSnapshot(OrderAggregate aggregate) {
        return new Snapshot(
            aggregate.getOrderId(),
            "Order",
            aggregate.getVersion(),
            objectMapper.writeValueAsString(aggregate),
            Instant.now()
        );
    }
}

4.2 Saga模式与分布式事务

在微服务架构中,一个业务操作可能涉及多个服务的事件溯源聚合根。Saga模式通过事件编排来协调跨服务的事务。

// 订单创建Saga
public class CreateOrderSaga {
    private final EventBus eventBus;
    
    public void start(CreateOrderCommand command) {
        // 步骤1:创建订单
        eventBus.publish(new OrderCreationStarted(command));
        
        // 步骤2:预留库存(监听OrderCreated事件后执行)
        // 步骤3:处理支付(监听InventoryReserved事件后执行)
        // 步骤4:确认订单(监听PaymentProcessed事件后执行)
    }
    
    // 补偿逻辑
    @EventHandler
    public void on(PaymentFailedEvent event) {
        // 补偿:释放库存
        eventBus.publish(new ReleaseInventory(event.getOrderId()));
        // 补偿:取消订单
        eventBus.publish(new CancelOrder(event.getOrderId(), "Payment failed"));
    }
}

// Saga状态机
public enum SagaState {
    STARTED,
    ORDER_CREATED,
    INVENTORY_RESERVED,
    PAYMENT_PROCESSING,
    COMPLETED,
    COMPENSATING,
    FAILED
}

4.3 事件版本兼容性

随着业务演进,事件的schema可能发生变化。需要处理新旧版本事件的兼容性问题。

策略1:向上兼容

新版本事件保留所有旧字段,只添加新字段。

// V1事件
public class OrderCreatedEventV1 {
    private String orderId;
    private List<OrderItem> items;
}

// V2事件(兼容V1)
public class OrderCreatedEventV2 {
    private String orderId;
    private List<OrderItem> items;
    private String couponCode;  // 新增字段
    private BigDecimal discount; // 新增字段
}

策略2:事件升级器

在加载事件时,自动将旧版本事件转换为新版本。

@Component
public class EventUpgrader {
    private final ObjectMapper objectMapper;
    
    public DomainEvent upgrade(String eventType, String eventData, int version) {
        return switch (eventType) {
            case "OrderCreatedEvent" -> {
                if (version == 1) {
                    OrderCreatedEventV1 v1 = objectMapper.readValue(eventData, OrderCreatedEventV1.class);
                    yield new OrderCreatedEventV2(
                        v1.getOrderId(),
                        v1.getItems(),
                        null,  // 默认值
                        BigDecimal.ZERO
                    );
                }
                yield objectMapper.readValue(eventData, OrderCreatedEventV2.class);
            }
            default -> objectMapper.readValue(eventData, DomainEvent.class);
        };
    }
}

4.4 事件重放与系统恢复

事件溯源系统的一个重要能力是可以重建整个系统状态。这在以下场景非常有用:

  • 引入新的读模型投影
  • 修复读模型的bug
  • 系统迁移或灾备恢复
@Service
public class EventReplayService {
    private final EventStore eventStore;
    private final List<ProjectionHandler> projections;
    
    public void replayAll() {
        // 清空所有读模型
        projections.forEach(ProjectionHandler::clear);
        
        // 重放所有事件
        eventStore.subscribeAll(event -> {
            projections.forEach(p -> p.handle(event));
        });
    }
    
    public void replayFrom(Instant from) {
        eventStore.subscribeFrom(from, event -> {
            projections.forEach(p -> p.handle(event));
        });
    }
    
    public void rebuildProjection(String projectionName) {
        ProjectionHandler projection = projections.stream()
            .filter(p -> p.getName().equals(projectionName))
            .findFirst()
            .orElseThrow();
        
        projection.clear();
        eventStore.subscribeAll(projection::handle);
    }
}

4.5 性能优化实践

事件存储优化

-- 分区表(按时间分区)
CREATE TABLE events (
    event_id VARCHAR(36) PRIMARY KEY,
    stream_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    event_data JSON NOT NULL,
    version INT NOT NULL,
    timestamp TIMESTAMP(6) NOT NULL,
    metadata JSON
) PARTITION BY RANGE (timestamp);

-- 创建分区
CREATE TABLE events_2026_01 PARTITION OF events
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE events_2026_02 PARTITION OF events
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

-- 索引优化
CREATE INDEX idx_stream_version ON events (stream_id, version);
CREATE INDEX idx_timestamp ON events (timestamp);

读模型缓存策略

@Service
public class CachedOrderQueryService {
    private final RedisTemplate<String, OrderSummary> redisTemplate;
    private final OrderSummaryRepository repository;
    
    @Cacheable(value = "order-summary", key = "#orderId")
    public Optional<OrderSummary> findById(String orderId) {
        return repository.findByOrderId(orderId);
    }
    
    @CacheEvict(value = "order-summary", key = "#event.orderId")
    @EventListener
    public void on(OrderUpdatedEvent event) {
        // 事件更新时自动清除缓存
    }
    
    // 预热热点数据
    @Scheduled(fixedRate = 300000) // 每5分钟
    public void warmUpCache() {
        List<String> hotOrderIds = findHotOrders();
        hotOrderIds.forEach(this::findById);
    }
}

第五部分:适用场景与决策指南

5.1 适合事件溯源+CQRS的场景

场景原因
金融交易系统需要完整的审计日志,任何状态变化都必须可追溯
订单履约系统状态流转复杂,需要支持回滚、补偿等操作
协作编辑系统需要支持撤销/重做、版本对比、冲突解决
审计合规系统法规要求保留完整历史记录
IoT数据处理高写入频率,读查询模式多样化
事件驱动架构已有事件基础设施,容易集成

5.2 不适合的场景

场景原因
简单CRUD应用引入事件溯源会增加复杂度,收益不大
报表查询为主读操作远多于写操作,但不需要复杂的状态管理
团队不熟悉DDD学习曲线陡峭,容易误用
实时性要求极高最终一致性可能导致用户体验问题
数据量极大的单实体事件数量过多会影响重建性能

5.3 引入决策流程

开始
  │
  ├─ 是否需要完整审计日志?
  │     ├─ 是 → 考虑事件溯源
  │     └─ 否 → 继续
  │
  ├─ 是否需要时间旅行(重建历史状态)?
  │     ├─ 是 → 事件溯源
  │     └─ 否 → 继续
  │
  ├─ 读写比例是否显著不平衡(>10:1)?
  │     ├─ 是 → 考虑CQRS
  │     └─ 否 → 传统架构可能足够
  │
  ├─ 是否需要独立的读写扩展?
  │     ├─ 是 → CQRS
  │     └─ 否 → 继续
  │
  └─ 业务复杂度是否高(多聚合根协作)?
        ├─ 是 → DDD + 事件溯源
        └─ 否 → 传统CRUD

结语:架构的本质是权衡

事件溯源和CQRS不是银弹,它们解决了一类问题,同时也引入了新的复杂度。选择它们意味着:

获得的好处

  • 完整的审计日志
  • 时间旅行能力
  • 读写独立扩展
  • 更好的业务表达力
  • 更容易实现事件驱动架构

付出的代价

  • 学习曲线陡峭
  • 最终一致性的处理
  • 事件版本管理的复杂度
  • 调试和排障难度增加
  • 需要额外的基础设施(事件存储、消息队列)

在决定采用之前,请认真评估你的业务场景、团队能力和运维能力。好的架构不是最先进的架构,而是最适合当前阶段的架构。


参考资料

  • Martin Fowler: Event Sourcing
  • Greg Young: CQRS Documents
  • Vaughn Vernon: Domain-Driven Design
  • EventStoreDB Documentation
  • Axon Framework Reference

推荐文章

pip安装到指定目录上
2024-11-17 16:17:25 +0800 CST
JavaScript 上传文件的几种方式
2024-11-18 21:11:59 +0800 CST
前端如何优化资源加载
2024-11-18 13:35:45 +0800 CST
Boost.Asio: 一个美轮美奂的C++库
2024-11-18 23:09:42 +0800 CST
地图标注管理系统
2024-11-19 09:14:52 +0800 CST
一个数字时钟的HTML
2024-11-19 07:46:53 +0800 CST
pin.gl是基于WebRTC的屏幕共享工具
2024-11-19 06:38:05 +0800 CST
Vue3中如何处理跨域请求?
2024-11-19 08:43:14 +0800 CST
JS 箭头函数
2024-11-17 19:09:58 +0800 CST
H5抖音商城小黄车购物系统
2024-11-19 08:04:29 +0800 CST
Elasticsearch 文档操作
2024-11-18 12:36:01 +0800 CST
#免密码登录服务器
2024-11-19 04:29:52 +0800 CST
Go语言中的`Ring`循环链表结构
2024-11-19 00:00:46 +0800 CST
在 Docker 中部署 Vue 开发环境
2024-11-18 15:04:41 +0800 CST
Linux 网站访问日志分析脚本
2024-11-18 19:58:45 +0800 CST
程序员茄子在线接单