自定义 LINQ 运算符未同时运行的任务

问题描述 投票:0回答:1

我正在尝试创建

SelectAwait
(和其他)的并发版本,作为
System.Linq.Async
的一部分,它为
IAsyncEnumerable
提供扩展方法。这是我正在使用的代码:

private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    var sem = new SemaphoreSlim(1, 10);
    
    var retVal = enumerable.Select(item => {
        var task = Task.Run(async () => {
            await sem.WaitAsync();
            var retVal = await predicate(item);
            sem.Release();

            return retVal;
        });

        return task;
    });

    await foreach (var item in retVal)
        yield return await item;
}

Enumerable 是一个从 0-1000 的简单枚举。代码被称为

.SelectParallelAsync(async i =>
{
    Console.WriteLine($"In Select : {i}");
    await Task.Delay(1000);
    return i + 5;
});

我期待所有任务立即开始并一次运行 10 个。但是,它们会一个接一个地被触发。有什么办法可以实现这样的目标吗?非常感谢。

编辑:我正在使用信号量而不是

Parallel.ForEach
.AsParallel().WithMaxDegreeOfParallelism
因为我想在多个方法之间共享这个信号量。此外,PLINQ 的可扩展性并不是很好,我无法向其添加自己的扩展方法。

编辑 2:为了完成添加了我自己的解决方案。

c# linq async-await concurrency iasyncenumerable
1个回答
1
投票

IAsyncEnumerable<T> enumerable
的枚举由结果
AsyncEnumerable<TOut>
的枚举驱动。当结果序列的消费者请求序列的第一个
TOut
元素时,此时将从源
T
请求一个
IAsyncEnumerable<T> enumerable
值。然后这个值会被投影到一个
Task<TOut>
,然后等待这个任务,最后把任务的结果返回给消费者。一切都按顺序发生。没有并发性。在消费者请求元素之前和元素交付给消费者之后没有内部活动。

向 LINQ 运算符添加并发比乍一看要复杂得多。这意味着当消费者请求第一个元素时,必须同时启动 10 个任务。当这些任务中的任何一个完成时,另一个任务必须在其位置自动开始,而无需消费者请求。并且必须限制可以在内部存储多少任务,这些任务尚未被消费者请求。并且在达到此限制时不应启动更多任务,直到消费者拿走一个任务并创建一个空插槽。并且您必须考虑如何处理主动启动任务并观察其完成的内部机制,以防消费者认为它已经足够,并且不会再请求任何元素(通过退出消费循环)。您还必须考虑如何处理存储的任务,以防即将交付给消费者的任务失败。如果不止一项任务失败怎么办?如果用

CancellationToken
取消枚举怎么办?

仅使用

TaskCompletionSource
s 和
SemaphoreSlim
s 等原始工具正确完成所有这些操作,而不使用
Channel<T>
等高级工具,是非常困难的。如果您不熟悉
Channel<T>
,我的建议是花一些时间熟悉它。这是一个非常简单的机制。如果您对
BlockingCollection<T>
类有所了解,
Channel<T>
是它的异步版本。

在另一个问题中,我发布了一个

AwaitResults
方法,可以用来很容易地实现
SelectParallelAsync
运算符:

private IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    return enumerable
        .Select(item => predicate(item))
        .AwaitResults(maxConcurrency: 10);
}

您可以研究该实现,并根据您的需要对其进行更改。

© www.soinside.com 2019 - 2024. All rights reserved.