我在 .NET WebApi 应用程序中使用 Azure 服务总线和公共交通库。我有一个 ServiceA,我在其中发布 StartProcess 消息。此消息也用于状态机 - StartStateMachine。
此消息将发送到 ServiceB(即 Azure Functions),并在其中进行处理和发送。我被迫使用以下表示法在 ServiceA 中注册 ServiceB 的使用者,因为根据我发现和阅读的内容,Azure Functions 的订阅必须预先存在。通常,我会使用 MassTransit 进行此注册,但 MassTransit 无法在没有“绑定”(消费者实现)的情况下创建消费者。这是因为消费者处于不同的项目/服务中。
if (!(await adminClient.TopicExistsAsync(formatter.MessageName<StartProccess>()));
adminClient.CreateTopicAsync(formatter.MessageName<StartProccess>());
if (!(await adminClient.SubscriptionExistsAsync(formatter.MessageName<StartProccess>()), formatterCheckFunc.ConsumerFromMessageName<StartProccess>()).Value)
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<StartProccess>(), formatterCheckFunc.ConsumerFromMessageName<StartProccess>()) { MaxDeliveryCount = 1 });
此处列出最大交付次数的原因是,当消费者因异常而崩溃时,它不会再次运行十次。我想实现像公共交通这样的功能,如果出现错误,会立即调用 IFault 实现,并且不会重复调用。
一切都按其应有的方式进行。然而,偶尔(而且真的很少),例如,百分之一的情况下,我会收到一条 StartProcess 消息,该消息最终在 DLQ 中显示一条消息 - 超出了最大发送次数。我不明白为什么。它确实是很长一段时间才会出现的一次。这样我的状态机就无法启动,而且真的很笨拙。
有人遇到过这个问题吗? 谢谢你
编辑:
消息处理
try
{
await _receiver.HandleConsumer<Consumer>(TopicName, SubscriptionName, message, cancellationToken);
await messageActions.CompleteMessageAsync(message);
}
catch (SomeException ex)
{
await messageActions.AbandonMessageAsync(message);
_log.LogWarning("LogIt");
}
catch
{
// This?
await messageActions.CompleteMessageAsync(message);
// Or this?
await messageActions.DeadLetterMessageAsync(message);
throw;
}
一切都按其应有的方式进行。然而,偶尔(而且真的很少),例如,百分之一的情况下,我会收到一条 StartProccess 消息,该消息最终在 DLQ 中显示一条消息 - 已超出最大发送次数。
将
MaxDeliveryCount
设置为 1 就是造成这种情况的原因。
值为 1 表示单次处理尝试。如果该单次尝试因任何原因不成功,该消息将成为死信。