在.net core的后台服务中并行处理多个队列

问题描述 投票:1回答:1

我和一些人合作 WIFI 相机等设备。

我实现的基本家伙。

  1. 有人按了一个按钮
  2. 这个按钮调用我的 Web API 端点。
  3. 我的 Web API 端点调用其中一个 API's 摄像机的 HttpRequest).
  4. 处理每个请求需要5秒。而每个请求之间应该有1秒的延迟。例如,如果你按了2次按钮,每次之后都有1秒的延迟。首先,我们希望5秒钟处理第一次按下的请求,然后是一秒钟的延迟,最后我们希望5秒钟处理最后一次请求(第二次按下)。

为了做到这一点,我使用了 Queued background tasks 基于 Fire and Forgot 方式 .NetCore 3.1 项目中,当我只处理一台摄像机时,它工作得很好,但项目的新要求是,后台任务应该处理多台摄像机。

但是项目的新要求是,后台任务应该处理多台摄像机。这意味着每个摄像机都有一个队列,而且队列应该基于我上面描述的同事并行工作。

例如,如果我们有2台设备 camera-001camera-002 和2个连接按钮 btn-cam-001btn-cam-002而按压的顺序(每次按压后延时0.5秒):2X btn-cam-001和1X btn-cam-002。

实际情况是 FIFO. 第一,要求 btn-cam-001 将被处理,然后 btn-cam-002.

我期待和需要什么。Camera-002 不应该等着收到请求和第一个请求后才去找这两台摄像机。001002 在同一时间处理(基于exmaple)。就像每台摄像机都有自己的队列和自己的进程一样,问题是在.NetCore 3.1中如何实现?

问题是如何在.NetCore 3.1中实现这个目标?

我目前的后台服务。

public class QueuedHostedService : BackgroundService
{
    public IBackgroundTaskQueue TaskQueue { get; }

    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

和当前的BackgroundTaskQueue。

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);

        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);

        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

我当前的终端:

  [HttpPost("hit")]
    public ActionResult TurnOnAsync([FromBody] HitRequest request, CancellationToken cancellationToken = default)
    {
        try
        {
            var camera = ConfigurationHelper.GetAndValidateCamera(request.Device, _configuration);

            _taskQueue.QueueBackgroundWorkItem(async x =>
                {
                    await _cameraRelayService.TurnOnAsync(request.Device, cancellationToken);

                    Thread.Sleep(TimeSpan.FromSeconds(1));

                });

            return Ok();
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error when truning on the lamp {DeviceName}.", request.Device);

            return StatusCode(StatusCodes.Status500InternalServerError, exception.Message);
        }
    }
c# .net-core parallel-processing queue background-service
1个回答
1
投票

而不是一个单一的 BackgroundTaskQueue 你可以为每台摄像机设置一个队列。你可以将队列存储在字典中,并将摄像机作为键。

public IDictionary<IDevice, IBackgroundTaskQueue> TaskQueues { get; }

然后在你的端点中使用与请求的摄像机相关的队列。

_taskQueues[camera].QueueBackgroundWorkItem(async x =>
© www.soinside.com 2019 - 2024. All rights reserved.