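I'm writing a server in C#. It uses a LiteDB database to store messages. Every time the server receives a message, it stores the message in the DB and sends a copy to the proper destination.
Since LiteDB is supposed to be thread-safe, I don't use locks to synchronize reads and writes to the DB.
I made a test (in a WPF app) that connects 100 clients, each in a separate task. Each client sends a message to every other client and expects to receive a message from every other client. I then check that all messages were sent and received as expected. For each client and message, I add some random delay. (More on that below.)
I observe some strange behavior in the test that I would appreciate help with. The first 400-2000 messages are correctly received by the server, inserted into the DB, and forwarded to their destination. After a while, however, everything slows down to roughly one message per second. That is, I see a line (printed to the console) saying that a message was received and is about to be inserted into the DB, but I don't see the line saying that it was inserted successfully. I assume the insert is waiting for some internal LiteDB lock to be released, or something similar.
Eventually, after 1 minute, I get the following exception: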
Exception thrown: 'LiteDB.LiteException' in LiteDB.dll
Database lock timeout when entering in transaction mode after 00:01:00
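I guess this exception is thrown because there is a deadlock somewhere. I just can't work out what is causing it. (So I don't really care about the exception itself; I mention it only because it may help pin down the cause of the actual problem.)
When the delay between messages is 0, the problem shows up much sooner. When I set the delay to around 50 ms, the test passes. I assume that either I'm hitting a deadlock inside LiteDB, or (in the low-delay case) I have too many tasks running at once? (I'm not sure the latter is true.)
I've added a simplified version of the server code.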
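One way to tell these two hypotheses apart would be to funnel every DB write through a single consumer, so that at most one write is in flight no matter how many client tasks are running. Below is a minimal sketch of that experiment using System.Threading.Channels. SingleWriterQueue and the write delegate are hypothetical names that are not part of the server; the delegate stands in for the synchronous DB insert.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not part of the server): serializes all writes onto one consumer loop.
public sealed class SingleWriterQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Task _consumer;

    public SingleWriterQueue(Action<T> write)
    {
        // One long-lived loop performs every write, one at a time.
        _consumer = Task.Run(async () =>
        {
            await foreach (T item in _channel.Reader.ReadAllAsync())
            {
                write(item);
            }
        });
    }

    // Callers enqueue instead of writing directly; returns false once the queue is completed.
    public bool Enqueue(T item) => _channel.Writer.TryWrite(item);

    // Stop accepting items and wait until everything queued has been written.
    public Task CompleteAsync()
    {
        _channel.Writer.TryComplete();
        return _consumer;
    }
}

public static class SingleWriterDemo
{
    public static async Task<int> RunAsync()
    {
        int written = 0;
        // No lock around the counter: only the single consumer loop touches it.
        var queue = new SingleWriterQueue<int>(_ => written++);

        // 100 producers enqueue concurrently, similar to 100 client tasks.
        var producers = new Task[100];
        for (int i = 0; i < producers.Length; i++)
        {
            int id = i;
            producers[i] = Task.Run(() => queue.Enqueue(id));
        }

        await Task.WhenAll(producers);
        await queue.CompleteAsync();
        return written;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints 100
    }
}
```

If the slowdown disappears with a single writer, contention between concurrent writers is the likelier culprit. Note that enqueueing returns before the write happens, so a message could be forwarded before it is persisted, and an exception thrown by the delegate ends the consumer loop and surfaces only from CompleteAsync.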
public class Worker : BackgroundService
{
    private static readonly TimeSpan _collectionInterval = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan _expirationThreshold = TimeSpan.FromMinutes(2);
    private readonly ConcurrentDictionary<string, Client> Clients = new();
    private readonly DBHandler _dbHandler = DBHandler.GetInstance(AppDomain.CurrentDomain.BaseDirectory);

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        CancellationTokenSource cancellationTokenSource = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = cancellationTokenSource.Token;
        List<Task> tasks = new();
        try
        {
            // listener is declared elsewhere (omitted from this simplified version)
            listener.Start();
            while (!token.IsCancellationRequested)
            {
                TcpClient tcpClient;
                try
                {
                    // Accept an incoming client connection
                    tcpClient = await listener.AcceptTcpClientAsync(token);
                }
                catch (Exception)
                {
                    cancellationTokenSource.Cancel();
                    break;
                }
                // Handle the client connection in a separate task
                tasks.Add(
                    HandleClientConnectionAsync(tcpClient, token)
                );
            }
            cancellationTokenSource.Cancel();
            await Task.WhenAll(tasks);
        }
        finally
        {
            _dbHandler.Dispose();
            listener.Stop();
            cancellationTokenSource.Dispose();
            token.ThrowIfCancellationRequested();
        }
    }

    private async Task HandleClientConnectionAsync(TcpClient tcpClient, CancellationToken token)
    {
        Client? client = null;
        try
        {
            client = new(tcpClient);
            client.EnableKeepAlive();
            await SendMessageAsync(new WelcomeMessage(), client, token);
            // The first message identifies the client; it may be null if the connection closed
            Message? message = await ReceiveMessageAsync(client, token);
            if (message?.FromStationID is not null && Clients.TryAdd(message.FromStationID, client))
            {
                await ReceiveMessagesAsync(client, token);
            }
        }
        finally
        {
            // Clean up closed connection
            if (client is not null)
            {
                if (client.StationID is not null)
                {
                    Clients.TryRemove(client.StationID, out _);
                }
                client.CloseTcpAndStream();
            }
        }
    }

    private static async Task SendMessageAsync(Message message, Client client, CancellationToken token)
    {
        //use message.Serialize(), then add 4 byte header to create byte[] buffer
        await client.WriteAsync(buffer, token);
    }

    private static async Task<Message?> ReceiveMessageAsync(Client client, CancellationToken token)
    {
        // use await client.ReadAsync to receive 4 byte header
        // use await client.ReadAsync to receive the message bytes into byte[] buffer
        return
            buffer is null ?
            null :
            Message.Deserialize(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
    }

    private async Task ReceiveMessagesAsync(Client client, CancellationToken token)
    {
        while (client.IsConnected && !token.IsCancellationRequested)
        {
            Message? message = await ReceiveMessageAsync(client, token);
            if (token.IsCancellationRequested || message is null)
            {
                break;
            }
            await ProcessMessageByTypeAsync(message, token);
        }
    }

    private async Task ProcessMessageByTypeAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }
        else if (message is AckMessage ackMessage)
        {
            await ProcessAckMessageAsync(ackMessage, token);
        }
        else if (message is DataMessage || message is FeedbackMessage)
        {
            await ProcessDataMessageAsync(message, token);
        }
        // Ignore other messages
    }

    private async Task ProcessDataMessageAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }
        if (message.ToStationID != null)
        {
            // Synchronous LiteDB call: this thread is blocked until the insert returns
            _dbHandler.Insert(message);
            if (Clients.TryGetValue(message.ToStationID, out Client? client))
            {
                await SendMessageAsync(message, client, token);
            }
        }
    }

    private async Task ProcessAckMessageAsync(AckMessage ackMessage, CancellationToken token)
    {
        // Synchronous LiteDB call as well; nothing is awaited here
        _dbHandler.DeleteMessageByID(ackMessage.AckedMessageID);
    }
}
And the relevant DB code:
private ILiteCollection<Message> GetCollection(Message message)
{
    var msgCollection = _dataBase
        .GetCollection<Message>(DB.MESSAGES);
    msgCollection.EnsureIndex((Message m) => m.MessageID);
    return msgCollection;
}

private ILiteCollection<Message> GetMessages()
{
    return GetCollection((Message)null);
}

public BsonValue Insert(Message message)
{
    message.MessageID = 0;
    return GetMessages()
        .Insert(message);
}

public bool DeleteMessageByID(BsonValue messageID)
{
    return GetMessages()
        .Delete(messageID);
}
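A side observation on the DB code above: GetCollection calls EnsureIndex on every insert and delete. Whether that contributes to the lock timeout has not been confirmed here, but resolving the collection and ensuring the index once at construction removes that per-call work and is cheap to try. The following is only a sketch, assuming LiteDB 5. MessageStore, the cut-down Message type, the "messages" collection name, and the choice of ToStationID as the indexed field are placeholders, not the real DBHandler.

```csharp
using System;
using System.IO;
using LiteDB;

// Cut-down placeholder for the real Message type.
public class Message
{
    [BsonId]
    public int MessageID { get; set; }
    public string? ToStationID { get; set; }
}

// Hypothetical replacement for the relevant part of DBHandler.
public sealed class MessageStore : IDisposable
{
    private readonly LiteDatabase _dataBase;
    private readonly ILiteCollection<Message> _messages;

    public MessageStore(Stream stream)
    {
        _dataBase = new LiteDatabase(stream);
        _messages = _dataBase.GetCollection<Message>("messages");
        // Ensure the index once here instead of on every Insert/Delete call.
        _messages.EnsureIndex(m => m.ToStationID);
    }

    public BsonValue Insert(Message message)
    {
        message.MessageID = 0; // 0 lets LiteDB assign the next integer id
        return _messages.Insert(message);
    }

    public bool DeleteMessageByID(BsonValue messageID) => _messages.Delete(messageID);

    public void Dispose() => _dataBase.Dispose();
}

public static class MessageStoreDemo
{
    public static void Main()
    {
        // An in-memory stream keeps the sample free of files on disk.
        using var store = new MessageStore(new MemoryStream());
        BsonValue id = store.Insert(new Message { ToStationID = "A" });
        Console.WriteLine(store.DeleteMessageByID(id));
    }
}
```

The sketch assumes that a single collection instance can be shared between calls the way the database instance is; if that turns out not to hold, calling GetCollection per operation while keeping EnsureIndex in the constructor gives the same saving.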
I may have run into a similar problem.
The exception was:
Collection 'MyCollection' lock timeout when entering in write mode after 00:01:00
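...raised by the same logic that is rock-stable when awaited from the UI synchronization context. But as soon as a SignalR handler tried to execute this method, these exceptions occurred. That is where I see the similarity to your BackgroundService.
Awaiting this method manually from the UI synchronization context again fixed all the problems immediately. Like this: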
private void MySignalrEventHandler()
{
    Task.Factory.StartNew(
        async () =>
        {
            await MyUsuallyRockstableMethod(); // executing up to thousands of upserts on tens of collections
        },
        CancellationToken.None,
        TaskCreationOptions.None,
        taskSchedulerForDesiredSyncContext // eg get via TaskScheduler.FromCurrentSynchronizationContext(); while on UI thread and pass around
    );
}
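A note on the snippet above: with an async lambda, Task.Factory.StartNew returns a Task<Task>, and the outer task completes as soon as the lambda reaches its first incomplete await. The handler discards that task, so the completion and any exception of MyUsuallyRockstableMethod go unobserved. If they matter, Unwrap() yields a task that represents the whole operation. A minimal console sketch, where DoWorkAsync is a hypothetical stand-in for the real method and TaskScheduler.Default replaces the UI scheduler so that the sample runs without a UI:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    private static bool _finished;

    // Hypothetical stand-in for MyUsuallyRockstableMethod.
    private static async Task DoWorkAsync()
    {
        await Task.Delay(100);
        _finished = true;
    }

    public static async Task<bool> RunAsync()
    {
        // StartNew with an async lambda yields Task<Task>;
        // Unwrap() turns it into one task covering the whole async operation.
        Task whole = Task.Factory.StartNew(
            async () => await DoWorkAsync(),
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default // stand-in for the UI scheduler
        ).Unwrap();

        // Awaiting the unwrapped task waits for DoWorkAsync itself
        // and rethrows any exception it raised.
        await whole;
        return _finished;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints True
    }
}
```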
Hope it helps!