在 IBus.PubSub.SubscribeAsync() 中仅设置同步阻塞调用?

问题描述 投票:0回答:1

问题是我需要按顺序处理每条消息,以便一条消息应该阻止下一条消息。 任何消息都不应异步处理,也不应与另一条消息同时处理。 但有时(比如 0.8% 的情况,即将到来的消息是异步处理的,即使所有方法都是同步“void”并且**不是“任务”**)。

// Steps to reproduce the behavior: Run this class and receive several messages:

public class Subscriber : ISubscriber
{
    private readonly IBus _bus;
    private readonly IMessageProcessor _myMessageProcessor;
    private readonly ILogger<ISubscriber> _logger;
    private const string _subscriptionId = "MySubs";

    public Subscriber(IBus bus, IMessageProcessor myMessageProcessor, ILogger<ISubscriber> logger)
    {
        _bus = bus;
        _myMessageProcessor = myMessageProcessor;
        _logger = logger;
    }

    public void Subscribe()
    {
        _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
        _bus.Advanced.Connected += Advanced_Connected;
    }

    private void Advanced_Connected(object sender, EventArgs e)
    {
        _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
    }

    private void OnMyCustomMessage(MyCustomMessage message)
    {
        try {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"Received message #{message.Id}. {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
            _myMessageProcessor.ProcessCustomMessage(message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"{nameof(Subscriber)}.{nameof(OnMyCustomMessage)}: args: {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
        }
    }
}

预期行为

预期的行为是对每条消息进行 100% 同步处理,无需异步,不需要异步代码。

版本控制:

EasyNetQ 版本:[例如7.3.6] RabbitMQ 版本 [例如25.1.1]

其他背景:

更新到 7.4.* 后,我降级了,因为 7.3.8 版本中有非阻塞调用的更新,但我只需要阻塞调用。 您能指导我如何在 PubSub 中使用消息阻止处理程序吗?

我尝试使用 _bus.PubSub.Subscribe 而不是 _bus.PubSub.SubscribeAsync,但没有成功。

c# dependency-injection rabbitmq .net-7.0 easynetq
1个回答
0
投票

好吧,正如 EasyNetQ 开发人员回答的那样,现在的解决方案可能是在声明或订阅队列时使用 WithPrefetchCount(1) 方法。

await _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => {
    c.WithAutoDelete(false);
    c.WithPrefetchCount(1);
});

© www.soinside.com 2019 - 2024. All rights reserved.