捕获并处理 Saga 内消费者抛出的异常

问题描述 投票:0回答:1

我正在开发一个使用 Kafka 并应用 Saga 模式的 POC。我能够创建一条快乐的路径,在该路径中我可以使用某个主题中的消息并通过其他 Kafka 主题将其发送到多个服务。现在我正在尝试处理消息消耗期间的异常。

如您所见,我有一个

OrderRequestEvent
类型的简单消费者。该消费者收到一条消息,然后用这个
apiService
来验证它。如果验证失败,它应该抛出异常。我的想法是在传奇中捕获/处理此异常,更改状态并将其发送到另一个主题以进行可能的重试,而不最终确定传奇。

我的消费者来了:

  public class OrderManagementSystemConsumer : IConsumer<OrderRequestEvent>
    {
        private readonly ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent;
        private readonly IApiService apiService;

        public OrderManagementSystemConsumer(
            ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent, 
            IApiService apiService)
        {
            this.customerValidationResponseEvent = customerValidationResponseEvent;
            this.apiService = apiService;
        }

        public async Task Consume(ConsumeContext<OrderRequestEvent> context)
        {
            ArgumentNullException.ThrowIfNull(context, nameof(context));

            if (await this.apiService.ValidateIncomingRequestAsync(context.Message))
                throw new ArgumentException("Something wrong just happened");
        }
    }

我的传奇故事的一部分处理

OrderRequestEvent

 Initially(
            When(OrderRequestedEvent)
                .Then(context => LogContext.Info?.Log("Initializing saga: {0}", context.Saga.CorrelationId))
                .InitializeSaga()
                .Then(context => LogContext.Info?.Log("Validating Customer: {0}", context.Saga.CorrelationId))
                .SendingToCustomerValidation().LogSaga()
                .TransitionTo(ValidatingCustomer));

我注意到,即使我的异常被正确地抛出到消费者内部,传奇仍会正常继续。显然,消费者抛出的异常对传奇没有任何影响。

.net apache-kafka masstransit saga
1个回答
0
投票

简单来说,这是正确的。如果您的终端没有某种形式的工作,消费者就不会与 Sagas“连接”。我阅读代码的方式是,您有一个处理事件(OrderRequestEvent)的传奇和一个消费者,而不是获取自己的事件副本并执行其操作。这是系统按预期工作。

如果您希望将它们连接起来,那么您需要向 Saga 添加一些逻辑。最少的移动部件是构建一个“自定义 Saga Activity”,其中包含与消费者相同的逻辑。该 Activity 的成功或失败将遵循现有的 Saga 逻辑。 通常,您希望 saga 活动快速(按照 HTTP 请求的顺序),因此如果验证需要依赖于第三方服务,我建议将 saga 委托给消费者。这更接近您在最初问题中的情况。在这种情况下,您将遵循

Saga 请求文档

,并让 saga 发出您的消费者需要处理的请求/响应,然后响应 请求/响应文档。在您现有的模型中,您只需抛出一个异常,这将作为错误发送回请求者,因此您需要在 Saga 中处理它。 至于您发布的代码,您完全有可能已经尝试这样做,但代码隐藏在

SendingToCustomerValidation

LogSaga
等方法后面。
最终,这都是MT支持并经常使用的。因此,如果它不起作用,则几乎总是配置问题。确保打开日志进行调试并查看发生了什么。 :)

© www.soinside.com 2019 - 2024. All rights reserved.