问题是我需要按顺序处理每条消息,以便一条消息应该阻止下一条消息。 任何消息都不应异步处理,也不应与另一条消息同时处理。 但有时(比如 0.8% 的情况,即将到来的消息是异步处理的,即使所有方法都是同步“void”并且**不是“任务”**)。
// Steps to reproduce the behavior: Run this class and receive several messages:
public class Subscriber : ISubscriber
{
private readonly IBus _bus;
private readonly IMessageProcessor _myMessageProcessor;
private readonly ILogger<ISubscriber> _logger;
private const string _subscriptionId = "MySubs";
public Subscriber(IBus bus, IMessageProcessor myMessageProcessor, ILogger<ISubscriber> logger)
{
_bus = bus;
_myMessageProcessor = myMessageProcessor;
_logger = logger;
}
public void Subscribe()
{
_bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
_bus.Advanced.Connected += Advanced_Connected;
}
private void Advanced_Connected(object sender, EventArgs e)
{
_bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
}
private void OnMyCustomMessage(MyCustomMessage message)
{
try {
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug($"Received message #{message.Id}. {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
_myMessageProcessor.ProcessCustomMessage(message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(Subscriber)}.{nameof(OnMyCustomMessage)}: args: {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
}
}
}
预期行为
预期的行为是对每条消息进行 100% 同步处理,无需异步,不需要异步代码。
版本控制:
EasyNetQ 版本:[例如7.3.6] RabbitMQ 版本 [例如25.1.1]
其他背景:
更新到 7.4.* 后,我降级了,因为 7.3.8 版本中有非阻塞调用的更新,但我只需要阻塞调用。 您能指导我如何在 PubSub 中使用消息阻止处理程序吗?
我尝试使用 _bus.PubSub.Subscribe 而不是 _bus.PubSub.SubscribeAsync,但没有成功。
好吧,正如 EasyNetQ 开发人员回答的那样,现在的解决方案可能是在声明或订阅队列时使用 WithPrefetchCount(1) 方法。
await _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => {
c.WithAutoDelete(false);
c.WithPrefetchCount(1);
});