我有一个 Worker 类,用于处理作业消息。
我希望它串行处理消息,这样我就不需要在多线程调用者之间同步(锁定)其内部数据更改。
为此,我使用通道(多写,单读)。
有时调用用户(作业消息编写者)希望等待作业完成,并获得一些输出结果(假设是一个字符串)。
因为用户希望根据Job结果继续执行以下步骤。
为此,我将通道创建为
Channel<Task<string>>
(多写,单读)。
private Channel<Task<string>> Tasks;
private void Start()
{
Tasks = Channel.CreateUnbounded<Task<string>>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false,
});
Task.Run(async () => {
while (true)
await JobHandle(await Tasks.Reader.ReadAsync());
});
}
private async Task JobHandle(Task<string> task) { task.Start(); await task; }
public async Task UserCall()
{
Task job;
await Tasks.Writer.WriteAsync(job = new Task<string>(JobLengthy));
if (job.Result == "Ok")
Console.WriteLine("Success.")
else
Console.WriteLine("Failed.")
}
private string JobLengthy() { return "Ok"; }
*。这是服务器每秒处理多条 (1-10) 条消息的小样本。
我尝试了,它按预期工作,但我不知道这是否是合法的方法。
有更简单的解决方案可以解决您的问题,无需使用
Channel<T>
。您可以使用容量为 SemaphoreSlim
的
1
强制执行消息的串行处理:
SemaphoreSlim _semaphore = new(1, 1);
public async Task UserCall()
{
string result;
await _semaphore.WaitAsync();
try
{
result = await Task.Run(() => JobLengthy());
}
finally { _semaphore.Release(); }
if (result == "Ok")
Console.WriteLine("Success.")
else
Console.WriteLine("Failed.")
}
SemaphoreSlim
未记录为 FIFO,但 WaitAsync
方法的当前实现 (.NET 7) 将待处理任务放置在链表结构中(
m_asyncHead
/m_asyncTail
字段) ,具有 FIFO 行为。