.NET - 服务总线 - 将消息移动到 DLQ

问题描述 投票:0回答:1

我在 .NET WebApi 应用程序中使用 Azure 服务总线和公共交通库。我有一个 ServiceA,我在其中发布 StartProcess 消息。此消息也用于状态机 - StartStateMachine。

此消息将发送到 ServiceB(即 Azure Functions),并在其中进行处理和发送。我被迫使用以下表示法在 ServiceA 中注册 ServiceB 的使用者,因为根据我发现和阅读的内容,Azure Functions 的订阅必须预先存在。通常,我会使用 MassTransit 进行此注册,但 MassTransit 无法在没有“绑定”(消费者实现)的情况下创建消费者。这是因为消费者处于不同的项目/服务中。

if (!(await adminClient.TopicExistsAsync(formatter.MessageName<StartProccess>()));
    adminClient.CreateTopicAsync(formatter.MessageName<StartProccess>());
    
if (!(await adminClient.SubscriptionExistsAsync(formatter.MessageName<StartProccess>()), formatterCheckFunc.ConsumerFromMessageName<StartProccess>()).Value)
    await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<StartProccess>(), formatterCheckFunc.ConsumerFromMessageName<StartProccess>()) { MaxDeliveryCount = 1 });

此处列出最大交付次数的原因是,当消费者因异常而崩溃时,它不会再次运行十次。我想实现像公共交通这样的功能,如果出现错误,会立即调用 IFault 实现,并且不会重复调用。

一切都按其应有的方式进行。然而,偶尔(而且真的很少),例如,百分之一的情况下,我会收到一条 StartProcess 消息,该消息最终在 DLQ 中显示一条消息 - 超出了最大发送次数。我不明白为什么。它确实是很长一段时间才会出现的一次。这样我的状态机就无法启动,而且真的很笨拙。

enter image description here

有人遇到过这个问题吗? 谢谢你

编辑:

消息处理

    try
    {
        await _receiver.HandleConsumer<Consumer>(TopicName, SubscriptionName, message, cancellationToken);
        await messageActions.CompleteMessageAsync(message);
    }
    catch (SomeException ex)
    {
        await messageActions.AbandonMessageAsync(message);
        _log.LogWarning("LogIt");
    }
    catch
    {
        // This?
        await messageActions.CompleteMessageAsync(message);
        // Or this?
        await messageActions.DeadLetterMessageAsync(message);
        throw;
    }
c# .net azureservicebus masstransit servicebus
1个回答
1
投票

一切都按其应有的方式进行。然而,偶尔(而且真的很少),例如,百分之一的情况下,我会收到一条 StartProccess 消息,该消息最终在 DLQ 中显示一条消息 - 已超出最大发送次数。

MaxDeliveryCount
设置为 1 就是造成这种情况的原因。

值为 1 表示单次处理尝试。如果该单次尝试因任何原因不成功,该消息将成为死信。

© www.soinside.com 2019 - 2024. All rights reserved.