在将 Task[] 任务转换为 IEnumerable 时遇到问题<Task<T>>

问题描述 投票:0回答:1

Error Task[] tasks converison IEnumerable>

//await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
 var tasks = new Func<Task>[]
        {
            () => jack(),
            () => roy(),
            () => sam()
        };

        await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(IEnumerable<Task<T>> tasks, int maxParallelism)
    {
        SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
        // The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
        return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
        {
            try { return task; }
            finally { maxOperations.Release(); }
        }).Unwrap());
    }

想要Task[]任务进入IEnumerable>

c# .net asp.net-core asynchronous async-await
1个回答
0
投票

问题似乎是如何限制任务的执行。目前尚不清楚这些任务的作用。 .NET 具有高级功能,允许在管道中并发处理大量数据。一种选择是使用

Parallel.ForEachAsync
使用特定数量的工作人员处理数据流

使用 Parallel.ForEachAsync

这个例子展示了如何检索 Github 用户 bios,一次 3 个:

var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};
 

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);

    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});

这也可用于处理一组

Func<Task>
调用,充当作业队列。不过,这不如处理输入流有用:

var funcs= new Func<Task>[]
        {
            () => jack(),
            () => roy(),
            () => sam()
        };
 

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
    await func();
});

Parallel.ForEachAsync
不返回任何结果。代表必须将它们存储在 ConcurrentQueue 中:

var results=ConcurrentQueue<Whatever>();

await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
    var result=await func();
    results.Enqueue(result);
});

使用数据流块

另一种选择是使用数据流块,例如 ActionBlock 来处理具有固定 DOP 的数据/调用:

var dop=new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = 3
         };

var downloader=new ActionBlock<string>(async uri=>{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri);
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
},dop);

foreach(var uri in userHandlers)
{
    await downloader.SendAsync(uri);
}
downloader.Complete();

await downloader.Completion;

块可以返回结果并组合成处理步骤的管道:

var downloader=new TransformBlock<Uri,FileInfo>(DownloadToCsv,dop);
var importer=new ActionBlock<FileInfo>(Importer);

downloader.LinkTo(importer,new DataflowLinkOptions {PropagateCompletion=true});


foreach(var uri in fileUris)
{
    await downloader.SendAsync(uri);
}
downloader.Complete();

await importer.Completion;

在这种情况下,

downloader
块一次检索3个文件并将
FileInfo
对象发送到
Importer
块。该块一次将一个文件导入数据库。所有块同时工作,同时下载和导入数据。

当所有文件都被请求时,

downloader.Complete()
告诉 head 块我们完成了。之后,我们等待尾块处理所有待处理的工作
await importer.Completion;

数据流块也可以用作异步工作队列,但这不如正确使用它们灵活:

var queue=new TransformBlock<Func<Task<Whatever>>,Whatever>(func=>func(),dop);
var buffer=new BufferBlock<Whatever>();

queue.LinkTo(buffer);


foreach(var func in funcs)
{
    await queue.SendAsync(func);
}
queue.Complete();
await queue.Completion;

if (buffer.TryReceiveAll(out var results))
{
   //Use the results
}

© www.soinside.com 2019 - 2024. All rights reserved.