我如何获得与TPL中的BlockingCollection类似的功能?

问题描述 投票:0回答:2

我创建了一个包含BlockingCollection的简单类。它表示将按照接收顺序执行的操作队列。我已经阅读了很多有关TPL的文章,似乎我应该使用它而不是目前使用的东西。原因之一是单元测试会更容易,并且编写的代码也会更少。我知道您可以使用Task.Factory.StartNew()等轻松地启动新任务,但不确定如何以与当前类相似的方式使用它。我如何使用TPL完成同一件事?

根据要求,这是我创建的类:

public class MyService
{
    /// <summary>Queue of actions to be consumed on a separate thread</summary>
    private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();

    public MyService()
    {
        StartService();
    }

    public void AddToQueue(MyObject newObject)
    {
        queue.Add(newObject);
    }

    private void StartService()
    {
        System.Threading.Tasks.Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try
                {
                    MyObject myObject = queue.Take(); // blocks until new object received

                    // Do work...
                }
                catch (Exception e)
                {
                    // Log...
                }
            }
        });
    }
}
c# task-parallel-library
2个回答
0
投票

BlockingCollection和异步集合系列创建于简单的生产者-消费者方案。 (例如,一个作家和多个读者)

确定-您可以使用Task.Run构建几乎相同的对象,以将其添加,删除,清理等。将项目添加到List<T>这样的非同步集合中,但随后您必须自己管理所有多线程问题(并且有很多问题。

例如:

public class MyService
{
    /// <summary>Queue of actions to be consumed on a separate thread</summary>
    private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
    private IEnumerable<Task> readers = Enumerable.Range(0, 10).Select((t) => new Task(() => this.StartService()));

public MyService()
{
    StartService();
    readers.AsParallel().ForAll(t => t.Start());
}

public void AddToQueue(MyObject newObject)
{
    queue.Add(newObject);
}

private void StartService()
{
        while (true)
        {
            try
            {
                MyObject myObject = queue.Take(); // blocks until new object received

                // Do work...
            }
            catch (Exception e)
            {
                // Log...
            }
        }
    }
}

[您看到-同一收藏夹中有多个“阅读器”。如果您自己完成了BlockingCollection,则应该处理集合中的所有lock,等等。


0
投票

旧式,阻止同步和基于任务的异步效果不太好。

Task.Run(() =>
    {
        while (true)
        {
            // some thing that sometimes blocks
        }
    });

只是一种奇特的写作方式

new Thread(() =>
   {
      while (true)
      {
          // some thing that sometimes blocks
      }
});

这两个都将永远占据一个线程。第一个将使用线程池中的一个,这比专门创建的线程池要好,但是由于以后再也不会发布它,因此好处就消失了。

如果要使用任务和TPL并从中获得好处,则应尽可能避免任何阻碍因素。例如,您可以使用ConcurrentQueue作为后备队列,并执行以下操作:

public class MyService
{
    /// <summary>Queue of actions to be consumed by separate task</summary>
    private ConcurrentQueue<MyObject> queue = new ConcurrentQueue<MyObject>();

    private bool _isRunning = false;
    private Task _consumingTask;

    public MyService()
    {
    }

    public void AddToQueue(MyObject newObject)
    {
        queue.Add(newObject);
    }

    private void StartService()
    {
        _isRunning = true;
        Task.Run( async () =>
        {
            while (_isRunning )
            {
                MyObject myObject;

                while(_isRunning && queue.TryDequeue(out myObject)
                {
                    try
                    {
                        // Do work...
                    }
                    catch (Exception e)
                    {
                        // Log...
                    }
                }
                await Task.Delay(100); // tune this value to one pertinent to your use case 
            }
        });
    }

    public void StopService()
    {
        _isRunning = false;
        _consumingTask.Wait();
    }
}

此实现永不阻塞,仅在确实需要计算时才占用线程。优雅地将其他Task与它混合在一起也非常容易。

TLDR:如果采用Task方式,请一直进行。中间点实际上不是您想要的位置,您将获得所有的复杂性,而没有任何优势。

© www.soinside.com 2019 - 2024. All rights reserved.