我有一个作业调度程序(Windows 服务),它每隔几分钟运行一次,并尝试为长时间运行的任务执行异步作业。当多个作业运行时,我确实看到 CPU 使用率很高。我是否需要避免使用 Task.Run 并以不同的方式处理它?
public void Run()
{
var queueManager = new QueueManager();
foreach (var taskId in queueManager.GetTaskIdsToProcess())
{
Task.Run(async () => InvokeProcess(taskId));
}
}
private void InvokeProcess(long taskId)
{
try
{
//Long Running process code
}
catch (Exception ex)
{
//update queue failed
}
}
我是否需要避免使用
并以不同的方式处理它?Task.Run
Task.Run
很好。 CPU 峰值的原因是您分配给 Task.Run
的工作受 CPU 限制,至少部分受限。使用 Task.Run
本身的开销很小。你可以每秒启动一百万个Task.Run
s,CPU甚至不会注意到(假设每个Task.Run
的工作为零)。如果你想降低 CPU 使用率,你基本上有两个选择:
这两个选项都不重要。您可以优化调度或算法的范围取决于您对任务并行库、.NET API 和一般数据结构的熟悉程度和专业知识。
我认为它本身不是
Task.Run
,但是InvokeProcess
是 CPU 密集型的,或者您的代码可能会导致在线程池上同时调度相当多的任务,而线程池又可以调度到 ThreadPool.GetAvailableThreads(out var workerThreads, out var _)
线程,因此 CPU 负载高,或两者兼而有之。
通常的方法是有某种并发队列和有限数量的
Task
/Thread
处理它。例如,一个简单的看起来像:
class Processor
{
private readonly CancellationToken _stoppingToken;
private DefaultBackgroundTaskQueue Queue = new(1000);
private Task[] _Processors;
public Processor(CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;
var instances = 8;
_Processors = Enumerable.Range(0, instances)
.Select(_ => Task.Run(async () =>
{
while (!stoppingToken.IsCancellationRequested)
{
var taskId = Queue.DequeueAsync(stoppingToken);
try
{
// process taskId - InvokeProcess(taskId)
}
catch (Exception e)
{
Console.WriteLine(e); // log
}
}
}))
.ToArray();
}
public async Task Run()
{
var queueManager = new QueueManager();
foreach (var taskId in queueManager.GetTaskIdsToProcess())
{
await Queue.QueueBackgroundWorkItemAsync(taskId);
}
}
private void InvokeProcess(long taskId)
{
try
{
//Long Running process code
}
catch (Exception ex)
{
//update queue failed
}
}
}
public sealed class DefaultBackgroundTaskQueue
{
private readonly Channel<long> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<long>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(long workItem)
{
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<long> DequeueAsync(CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
另请查看: