我有一个异步任务流,通过将异步lambda应用于项目流来生成:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
AsyncEnumerable.Range
包中提供了上述方法Select
和System.Linq.Async
。
我想要的结果是结果流,表示为System.Linq.Async
。结果必须以与原始任务相同的顺序流式传输。另外,还必须限制流的枚举,以便在任何给定时间激活的任务数量不得超过指定数量。
[我想以IAsyncEnumerable<string>
类型的扩展方法形式的解决方案,以便我可以多次链接它并形成处理管线,其功能与IAsyncEnumerable<Task<T>>
管线相似,但表达流畅。下面是理想的扩展方法的签名:
TPL Dataflow
也接受public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
作为参数将是一个不错的功能。
Update:为完整起见,我将通过链接两次CancellationToken
方法形成的流利处理管道示例。该管道以PLINQ块开始,只是为了证明可以混合使用PLINQ和Linq.Async。
AwaitResults
预期输出:
结果:1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20
[这是我对int[] results = await Enumerable.Range(1, 20)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
方法的实现。它基于用于控制并发级别的AwaitResults
,以及用作异步队列的SemaphoreSlim
。源SemaphoreSlim
的枚举发生在即发即弃任务(馈送器)内部,该任务将热门任务推送到通道。它还将每个任务的续传附加到信号量释放的位置。
该方法的最后一部分是让步循环,在该循环中,任务从通道一个接一个地出队,然后顺序等待。这样,结果的生成顺序与源流中的任务相同。
此实现要求每个任务等待两次,这意味着它无法用于类型为Channel<Task<TResult>>
的源,因为Channel<Task<TResult>>
IAsyncEnumerable<Task<TResult>>
。
IAsyncEnumerable<ValueTask<TResult>>
一个重要的细节是最终屈服循环周围的try-finally块。对于方法的调用者过早地放弃结果流的枚举的情况,这是必需的。在那种情况下,源流的枚举也应该终止,并且此终止使用ValueTask
向后传播。没有它,馈线任务将永远无法完成,对象将永远不会被垃圾回收,并且内存将被泄漏。