我可以在后台运行多个进程缓慢使多个任务可以并行运行?

问题描述 投票:4回答:5

我使用C#核心.NET 2.2框架的顶部写一个控制台应用程序。

我的应用程序可以让我来触发使用Windows任务调度长时间运行管理工作。

一个管理员工作使得网络的API调用它下载大量文件,其上传到Azure的Blob存储在他们面前。下面是我的代码需要执行的逻辑步骤来完成任务

  1. 调用哪个与每个消息表示一个文件的Mime消息响应远程API。
  2. 解析出MIME消息和每个消息转换成MemoryStream创建MemoryStream的集合

一旦我有多个1000+ MemoryStream的集合,我想写每个Stream到Azure的Blob存储。由于写入到远程存储是缓慢的,我希望我能执行每个写重复使用其自己的进程或线程。这将让我有potintially 1000+线程并行,而不必等待每个写入操作的结果的同时运行。每个线程负责日志记录在写入/上传过程中可能出现的任何错误。任何记录的错误将处理与使用不同的工作,所以我不担心重试。

我的理解是调用写入/载流异步将会做的代码。换句话说,我会说:“有个Stream执行它并运行,只要需要,我真的不关心结果,只要任务完成得到。”

测试时,我发现我的呼唤async的理解是有些无效。我的印象是,在调用该定义与async会在后台线程/工人,直到该过程完成得到执行的方法时。但是,我的理解,当我测试的代码失败。我的代码给我看,不添加await是从来没有真正执行async代码的关键字。与此同时,当添加关键字await,代码将等待,直到它继续之前的过程中执行完毕。换句话说,增加了对我的需要将击败调用方法异步的目的await

下面是一个剥离下来的我的代码版本解释什么,我试图完成的缘故

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(file, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

从上面的代码,调用await Upload(file);作品,将上传的文件如预期。然而,由于我打电话的await方法在使用Upload(),我的循环之前,不会上传代码完成跳转到下一个迭代。与此同时,取出await关键字,循环不会等待上传过程,但流实际上从未写入到存储,就好像我从来没有所谓的代码。

我怎么能并行执行多个Upload方法,让我有一个线程在后台每次上传运行?

c# multithreading asynchronous async-await task-parallel-library
5个回答
8
投票

转换列表,列表“上传”的任务,并等待他们所有Task.WhenAll()

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

this post有关任务的一些详细信息/等待。


4
投票

我希望我能执行每个写重复使用其自己的进程或线程。

这是不是真的做到这一点的最好办法。进程和线程的资源有限。你的限制因素正在等待在网络上执行的操作。

什么,你会想要做的就是这样的:

var tasks = new List<Task>(queue.Count);

while (queue.Count > 0)
{
  var myobject = Queue.Dequeue();
  var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

这里我们只是创建任务一样快,我们可以,然后等待他们全部完成。我们只让.net框架照顾休息。

这里最重要的是,线程不提高的等待网络资源的速度。任务委托需要做出来的线程手中的东西让你有更多的线程做力所能及的方式(如启动新的上传,或对成品上传)。如果线程只是等待上载完成,这是浪费资源。


3
投票

您可能需要这样的:

var tasks = files.Select(Upload);
await Task.WhenAll(tasks);

不过请注意,你有什么可以使这一进程/停机是否会有太多的文件,它会产卵许多任务。见Have a set of Tasks with only X running at a time当n例如如何解决这个问题。


3
投票

其他答案都很好,但另一种方法是将您的TPL数据流提供的NuGet从https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   //  Do async work here
}

数据流的优点

  1. 它是自然与async作品一样WhenAll基于任务的解决方案
  2. 它也可以接上水管,以任务的大管道 您可以通过管道回在重试错误。 添加任何预处理调用到前面块
  3. 您可以限制MaxDegreeOfParallelism如果节流是一个问题
  4. 您可以使用更复杂的管道,数据流的故名

0
投票

你可以将代码转换成Azure Function并将它让Azure的处理大多数的并行性,向外扩展的,并上传到Azure的Blob存储工作。

你可以使用HTTP触发器或服务总线的触发启动每个下载,处理和上传的任务。

© www.soinside.com 2019 - 2024. All rights reserved.