Processing items in parallel

Question (votes: 0, answers: 3)

I have a FileSystemWatcher that watches a folder for files. The Created event fires the moment a file is created, and I add the name of each created file to a queue.

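// Assumed declaration: the original post does not show how the queue is defined.
private static readonly ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

static void Main()
{
    FileSystemWatcher fsw = new FileSystemWatcher();
    fsw.Path = System.Configuration.ConfigurationManager.AppSettings["PathToDataFolder"];
    //fsw.Filter = "*.docx";
    fsw.IncludeSubdirectories = true;
    fsw.Created += new FileSystemEventHandler(fsw_Created);
    // Enable events only after the handler is attached.
    fsw.EnableRaisingEvents = true;
}

private static void fsw_Created(object sender, FileSystemEventArgs e)
{
    // Store the full path of every newly created file.
    queue.Enqueue(e.FullPath);
}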

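Over time more files arrive and the queue grows, so the queue is dynamic. I want to process the files in parallel, but I do not want to process too many at once, because processing is very resource-intensive. Once a file has been processed, I want to remove it from the queue and pick the next file to process.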

How can I achieve this in C#?

c# multithreading
3 Answers
3 votes

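You can use an ActionBlock (from TPL Dataflow) with a configurable degree of parallelism. By default, an ActionBlock uses only one Task to process incoming messages. By setting MaxDegreeOfParallelism you can have multiple tasks process the files in parallel. The FSW's event handler should post the path directly to the block: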

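// Requires: using System.Threading.Tasks.Dataflow;
ActionBlock<string> _block;

void Main()
{
    ...
    var options = new ExecutionDataflowBlockOptions
    {
        // Process at most four paths at the same time.
        MaxDegreeOfParallelism = 4
    };

    _block = new ActionBlock<string>(path => MyPathProcessingFunction(path), options);

    // Configure the FSW as before
}

private void fsw_Created(object sender, FileSystemEventArgs e)
{
    // Post hands the path to the block's input buffer and returns without waiting for processing.
    _block.Post(e.FullPath);
}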
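To make the effect of MaxDegreeOfParallelism easier to see, here is a minimal, self-contained sketch that feeds a fixed list of paths through an ActionBlock and reports the highest number of handlers that ran at the same time. The names BoundedFileProcessor, ProcessAllAsync and processFile are made up for this illustration and are not part of the answer above; the loop that calls Post stands in for the fsw_Created handler. It assumes the System.Threading.Tasks.Dataflow library is available.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, for illustration only.
public static class BoundedFileProcessor
{
    // Runs processFile for every path with at most maxParallel handlers
    // active at once, and returns the highest concurrency observed.
    public static async Task<int> ProcessAllAsync(
        IEnumerable<string> paths, int maxParallel, Func<string, Task> processFile)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<string>(async path =>
        {
            // Track how many handlers are active right now.
            lock (gate) { running++; if (running > peak) peak = running; }
            try { await processFile(path); }
            finally { lock (gate) { running--; } }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // In the answer above this call is made from fsw_Created.
        foreach (var path in paths)
            block.Post(path);

        block.Complete();        // signal that no more paths will be posted
        await block.Completion;  // wait until every posted path was handled
        return peak;
    }
}
```

In a long-running watcher you would normally call Complete() and await Completion only at shutdown, so that paths already posted are still processed before the program exits.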

1 vote

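You can achieve this with the producer/consumer pattern. In .NET, the BlockingCollection class supports this pattern. Whenever the event handler fires, it adds the path to the queue and creates a new task to process the queue, so a new task is created for every watched file. You can change the task-creation strategy as needed, and you can also use a TaskScheduler to manage how the tasks are scheduled.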

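// Requires: using System.Collections.Concurrent; using System.IO; using System.Threading.Tasks;
public class Watcher
{
    private readonly BlockingCollection<string> _queue;
    // Kept in a field so the watcher stays reachable after Start() returns.
    private FileSystemWatcher _fsw;

    public Watcher()
    {
        _queue = new BlockingCollection<string>();
    }

    public void Start()
    {
        _fsw = new FileSystemWatcher();
        _fsw.Path = @"F:\a";
        _fsw.IncludeSubdirectories = true;
        _fsw.Created += Fsw_Created;
        // Enable events only after the handler is attached.
        _fsw.EnableRaisingEvents = true;
    }

    private void Fsw_Created(object sender, FileSystemEventArgs e)
    {
        _queue.Add(e.FullPath);

        // One task per event: each task takes one path from the queue.
        Task.Factory.StartNew(() =>
        {
            var path = _queue.Take();
            // process the file at 'path' here
        });
    }
}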
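Starting one task per file does not, on its own, cap how many files are processed at once. One possible way to change the task-creation strategy is to gate each task with a SemaphoreSlim. This is a different technique from the queue-based code above, sketched here under assumptions: ThrottledRunner, RunAsync and processFile are hypothetical names, and the semaphore's wait list takes over the role of the queue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public sealed class ThrottledRunner
{
    private readonly SemaphoreSlim _slots;

    public ThrottledRunner(int maxParallel)
    {
        // maxParallel slots are free initially; that is also the upper limit.
        _slots = new SemaphoreSlim(maxParallel, maxParallel);
    }

    // Processes one path; the work starts only once a slot is free.
    public async Task RunAsync(string path, Action<string> processFile)
    {
        await _slots.WaitAsync();
        try
        {
            // Run the (possibly CPU- or IO-heavy) work on the thread pool.
            await Task.Run(() => processFile(path));
        }
        finally
        {
            // Free the slot even if processFile throws.
            _slots.Release();
        }
    }
}
```

With this sketch, the Created handler would only need to call RunAsync with e.FullPath and keep or observe the returned task, so that exceptions from processing are not silently lost.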

1 vote

You are looking for the producer/consumer pattern, which in C# can be implemented with BlockingCollection, for example:

 private static async Task Perform() {
   // Be careful with this parameter: what do you expect the system
   // to do if the pipeline contains pipelineMaxLength items?
   int pipelineMaxLength = 100;
   int consumersCount = 10;

   using (BlockingCollection<string> pipeline = 
     new BlockingCollection<string>(pipelineMaxLength)) {

     // Producer(s)
     using (FileSystemWatcher fsw = new FileSystemWatcher()) {
       ...
       fsw.Created += (s, e) => {
         // whenever a new file has been created, add it to the pipeline
         // (Add blocks while the pipeline already holds pipelineMaxLength items)
         if (!pipeline.IsAddingCompleted)
           pipeline.Add(e.FullPath);

         // When there are no more files to add and you want to stop processing, call
         // pipeline.CompleteAdding();
       };

       // Consumers (consumersCount of them are working in parallel)
       var consumers = Enumerable
        .Range(0, consumersCount)
        .Select(index => Task.Run(() => {
           // each consumer takes files from the pipeline and processes them
           foreach (var file in pipeline.GetConsumingEnumerable()) {
             //TODO: process the file here
           }
         }))
        .ToArray();

       // (a)wait until all the consumers finish their work
       await Task
         .WhenAll(consumers);
     }
   }
 }
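The comment on pipelineMaxLength asks what should happen when the pipeline is full. One possible answer, sketched here under assumptions, is to wait only for a limited time and let the caller decide what to do with a path that could not be queued. PipelineProducer and TryEnqueue are hypothetical names and not part of the answer above; TryAdd with a timeout is a regular BlockingCollection method.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, for illustration only.
public static class PipelineProducer
{
    // Tries to enqueue a path, waiting at most 'timeout' for free space.
    // Returns false if the pipeline stayed full or no longer accepts items.
    public static bool TryEnqueue(
        BlockingCollection<string> pipeline, string path, TimeSpan timeout)
    {
        if (pipeline.IsAddingCompleted)
            return false;

        try
        {
            return pipeline.TryAdd(path, timeout);
        }
        catch (InvalidOperationException)
        {
            // The collection was completed (or disposed) between the check and TryAdd.
            return false;
        }
    }
}
```

A Created handler could call TryEnqueue instead of Add and, when it returns false, log the path or record it for a later retry rather than blocking the event thread indefinitely.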