尝试在 ConcurrentQueue 中出队

问题描述 投票:0回答:3

如果队列中没有项目,

TryDequeue
中的
ConcurrentQueue<T>
将返回 false。

如果队列为空,我需要我的队列将等待,直到新项目添加到队列中,并将新项目从队列中取出,并且该过程将像这样继续。

我应该使用

Monitor.Enter
Wait
Pulse
或 C# 4.0 中任何更好的选项吗?

c# concurrency producer-consumer concurrent-queue
3个回答
55
投票

这不是BlockingCollection的设计目的吗?

据我了解,您可以用其中之一包装 ConcurrentQueue,然后调用 Take


2
投票

您可以使用BlockingCollection

做类似的事情:

private BlockingCollection<string> rowsQueue;
private void ProcessFiles() {
   this.rowsQueue = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000);
   ReadFiles(new List<string>() { "file1.txt", "file2.txt" });


   while (!this.rowsQueue.IsCompleted || this.rowsQueue.Count > 0)
   {
       string line = this.rowsQueue.Take();

       // Do something
   }
}

private Task ReadFiles(List<string> fileNames)
{
    Task task = new Task(() =>
    {
        Parallel.ForEach(
        fileNames,
        new ParallelOptions
        {
            MaxDegreeOfParallelism = 10
        },
            (fileName) =>
            {
                using (StreamReader sr = File.OpenText(fileName))
                {
                    string line = String.Empty;
                    while ((line = sr.ReadLine()) != null)
                    {
                           this.rowsQueue.Add(line);
                    }
                }
            });

        this.rowsQueue.CompleteAdding();
    });

    task.Start();

    return task;
}

-2
投票

您可以定期检查队列中的元素数量,当元素数量大于零时,您可以使用例如发出信号ManualResetEvent 到线程,该线程使元素出列,直到队列为空。

这是伪代码:

检查主题:

while(true)
{
  int QueueLength = 0;
  lock(Queue)
  {
    queueLength = Queue.Length;
  }

  if (Queue.Length > 0)
  {
    manualResetEvent.Set();
  }
  else
  {
    Thread.Sleep(...);
  }       
}    

出列线程:

while(true)
{
  if(manualResetEvent.WaitOne(timeout))
  {
    DequeueUntilQueueEmpty();
  }
}

也考虑在 DequeueUntilQueueEmpty 中使用锁。

© www.soinside.com 2019 - 2024. All rights reserved.