我正在尝试创建
SelectAwait
(和其他)的并发版本,作为 System.Linq.Async
的一部分,它为 IAsyncEnumerable
提供扩展方法。这是我正在使用的代码:
private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
var sem = new SemaphoreSlim(1, 10);
var retVal = enumerable.Select(item => {
var task = Task.Run(async () => {
await sem.WaitAsync();
var retVal = await predicate(item);
sem.Release();
return retVal;
});
return task;
});
await foreach (var item in retVal)
yield return await item;
}
Enumerable 是一个从 0-1000 的简单枚举。代码被称为
.SelectParallelAsync(async i =>
{
Console.WriteLine($"In Select : {i}");
await Task.Delay(1000);
return i + 5;
});
我期待所有任务立即开始并一次运行 10 个。但是,它们会一个接一个地被触发。有什么办法可以实现这样的目标吗?非常感谢。
编辑:我正在使用信号量而不是
Parallel.ForEach
或 .AsParallel().WithMaxDegreeOfParallelism
因为我想在多个方法之间共享这个信号量。此外,PLINQ 的可扩展性并不是很好,我无法向其添加自己的扩展方法。
编辑 2:为了完成添加了我自己的解决方案。
源
IAsyncEnumerable<T> enumerable
的枚举由结果AsyncEnumerable<TOut>
的枚举驱动。当结果序列的消费者请求序列的第一个 TOut
元素时,此时将从源 T
请求一个 IAsyncEnumerable<T> enumerable
值。然后这个值会被投影到一个Task<TOut>
,然后等待这个任务,最后把任务的结果返回给消费者。一切都按顺序发生。没有并发性。在消费者请求元素之前和元素交付给消费者之后没有内部活动。
向 LINQ 运算符添加并发比乍一看要复杂得多。这意味着当消费者请求第一个元素时,必须同时启动 10 个任务。当这些任务中的任何一个完成时,另一个任务必须在其位置自动开始,而无需消费者请求。并且必须限制可以在内部存储多少任务,这些任务尚未被消费者请求。并且在达到此限制时不应启动更多任务,直到消费者拿走一个任务并创建一个空插槽。并且您必须考虑如何处理主动启动任务并观察其完成的内部机制,以防消费者认为它已经足够,并且不会再请求任何元素(通过退出消费循环)。您还必须考虑如何处理存储的任务,以防即将交付给消费者的任务失败。如果不止一项任务失败怎么办?如果用
CancellationToken
取消枚举怎么办?
仅使用
TaskCompletionSource
s 和 SemaphoreSlim
s 等原始工具正确完成所有这些操作,而不使用 Channel<T>
等高级工具,是非常困难的。如果您不熟悉Channel<T>
,我的建议是花一些时间熟悉它。这是一个非常简单的机制。如果您对 BlockingCollection<T>
类有所了解,Channel<T>
是它的异步版本。
AwaitResults
方法,可以用来很容易地实现 SelectParallelAsync
运算符:
private IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
return enumerable
.Select(item => predicate(item))
.AwaitResults(maxConcurrency: 10);
}
您可以研究该实现,并根据您的需要对其进行更改。