我正在构建微服务。我的一个MicroService正在使用CQRS和事件采购。在系统中引发集成事件,我将事件存储中的聚合保存并更新我的读取模型。
我的问题是,当我们针对该聚合更新事件流时,我们需要汇总版本的原因?我读到我们需要这个以保持一致性,事件要按顺序重播,我们需要在保存之前检查版本(https://blog.leifbattermann.de/2017/04/21/12-things-you-should-know-about-event-sourcing/)我仍然无法理解这个因为事件被提出并按顺序保存,所以我真的需要具体例子,了解我们从版本中获得的好处以及为什么我们甚至需要它们。
非常感谢,
伊姆兰
让我描述一下聚合版本有用的情况:
在我们的reSove framework聚合版本中用于乐观并发控制。
我将通过例子来解释它。让我们说InventoryItem
聚合接受命令AddItems
和OrderItems
。 AddItems
增加库存商品数量,OrderItems
- 减少。假设你有一个InventoryItem
聚合#123有一个事件 - ITEMS_ADDED
,数量为5. Agrregate#123状态说有5件库存。
因此,您的UI向用户显示库存中有5件商品。用户A决定订购3件物品,用户B - 4件物品。几乎在同一时间发出OrederItems
命令,假设用户A首先是几毫秒。
现在,如果在内存中有一个聚合#123的单个实例,在单个线程中,你没有问题 - 来自用户A的第一个命令将成功,将应用事件,状态说数量为2,所以第二个命令来自用户B将失败。
在分布式或无服务器系统中,来自A和B的命令将位于不同的进程中,如果我们不使用某些并发控制,则两个命令都会成功并使聚合进入错误状态。有几种方法可以做到这一点 - 悲观锁定,命令队列,聚合存储库或乐观锁定。
乐观锁定似乎是最简单和最实用的解决方案:
我们说聚合有版本 - 其流中的事件数量。所以我们的聚合#123有版本1。
当聚合发出事件时,此事件数据具有聚合版本。在我们的例子中,来自用户A和B的ITEMS_ORDERED
事件将具有事件聚合版本2.显然,聚合事件应该具有随后增加的版本。所以我们需要做的就是设置一个数据库约束,在写入事件存储时,元组{aggregateId, aggregateVersion}
应该是唯一的。
让我们看看我们的示例如何在具有乐观并发控制的分布式系统中工作:
OrderItem
{version 1, quantity 5}
恢复OrderItem
ITEMS_ORDERED {aggregateId 123, version 2}
被写入事件存储。ITEMS_ORDERED {aggregateId 123, version 2}
尝试将其写入事件存储,fais具有并发异常。{version 2, quantity 2}
状态并且命令将被正确执行。我希望这可以清除聚合版本有用的情况。
是的,这是对的。您需要版本或序列号以保持一致性。
你想要的两件事:
根据意见进一步澄清:
该文章的作者将RDBMS假定为事件存储。期望版本ID或事件序列由生产者生成。因此,在您的示例中,“已发送”事件的序列将高于“传输中”事件。
当您想要并行处理事件时,会发生此问题。如果一个消费者获得“交付”事件而另一个消费者获得“传输中”事件会怎样?显然,您必须确保特定订单的所有事件都由同一个消费者处理。在Kafka中,您可以通过选择订单ID作为分区键来解决此问题。由于一个分区将仅由一个消费者处理,因此您知道在交付之前您将始终获得“在途中”。但是,多个订单将分布在同一个消费者群体中的不同消费者之间,因此您可以进行并行处理。
关于聚合ID,我认为这是卡夫卡主题的同义词。由于作者假设RDBMS存储,他需要一些标识符来隔离不同类别的消息。您可以通过在Kafka中创建单独的主题以及每个聚合的使用者组来实现。