提供的锁无效。锁已过期,或者消息已从队列中删除

问题描述 投票:0回答:5

我正在使用 Microsoft Azure 服务总线队列来处理计算,并且我的程序运行良好几个小时,但随后我开始为从那时起处理的每条消息获取此异常。我不知道从哪里开始,因为前几个小时一切都运行良好。我的代码似乎也是准确的。这是我处理 Azure 服务总线消息的方法。

public static async Task processCalculations(BrokeredMessage message)
{
    try
    {
        if (message != null)
        {
            if (connection == null || !connection.IsConnected)
            {
                connection = await ConnectionMultiplexer.ConnectAsync("connection,SyncTimeout=10000,ConnectTimeout=10000");
                //connection = ConnectionMultiplexer.Connect("connection,SyncTimeout=10000,ConnectTimeout=10000");
            }
            cache = connection.GetDatabase();
            string sandpKey = message.Properties["sandp"].ToString();
            string dateKey = message.Properties["date"].ToString();
            string symbolclassKey = message.Properties["symbolclass"].ToString();
            string stockdataKey = message.Properties["stockdata"].ToString();
            string stockcomparedataKey = message.Properties["stockcomparedata"].ToString();
            var sandpTask = cache.GetAsync<List<StockData>>(sandpKey);
            var dateTask = cache.GetAsync<DateTime>(dateKey);
            var symbolinfoTask = cache.GetAsync<SymbolInfo>(symbolclassKey);
            var stockdataTask = cache.GetAsync<List<StockData>>(stockdataKey);
            var stockcomparedataTask = cache.GetAsync<List<StockMarketCompare>>(stockcomparedataKey);
            await Task.WhenAll(sandpTask, dateTask, symbolinfoTask,
                stockdataTask, stockcomparedataTask);
            List<StockData> sandp = sandpTask.Result;
            DateTime date = dateTask.Result;
            SymbolInfo symbolinfo = symbolinfoTask.Result;
            List<StockData> stockdata = stockdataTask.Result;
            List<StockMarketCompare> stockcomparedata = stockcomparedataTask.Result;
            StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);
            if (rating != null)
            {
                saveToTable(rating);
                if (message.LockedUntilUtc.Minute <= 1)
                {
                    await message.RenewLockAsync();
                }
                await message.CompleteAsync(); // getting exception here
            }
            else
            {
                Console.WriteLine("Message " + message.MessageId + " Completed!");
                await message.CompleteAsync();
            }
        }
    }
    catch (TimeoutException time)
    {
        Console.WriteLine(time.Message);
    }
    catch (MessageLockLostException locks)
    {
        Console.WriteLine(locks.Message);
    }
    catch (RedisConnectionException redis)
    {
        Console.WriteLine("Start the redis server service!");
    }
    catch (MessagingCommunicationException communication)
    {
        Console.WriteLine(communication.Message);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        Console.WriteLine(ex.StackTrace);
    }
}

我检查锁定到期之前的时间,如果需要,我会调用锁定更新 - 它更新锁定时没有错误,但我仍然收到此异常。

timeLeft = message.LockedUntilUtc - DateTime.UtcNow;
if (timeLeft.TotalMinutes <= 2)
{
    //Console.WriteLine("Renewed lock! " + ((TimeSpan)(message.LockedUntilUtc - DateTime.UtcNow)).TotalMinutes);
    message.RenewLock();
}

catch (MessageLockLostException locks)
{
    Console.WriteLine("Delivery Count: " + message.DeliveryCount);
    Console.WriteLine("Enqueued Time: " + message.EnqueuedTimeUtc);
    Console.WriteLine("Expires Time: " + message.ExpiresAtUtc);
    Console.WriteLine("Locked Until Time: " + message.LockedUntilUtc);
    Console.WriteLine("Scheduled Enqueue Time: " + message.ScheduledEnqueueTimeUtc);
    Console.WriteLine("Current Time: " + DateTime.UtcNow);
    Console.WriteLine("Time Left: " + timeLeft);
}

到目前为止我所知道的是我的代码运行良好一段时间,并且更新锁被调用并工作,但我仍然收到锁异常并在该异常内。

我输出了剩余的时间,随着代码的运行,它不断增加时间差,这让我相信锁到期之前的时间不会以某种方式改变?

c# azure distributed azureservicebus
5个回答
45
投票

我花了几个小时试图理解为什么我得到了

MessageLockLostException
。对我来说,原因是 AutoComplete 默认为 true。

如果您要调用

messsage.Complete()
(或
CompleteAsync()
),那么您应该实例化一个
OnMessageOptions
对象,将
AutoComplete
设置为 false,并将其传递到您的
OnMessage
调用中。

var options = new OnMessageOptions();
options.AutoComplete = false;

client.OnMessage(processCalculations, options);

22
投票

我花了 2 天的时间来解决类似的问题 - 同样的异常。
这个异常可能有多种原因,我将描述几个配置选项,可以帮助你陌生人......

ServiceBus 队列或主题订阅配置:

    队列/主题订阅上的
  • 消息锁定持续时间太低,将其设置为大约。消息处理时间

ServiceBusClient 选项配置:

  • tryTimeout 太短,将其设置为 ~10 秒以进行诊断

ServiceBusProcessor 选项配置:

  • AutoCompleteMessages 默认设置为 true,设置为 false
  • PrefetchCount 太高,为了诊断将其设置为 0
  • ReceiveMode将其设置为ServiceBusReceiveMode.PeekLock
  • MaxConcurrentCalls 用于诊断,将其设置为 1

找到正确的值(针对给定系统进行优化)后,我不再观察到任何问题。


18
投票

我也遇到了类似的问题。消息已成功处理,但当消息完成时,服务总线不再具有有效的锁。事实证明我的 TopicClient.PrefetchCount 太高了。

看来,所有预取的消息一旦被获取,就会开始锁定。如果您的累积消息处理时间超过锁定超时,则所有其他预取消息将无法完成。它将返回服务巴士。


2
投票

就我而言,我只是在本地计算机上开发 V2,而 V1 已经部署并运行。

由于 V1 部署在 Azure 中(更接近同一个队列)并在发布模式下编译(相对于我的本地版本在调试模式下),部署的版本始终“赢得”队列资源的并发性。

这就是消息不再在队列中的原因:它被我的代码的部署版本消耗了。我知道这有点愚蠢。


1
投票

当您创建客户端订阅时,不要手动更新锁定,而是尝试使用客户端的 OnMessageOptions() 自动更新它,如下所示:

        OnMessageOptions options = new OnMessageOptions();

        options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

        try
        {
            client = Subscription.CreateClient();

            client.OnMessageAsync(MessageReceivedComplete, options);
        }
        catch (Exception ex)
        {
            throw new Exception (ex);
        }
© www.soinside.com 2019 - 2024. All rights reserved.