我有一个FileSystemWatcher,用于查找文件夹中的文件。创建文件的时刻触发了Created事件。我将每个创建的文件名添加到队列中。
void Main(){
FileSystemWatcher fsw = new FileSystemWatcher();
fsw.Path = System.Configuration.ConfigurationManager.AppSettings["PathToDataFolder"];
//fsw.Filter = "*.docx";
fsw.EnableRaisingEvents = true;
fsw.IncludeSubdirectories = true;
fsw.Created += new FileSystemEventHandler(fsw_Created);
}
private void fsw_Created(object sender, FileSystemEventArgs e)
{
queue.Enqueue(e.FullPath);
}
超时文件将增加,队列将变大。所以队列是动态的。我想并行处理每个文件。但是我不想一次处理很多文件,因为这是非常耗费资源的。处理完文件后,我想将其出列并选择另一个文件进行处理。
我怎样才能在C#中实现这一目标?
您可以使用具有可配置的并行度的ActionBlock。默认情况下,ActionBlock仅使用一个Task来处理传入的消息。您可以使用multiple tasks并行处理文件。 FSW的事件应该直接将路径发布到块:
ActionBlock<string> _block;
void Main()
{
...
var options= new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
};
_block=new ActionBlock<string>(path=>MyPathProcessingFunction(path), options);
//Configure the FSW as before
}
private void fsw_Created(object sender, FileSystemEventArgs e)
{
_block.Post(e.FullPath);
}
您可以通过使用生产者/消费者模式来实现这一目标。在.Net中,BlockingCollection类为此模式提供支持。每当触发事件处理程序时,它都会将路径添加到队列中,并创建一个新任务来处理队列。因此,对于每个受监视文件,都会创建一个新任务您可以根据需要更改任务创建策略,还可以使用TaskScheduler管理它们的安排方式。
public class Watcher
{
public Watcher()
{
_queue = new BlockingCollection<string>();
}
private BlockingCollection<string> _queue;
public void Start()
{
FileSystemWatcher fsw = new FileSystemWatcher();
fsw.Path = @"F:\a";
fsw.EnableRaisingEvents = true;
fsw.IncludeSubdirectories = true;
fsw.Created += Fsw_Created;
}
private void Fsw_Created(object sender, FileSystemEventArgs e)
{
_queue.Add(e.FullPath);
Task.Factory.StartNew(() =>
{
var path = _queue.Take();
// process the queue here
});
}
}
您正在寻找生产者/消费者模式,在C#中可以通过BlockingCollection实现,例如
private static async Task Perform() {
// Be careful with this parameter: what do you expect the system
// to do if the pipeline contains pipelineMaxLength items?
int pipelineMaxLength = 100;
int consumersCount = 10;
using (BlockingCollection<string> pipeline =
new BlockingCollection<string>(pipelineMaxLength)) {
// Producer(s)
using (FileSystemWatcher fsw = new FileSystemWatcher()) {
...
fsw.Created += (s, e) => {
// whenever new file has been created, add it to the pipeline
if (!pipeline.IsAddingCompleted)
pipeline.Add(e.FullPath);
// Whenever you have no files to add and you want quit processing call
// pipeline.CompleteAdding();
};
// Consumers (consumersCount of them are working in parallel)
var consumers = Enumerable
.Range(0, consumersCount) //
.Select(index => Task.Run(() => {
// each consumer extracts file from the pipeline and processes it
foreach (var file in pipeline.GetConsumingEnumerable()) {
//TODO: process the file here
} }))
.ToArray();
// (a)wait until all the consumers finish their work
await Task
.WhenAll(consumers);
}
}
}