Task.Run 不适用于作业调度程序服务

问题描述 投票:0回答:2

我有一个作业调度程序(Windows 服务),它每隔几分钟运行一次,并尝试为长时间运行的任务执行异步作业。当多个作业运行时,我确实看到 CPU 使用率很高。我是否需要避免使用 Task.Run 并以不同的方式处理它?

 public void Run()
    {
        var queueManager = new QueueManager();
        foreach (var taskId in queueManager.GetTaskIdsToProcess())
        {
            Task.Run(async () => InvokeProcess(taskId));
        }            
    }

    private void InvokeProcess(long taskId)
    {
        try
        {
            //Long Running process code
        }
        catch (Exception ex)
        {
            //update queue failed
        }
    }
c# .net job-scheduling
2个回答
0
投票

我是否需要避免使用

Task.Run
并以不同的方式处理它?

不,

Task.Run
很好。 CPU 峰值的原因是您分配给
Task.Run
的工作受 CPU 限制,至少部分受限。使用
Task.Run
本身的开销很小。你可以每秒启动一百万个
Task.Run
s,CPU甚至不会注意到(假设每个
Task.Run
的工作为零)。如果你想降低 CPU 使用率,你基本上有两个选择:

  1. 减少在任何给定时刻同时运行的任务数。
  2. 优化每个任务执行的工作。

这两个选项都不重要。您可以优化调度或算法的范围取决于您对任务并行库、.NET API 和一般数据结构的熟悉程度和专业知识。


0
投票

我认为它本身不是

Task.Run
,但是
InvokeProcess
是 CPU 密集型的,或者您的代码可能会导致在线程池上同时调度相当多的任务,而线程池又可以调度到
ThreadPool.GetAvailableThreads(out var workerThreads, out var _) 
线程,因此 CPU 负载高,或两者兼而有之。

通常的方法是有某种并发队列和有限数量的

Task
/
Thread
处理它。例如,一个简单的看起来像:

class Processor
{
    private readonly CancellationToken _stoppingToken;
    private DefaultBackgroundTaskQueue Queue = new(1000);
    private Task[] _Processors;

    public Processor(CancellationToken stoppingToken)
    {
        _stoppingToken = stoppingToken;
        var instances = 8;
        _Processors = Enumerable.Range(0, instances)
            .Select(_ => Task.Run(async () =>
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var taskId = Queue.DequeueAsync(stoppingToken);
                    try
                    {
                        // process taskId - InvokeProcess(taskId)
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e); // log
                    }

                }
            }))
            .ToArray();
    }
    public async Task Run()
    {
        var queueManager = new QueueManager();
        foreach (var taskId in queueManager.GetTaskIdsToProcess())
        {
            await Queue.QueueBackgroundWorkItemAsync(taskId);
        }            
    }

    private void InvokeProcess(long taskId)
    {
        try
        {
            //Long Running process code
        }
        catch (Exception ex)
        {
            //update queue failed
        }
    }
}

public sealed class DefaultBackgroundTaskQueue 
{
    private readonly Channel<long> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        BoundedChannelOptions options = new(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<long>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(long workItem)
    {
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<long> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

另请查看:

© www.soinside.com 2019 - 2024. All rights reserved.