LiteDB deadlock - I am probably using LiteDB incorrectly


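I am writing a server in C#. It uses a LiteDB database to store messages. Each time the server receives a message, it stores the message in the DB and sends a copy to the correct destination.

Since LiteDB is supposed to be thread-safe, I do not use locks to synchronize reads and writes to the DB.

I wrote a test (in a WPF app) that connects 100 clients, each in a separate task. Each client sends a message to every other client and expects to receive a message from every other client. I then verify that all messages were sent and received as expected. For each client and each message, I add a random delay. (More on that later.)

I observe some strange behavior in the test and would appreciate some help. The first 400-2000 messages are received correctly by the server, inserted into the DB, and forwarded to their destinations. After a while, however, everything slows down to roughly one message processed per second. That is, I see a line (printed to the console) saying that a message was received and is about to be inserted into the DB, but I do not see the line saying that it was inserted successfully. I assume the insert is waiting for some internal LiteDB lock to be released, or something similar.

Eventually, after 1 minute, I get the following exception: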

Exception thrown: 'LiteDB.LiteException' in LiteDB.dll
Database lock timeout when entering in transaction mode after 00:01:00

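I guess this exception is thrown because there is a deadlock somewhere. I just cannot figure out what causes it. (So I do not really care about the exception itself; I only mention it because it might help identify the cause of the actual problem.)

The problem occurs sooner when the delay between messages is 0. With a delay of around 50 ms, the test passes. I assume that either I am hitting a deadlock inside LiteDB or (in the low-delay case) I have too many tasks running at once. (I am not sure the latter is true.)

Here is a simplified version of the server code.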

public class Worker : BackgroundService
{
    private static readonly TimeSpan _collectionInterval = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan _expirationThreshold = TimeSpan.FromMinutes(2);
    
    private readonly ConcurrentDictionary<string, Client> Clients = new();

    private readonly DBHandler _dbHandler = DBHandler.GetInstance(AppDomain.CurrentDomain.BaseDirectory);

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        CancellationTokenSource cancellationTokenSource = CancellationTokenSource
                .CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = cancellationTokenSource.Token;
    
        List<Task> tasks = new();

        try
        {
            listener.Start();

            while (!token.IsCancellationRequested)
            {
                TcpClient tcpClient;
                try
                {
                    // Accept an incoming client connection
                    tcpClient = await listener.AcceptTcpClientAsync(token);
                }
                catch (Exception ex)
                {
                    cancellationTokenSource.Cancel();
                    break;
                }

                // Handle the client connection in a separate task
                tasks.Add(
                    HandleClientConnectionAsync(tcpClient, token)
                );
            }

            cancellationTokenSource.Cancel();
            await Task.WhenAll(tasks);
        }
        finally
        {
            _dbHandler.Dispose();
            listener.Stop();
            cancellationTokenSource.Dispose();
            token.ThrowIfCancellationRequested();
        }
    }

    private async Task HandleClientConnectionAsync(TcpClient tcpClient, CancellationToken token)
    {
        Client? client = null;

        try
        {
            client = new(tcpClient);
            client.EnableKeepAlive();

            await SendMessageAsync(new WelcomeMessage(), client, token);

            Message? message = await ReceiveMessageAsync(client, token);
            
            if (Clients.TryAdd(message.FromStationID, client))
            {
                await ReceiveMessagesAsync(client, token);
            }
        }
        finally
        {
            // Clean up closed connection
            if (client is not null)
            {
                if (client.StationID is not null)
                {
                    Clients.TryRemove(client.StationID, out _);
                }
                client.CloseTcpAndStream();
            }
        }
    }
    
    private static async Task SendMessageAsync(Message message, Client client, CancellationToken token)
    {
        //use message.Serialize(), then add 4 byte header to create byte[] buffer
        
        await client.WriteAsync(buffer, token);
    }
    
    private static async Task<Message?> ReceiveMessageAsync(Client client, CancellationToken token)
    {
        // use await client.ReadAsync to receive 4 byte header
        // use await client.ReadAsync to receive the message bytes into byte[] buffer
        
        return
            buffer is null ?
            null :
            Message.Deserialize(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
    }

    private async Task ReceiveMessagesAsync(Client client, CancellationToken token)
    {
        while (client.IsConnected && !token.IsCancellationRequested)
        {
            Message? message = await ReceiveMessageAsync(client, token);
            
            if (token.IsCancellationRequested || message is null)
            {
                break;
            }

            await ProcessMessageByTypeAsync(message, token);
        }
    }

    private async Task ProcessMessageByTypeAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }
        else if (message is AckMessage ackMessage)
        {
            await ProcessAckMessageAsync(ackMessage, token);
        }
        else if (message is DataMessage || message is FeedbackMessage)
        {
            await ProcessDataMessageAsync(message, token);
        }
        // Ignore other messages
    }

    private async Task ProcessDataMessageAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }

        if (message.ToStationID != null)
        {
            _dbHandler.Insert(message);

            Client client;

            if (Clients.TryGetValue(message.ToStationID, out client))
            {
                await SendMessageAsync(message, client, token);
            }
        }
    }

    private async Task ProcessAckMessageAsync(AckMessage ackMessage, CancellationToken token)
    {
        _dbHandler.DeleteMessageByID(ackMessage.AckedMessageID);
    }
}

And the relevant DB code:

private ILiteCollection<Message> GetCollection(Message message)
{
    var msgCollection = _dataBase
        .GetCollection<Message>(DB.MESSAGES);

    msgCollection.EnsureIndex((Message m) => m.MessageID);

    return msgCollection;
}

private ILiteCollection<Message> GetMessages()
{
    return GetCollection((Message)null);
}

public BsonValue Insert(Message message)
{
    message.MessageID = 0;
    return GetMessages()
        .Insert(message);
}

public bool DeleteMessageByID(BsonValue messageID)
{
    return GetMessages()
        .Delete(messageID);
}

Tags: c# server task deadlock litedb

1 Answer

I may have run into a similar problem.

The exception was:

Collection 'MyCollection' lock timeout when entering in write mode after 00:01:00

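...raised by the same logic that had been rock solid as long as it was awaited from the UI synchronization context. However, as soon as a SignalR handler tried to execute this method, these exceptions started to occur. That is where I see the similarity to your BackgroundService.

Manually awaiting the method from the UI synchronization context again fixed everything immediately. Like this: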

private void MySignalrEventHandler()
{
    // Queue the work on the scheduler of the desired synchronization context,
    // e.g. obtained via TaskScheduler.FromCurrentSynchronizationContext()
    // while on the UI thread and then passed around.
    Task.Factory.StartNew(
        async () =>
        {
            // Executes up to thousands of upserts on tens of collections.
            await MyUsuallyRockstableMethod();
        },
        CancellationToken.None,
        TaskCreationOptions.None,
        taskSchedulerForDesiredSyncContext);
}
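
A BackgroundService has no UI synchronization context to capture, so the same idea would need a different scheduler there. Below is a minimal sketch, assuming the goal is simply that every database call runs on one dedicated thread. `SingleThreadExecutor` and `Demo` are hypothetical names introduced for illustration, not LiteDB or .NET APIs, and the demo only records thread IDs instead of touching a database. Whether this removes the lock timeout in the server above is untested.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs every submitted delegate on one dedicated thread.
public sealed class SingleThreadExecutor : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new();
    private readonly Thread _worker;

    public SingleThreadExecutor()
    {
        _worker = new Thread(() =>
        {
            // Loops until CompleteAdding() is called and the queue is drained.
            foreach (Action work in _queue.GetConsumingEnumerable())
            {
                work();
            }
        })
        { IsBackground = true, Name = "db-worker" };
        _worker.Start();
    }

    // Queues the delegate and returns a task that completes with its result.
    public Task<T> RunAsync<T>(Func<T> work)
    {
        // Continuations run asynchronously so callers never resume on the worker thread.
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Add(() =>
        {
            try { tcs.SetResult(work()); }
            catch (Exception ex) { tcs.SetException(ex); }
        });
        return tcs.Task;
    }

    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}

public static class Demo
{
    // Submits work from many concurrent callers and returns the IDs of the
    // threads that actually executed it.
    public static async Task<HashSet<int>> RunAsync(int callers)
    {
        using var executor = new SingleThreadExecutor();
        int[] ids = await Task.WhenAll(
            Enumerable.Range(0, callers).Select(_ =>
                Task.Run(() => executor.RunAsync(() => Environment.CurrentManagedThreadId))));
        return new HashSet<int>(ids);
    }

    public static async Task Main()
    {
        HashSet<int> threads = await RunAsync(100);
        Console.WriteLine($"distinct worker threads: {threads.Count}"); // prints 1
    }
}
```

In the server from the question, the delegate passed to `RunAsync` would wrap a call such as `_dbHandler.Insert(message)`, and the executor would be disposed before `_dbHandler`. The trade-off is that all database work becomes strictly sequential.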

Hope that helps!
