Mass Transit:当有不同的消息类型时,确保消息处理顺序

问题描述 投票:1回答:1

我是Mass Transit的新手,我想了解它是否有助于我的方案。我正在构建一个使用CQRS事件源架构实现的示例应用程序,我需要一个服务总线,以便将命令堆栈创建的事件分派给查询堆栈非规范化器。

让我们假设在我们的域中有一个聚合,让我们称之为Photo,以及两个不同的域事件:PhotoUploaded和PhotoArchived。

在这种情况下,我们有两种不同的消息类型,默认的Mass Transit行为是创建两个不同的RabbitMq交换:一个用于PhotoUploaded消息类型,另一个用于PhotoArchived消息类型。

让我们假设有一个名为PhotoDenormalizer的非规范化器:这个服务将是两种消息类型的消费者,因为每当照片上传或存档时都必须更新照片读取模型。

鉴于默认的公共交通拓扑,将有两个不同的交换,因此不能保证不同类型的事件之间的消息处理顺序:我们唯一的保证是所有相同类型的事件将按顺序处理,但我们不能保证不同类型事件之间的处理顺序(注意,给定我的例子的事件语义,处理顺序很重要)。

我该如何处理这种情况?公共交通是否适合我的需求?我是否完全忽略了域事件调度的重点?

rabbitmq cqrs servicebus event-sourcing masstransit
1个回答
6
投票

免责声明:这不是您的问题的答案,而是一个预防性的信息,为什么您不应该做您打算做的事情。

虽然像RMQ这样的消息代理和像MassTransit这样的消息中间件库非常适合集成,但我强烈建议不要使用消息代理来进行事件采购。我可以参考我的旧答案Event-sourcing: when (and not) should I use Message Queue?解释其背后的原因。

您发现自己的原因之一 - 事件订单永远不会得到保证。

另一个明显的原因是,通过消息代理发布的事件构建读取模型有效地消除了重放的可能性,并构建了需要从一开始就开始处理事件的新读取模型,但他们得到的只是事件。正在出版。

聚合形成事务边界,因此每个命令都需要保证它在一个事务中完成。虽然MT支持transaction middleware,但它只保证您获得支持它们的依赖关系的事务,但不支持消费者体中的context.Publish(@event),因为RMQ不支持事务。您很有可能提交更改而不会在读取方面获取事件。因此,事件存储的经验法则是您应该能够从商店订阅更改流,而不是从代码中发布事件,除非那些是集成事件而不是域事件。

对于事件源,每个读取模型在其预测的事件流中保留自己的检查点至关重要。消息代理不会给你那种权力,因为“检查点”实际上是你的队列,一旦消息从队列中消失 - 它就永远消失了,没有回来。

关于实际问题:

您可以使用message topology configuration为不同的消息设置相同的实体名称,然后将它们发布到同一个交换机,但这属于Chris在该页面上写的“滥用”类别。我没试过,但你绝对可以尝试。消息CLR类型是元数据的一部分,因此不应存在反序列化问题。

但同样,将消息放在同一个交换中将不会给您任何订购保证,除了所有消息将落在消费服务的一个队列中的事实。

您必须至少根据聚合ID设置分区筛选器,以防止并行处理同一聚合的多个消息。顺便说一句,这对整合也很有用。这就是我们这样做的方式:

void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
    => ep.Handler<T>(
        c => appService.Handle(c, aggregateStore), 
        hc => hc.UsePartitioner(8, partition));

AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);
© www.soinside.com 2019 - 2024. All rights reserved.