我有一个线程负责入队,一个线程负责出队。然而,数据入队的频率远远超过出队+处理数据所需的时间。 当我执行以下操作时,数据处理出现了巨大的延迟:
public void HandleData()
{
while (true)
{
try
{
if (Queue.Count > 0)
{
Queue.TryDequeue(out item);
ProcessData(item);
}
else
{
Thread.Sleep(10);
}
}
catch (Exception e)
{
//...
}
}
}
接下来我尝试在单独的任务中处理数据,但这最终影响了项目中的其他任务,因为这种处理最终占用了分配给应用程序的大部分资源并产生了很高的线程数。
public void HandleData()
{
while (true)
{
try
{
if (Queue.Count > 0)
{
Queue.TryDequeue(out item);
Task.Run(() => ProcessData(item));
}
else
{
Thread.Sleep(10);
}
}
catch (Exception e)
{
//
}
}
}
接下来,我尝试了以下方法:
public void HandleData()
{
List<Task> taskList = new List<Task>();
while (true)
{
try
{
if (Queue.Count > 0)
{
Queue.TryDequeue(out item);
if (taskList.Count <= 20)
{
Task t = Task.Run(() => ProcessData(item));
taskList.Add(t);
}
else
{
ProcessData(item);
}
}
else
{
Thread.Sleep(10);
}
taskList.RemoveAll(x => x.IsCompleted);
}
catch (Exception e)
{
//...
}
}
}
这似乎已经解决了问题,但我想知道是否有更干净的方法来做到这一点?出队时设置最大并发线程数的方法?
ConcurrentQueue
不是正确的容器,特别是因为它不提供异步操作。更好的选择是使用 ActionBlock 或 Channel 与 Parallel.ForEachAsync 结合使用。
使用 ActionBlock
ActionBlock 结合了输入队列和工作线程来异步处理数据。一旦数据可用,工作人员就会对其进行处理。使用 ActionBlock,您可以创建一个具有一定数量工作人员的块并开始向其发布数据。该块将仅使用配置数量的工作任务来处理数据:
ActionBlock<Data> _block;
public void Initialize()
{
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20
}
_block =new ActionBlock(ProcessData,options);
}
使用 Post 或 SendAsync 方法将数据/消息发布到块。当没有更多数据时,Complete方法会告诉块在处理完任何待处理项目后关闭。我们可以通过等待 Completion 属性
来等待挂起的项目完成public async Task Produce(CancellationToken cancel)
{
while(!cancel.IsCancellationRequested)
{
var data=ProduceSomething();
_block.Post(data);
}
_block.Complete();
await _block.Completion;
}
使用频道
另一种选择是使用
Channel
而不是 ConcurrentQueue
。此类相当于一个异步 ConcurrentQueue,它提供了一个 IAsyncEnumerable<T>
流,可以使用 await foreach 进行迭代。您可以创建特定数量的工作人员以从容器本身或通过 IAsyncEnumerable<T>
流读取。在 .NET 6 中,最后一部分使用具有固定并行度选项的 Parallel.ForEachAsync
会容易得多:
async Task<ChannelReader<T>> Producer(CancellationToken token)
{
var channel = Channel.CreateUnbounded<T>();
var writer = channel.Writer;
while(!token.IsCancellationRequested)
{
var someDate = ProduceData();
await writer.WriteAsync(someData);
}
writer.Complete();
return channel.Reader;
}
async Task Consumer<T>(ChannelReader<T> input,int dop=20)
{
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = dop
};
await Parallel.ForEachAsync(input.ReadAllAsync(), options,
data=>ProcessData(data));
}
更简洁的解决方案是对 TPL Dataflow
库使用
ActionBlock<T>
,该库是 .NET 标准库的一部分(从 .NET Core 开始)。该组件对其接收到的每个项目调用一个操作。它内部有自己的输入队列,默认情况下是无界的,并使用可配置的 MaxDegreeOfParallelism
处理项目,默认情况下为 1。以下是如何使用它。首先你创建块:
var block = new ActionBlock<Item>(item =>
{
ProcessData(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded // Configurable
});
然后你可以在任何你喜欢的时候给它做工作:
block.Post(new Item());
最后,当您要终止程序时,将该块标记为已完成,然后
await
(或 Wait
)要完成的块以干净终止:
block.Complete();
await block.Completion;
注意: 如果
ProcessData
失败一次,ActionBlock<T>
将停止接受更多项目(Post
方法将返回 false
而不是 true
)。如果您希望它是防失败的,您应该 catch
并处理 ProcessData
内(或调用 Action<T>
的 ProcessData
内)的所有错误。