我建立了一个服务,以支持多个队列订阅Azure中服务总线,但我发现了一些奇怪的行为。
我的订阅单例类有看起来像这样的方法:
public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
{
try
{
var messageLifespan = TimeSpan.FromSeconds(ttl);
var messageType = typeof(TMessage);
if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
{
subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
_activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
}
var messageHandlerOptions = new MessageHandlerOptions(OnException)
{
MaxConcurrentCalls = maxDop,
AutoComplete = false,
MaxAutoRenewDuration = messageLifespan,
};
subscriptionClient.RegisterMessageHandler(
async (azureMessage, cancellationToken) =>
{
try
{
var textPayload = _encoding.GetString(azureMessage.Body);
var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
if (message == null)
throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
await execution.Invoke(message);
await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
}
}
, messageHandlerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Subscribe(Action<TMessage>)");
throw;
}
}
我们的想法是,你订阅了Azure的服务总线为特定类型的消息,并直接对应到一个队列。在您的订阅,您在如何处理消息的代表通过。
这似乎工作...有一点需要注意。
无论什么样的我设置为ttl
或MaxAutoRenewDuration
的OperationTimeout
,在长期运行过程中对于任何给定的消息,一分钟的消息从队列解锁之后和另一用户拾起,开始处理。
我的理解是,也正是MaxAutoRenewDuration
应该防止...但它似乎并没有阻止任何东西。
谁能告诉我什么,我需要做的不同,以确保消费者通过完成拥有该消息?
还有我能想到的,你可能想看看几个选项。
ReceiveMode = PeekLock
的,将其设置为ReceiveAndDelete所以一旦消息被消耗掉,它会从队列中删除,并不会被任何其他客户端被消耗掉,这是否意味着你必须妥善地处理异常,执行重试自己;OperationTimeout
其中根据DOCO它是Duration after which individual operations will timeout
事实证明,消费者正在运行是失败默默的,而不是返回一个失败状态代码(或任何其他)的远程进程;自动刷新机制挂着等待结果,所以该消息最终超时。
我不是如何防止清楚了,但一旦我固定在远程进程的问题,这个问题不再重现性。
这个故事的寓意:如果一切看起来正常,它仍然超时,似乎自动刷新机制共享一些资源,与正在等待异步操作。这可能是另一个地方去寻找故障。