通道<Task> - 编写器在下一步逻辑之前等待任务结果

问题描述 投票:0回答:1

我有一个 Worker 类,用于处理作业消息。

  1. 我希望它串行处理消息,这样我就不需要在多线程调用者之间同步(锁定)其内部数据更改。
    为此,我使用通道(多写,单读)。

  2. 有时调用用户(作业消息编写者)希望等待作业完成,并获得一些输出结果(假设是一个字符串)。 因为用户希望根据Job结果继续执行以下步骤。
    为此,我将通道创建为

    Channel<Task<string>>
    (多写,单读)。

private Channel<Task<string>> Tasks;

private void Start()
{
    Tasks = Channel.CreateUnbounded<Task<string>>(new UnboundedChannelOptions()
    {
        SingleReader = true,
        SingleWriter = false,
        AllowSynchronousContinuations = false,
    });

    Task.Run(async () => {
        while (true)
            await JobHandle(await Tasks.Reader.ReadAsync());
    });
}

private async Task JobHandle(Task<string> task) { task.Start(); await task; }

public async Task UserCall()
{
    Task job;
    await Tasks.Writer.WriteAsync(job = new Task<string>(JobLengthy));

    if (job.Result == "Ok")
        Console.WriteLine("Success.")
    else
        Console.WriteLine("Failed.")

}

private string JobLengthy() { return "Ok"; }

*。这是服务器每秒处理多条 (1-10) 条消息的小样本。

我尝试了,它按预期工作,但我不知道这是否是合法的方法。

  1. 您认为这是合法的做法吗?
  2. 也许你知道其他更好的方法?
c# asynchronous async-await channel producer-consumer
1个回答
0
投票

有更简单的解决方案可以解决您的问题,无需使用

Channel<T>
。您可以使用容量为 SemaphoreSlim
1
 强制执行消息的串行处理:

SemaphoreSlim _semaphore = new(1, 1);

public async Task UserCall()
{
    string result;

    await _semaphore.WaitAsync();
    try
    {
        result = await Task.Run(() => JobLengthy());
    }
    finally { _semaphore.Release(); }

    if (result == "Ok")
        Console.WriteLine("Success.")
    else
        Console.WriteLine("Failed.")
}

SemaphoreSlim
未记录为 FIFO,但 WaitAsync
 方法的
当前实现
(.NET 7) 将待处理任务放置在链表结构中(
m_asyncHead
/
m_asyncTail
字段) ,具有 FIFO 行为。

© www.soinside.com 2019 - 2024. All rights reserved.