Azure的服务总线的消息锁不被续约?

问题描述 投票:1回答:2

我建立了一个服务,以支持多个队列订阅Azure中服务总线,但我发现了一些奇怪的行为。

我的订阅单例类有看起来像这样的方法:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

我们的想法是,你订阅了Azure的服务总线为特定类型的消息,并直接对应到一个队列。在您的订阅,您在如何处理消息的代表通过。

这似乎工作...有一点需要注意。

无论什么样的我设置为ttlMaxAutoRenewDurationOperationTimeout,在长期运行过程中对于任何给定的消息,一分钟的消息从队列解锁之后和另一用户拾起,开始处理。

我的理解是,也正是MaxAutoRenewDuration应该防止...但它似乎并没有阻止任何东西。

谁能告诉我什么,我需要做的不同,以确保消费者通过完成拥有该消息?

c# .net-core azureservicebus azure-servicebus-queues azure-servicebus-subscriptions
2个回答
0
投票

还有我能想到的,你可能想看看几个选项。

  1. 相反,在SubscriptionClient使用默认ReceiveMode = PeekLock的,将其设置为ReceiveAndDelete所以一旦消息被消耗掉,它会从队列中删除,并不会被任何其他客户端被消耗掉,这是否意味着你必须妥善地处理异常,执行重试自己;
  2. 看看OperationTimeout其中根据DOCO它是Duration after which individual operations will timeout

0
投票

事实证明,消费者正在运行是失败默默的,而不是返回一个失败状态代码(或任何其他)的远程进程;自动刷新机制挂着等待结果,所以该消息最终超时。

我不是如何防止清楚了,但一旦我固定在远程进程的问题,这个问题不再重现性。

这个故事的寓意:如果一切看起来正常,它仍然超时,似乎自动刷新机制共享一些资源,与正在等待异步操作。这可能是另一个地方去寻找故障。

© www.soinside.com 2019 - 2024. All rights reserved.