如何等待IAsyncEnumerable >,具有特定的并发级别的结果

问题描述 投票:0回答:1

我有一个异步任务流,通过将异步lambda应用于项目流来生成:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

AsyncEnumerable.Range包中提供了上述方法SelectSystem.Linq.Async

我想要的结果是结果流,表示为System.Linq.Async。结果必须以与原始任务相同的顺序流式传输。另外,还必须限制流的枚举,以便在任何给定时间激活的任务数量不得超过指定数量。

[我想以IAsyncEnumerable<string>类型的扩展方法形式的解决方案,以便我可以多次链接它并形成处理管线,其功能与IAsyncEnumerable<Task<T>>管线相似,但表达流畅。下面是理想的扩展方法的签名:

TPL Dataflow

也接受public async static IAsyncEnumerable<TResult> AwaitResults<TResult>( this IAsyncEnumerable<Task<TResult>> source, int concurrencyLevel); 作为参数将是一个不错的功能。


Update:为完整起见,我将通过链接两次CancellationToken方法形成的流利处理管道示例。该管道以PLINQ块开始,只是为了证明可以混合使用PLINQ和Linq.Async。

AwaitResults

预期输出:

结果:1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20

c# async-await task-parallel-library iasyncenumerable
1个回答
0
投票

[这是我对int[] results = await Enumerable.Range(1, 20) .AsParallel() .AsOrdered() .WithDegreeOfParallelism(2) .WithMergeOptions(ParallelMergeOptions.NotBuffered) .Select(x => { Thread.Sleep(100); // Simulate some CPU-bound operation return x; }) .ToAsyncEnumerable() .Select(async x => { await Task.Delay(300); // Simulate some I/O operation return x; }) .AwaitResults(concurrencyLevel: 5) .Select(x => Task.Run(() => { Thread.Sleep(100); // Simulate another CPU-bound operation return x; })) .AwaitResults(concurrencyLevel: 2) .ToArrayAsync(); Console.WriteLine($"Results: {String.Join(", ", results)}"); 方法的实现。它基于用于控制并发级别的AwaitResults,以及用作异步队列的SemaphoreSlim。源SemaphoreSlim的枚举发生在即发即弃任务(馈送器)内部,该任务将热门任务推送到通道。它还将每个任务的续传附加到信号量释放的位置。

该方法的最后一部分是让步循环,在该循环中,任务从通道一个接一个地出队,然后顺序等待。这样,结果的生成顺序与源流中的任务相同。

此实现要求每个任务等待两次,这意味着它无法用于类型为Channel<Task<TResult>>的源,因为Channel<Task<TResult>> IAsyncEnumerable<Task<TResult>>

IAsyncEnumerable<ValueTask<TResult>>

一个重要的细节是最终屈服循环周围的try-finally块。对于方法的调用者过早地放弃结果流的枚举的情况,这是必需的。在那种情况下,源流的枚举也应该终止,并且此终止使用ValueTask向后传播。没有它,馈线任务将永远无法完成,对象将永远不会被垃圾回收,并且内存将被泄漏。

© www.soinside.com 2019 - 2024. All rights reserved.