我创建了一个包含BlockingCollection的简单类。它表示将按照接收顺序执行的操作队列。我已经阅读了很多有关TPL的文章,似乎我应该使用它而不是目前使用的东西。原因之一是单元测试会更容易,并且编写的代码也会更少。我知道您可以使用Task.Factory.StartNew()等轻松地启动新任务,但不确定如何以与当前类相似的方式使用它。我如何使用TPL完成同一件事?
根据要求,这是我创建的类:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
public MyService()
{
StartService();
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
System.Threading.Tasks.Task.Factory.StartNew(() =>
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
});
}
}
BlockingCollection
和异步集合系列创建于简单的生产者-消费者方案。 (例如,一个作家和多个读者)
确定-您可以使用Task.Run
构建几乎相同的对象,以将其添加,删除,清理等。将项目添加到List<T>
这样的非同步集合中,但随后您必须自己管理所有多线程问题(并且有很多问题。
例如:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
private IEnumerable<Task> readers = Enumerable.Range(0, 10).Select((t) => new Task(() => this.StartService()));
public MyService()
{
StartService();
readers.AsParallel().ForAll(t => t.Start());
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
}
}
[您看到-同一收藏夹中有多个“阅读器”。如果您自己完成了BlockingCollection
,则应该处理集合中的所有lock
,等等。
旧式,阻止同步和基于任务的异步效果不太好。
Task.Run(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
只是一种奇特的写作方式
new Thread(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
这两个都将永远占据一个线程。第一个将使用线程池中的一个,这比专门创建的线程池要好,但是由于以后再也不会发布它,因此好处就消失了。
如果要使用任务和TPL并从中获得好处,则应尽可能避免任何阻碍因素。例如,您可以使用ConcurrentQueue
作为后备队列,并执行以下操作:
public class MyService
{
/// <summary>Queue of actions to be consumed by separate task</summary>
private ConcurrentQueue<MyObject> queue = new ConcurrentQueue<MyObject>();
private bool _isRunning = false;
private Task _consumingTask;
public MyService()
{
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
_isRunning = true;
Task.Run( async () =>
{
while (_isRunning )
{
MyObject myObject;
while(_isRunning && queue.TryDequeue(out myObject)
{
try
{
// Do work...
}
catch (Exception e)
{
// Log...
}
}
await Task.Delay(100); // tune this value to one pertinent to your use case
}
});
}
public void StopService()
{
_isRunning = false;
_consumingTask.Wait();
}
}
此实现永不阻塞,仅在确实需要计算时才占用线程。优雅地将其他Task
与它混合在一起也非常容易。
TLDR:如果采用Task
方式,请一直进行。中间点实际上不是您想要的位置,您将获得所有的复杂性,而没有任何优势。