在托管服务中添加多个队列

问题描述 投票:0回答:1

我正在参照 https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio 中的示例实现后台任务队列。

我的代码如下:

在startup.cs中,我正在添加托管服务和后台队列

// Register the hosted service that drains the queue, and the single shared queue instance it reads from.
services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

然后,我按如下方式实现了作用域(scoped)服务、托管服务和后台队列:

namespace Services.Services {
  /// <summary>
  /// Background service that takes work items from <see cref="TaskQueue"/> one at a time
  /// and awaits each of them until the host requests cancellation.
  /// </summary>
  public class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates the service with the queue to drain and a logger created from <paramref name="loggerFactory"/>.</summary>
    /// <param name="serviceProvider">Service provider kept by the service (not used by the loop below).</param>
    /// <param name="taskQueue">Queue the work items are dequeued from.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      _serviceProvider = serviceProvider;
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>Queue this service reads its work items from.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>
    /// Dequeues and runs work items sequentially until <paramref name="cancellationToken"/> is signalled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {
          // A failing work item must not stop the loop, but it must not vanish silently either.
          _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
      }
    }
  }
}


/// <summary>
/// Queue of asynchronous work items, each represented as a delegate that
/// receives a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue {
  /// <summary>Adds <paramref name="workItem"/> to the queue.</summary>
  void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

  /// <summary>Asynchronously obtains a work item from the queue.</summary>
  /// <param name="cancellationToken">Token passed by the caller to cancel the wait.</param>
  Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

namespace Services.Services {
  /// <summary>
  /// In-memory <see cref="IBackgroundTaskQueue"/>: work items are stored in a
  /// <see cref="ConcurrentQueue{T}"/> and a <see cref="SemaphoreSlim"/> counts how many are available.
  /// </summary>
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    // Both fields are assigned once and never replaced, so they are readonly.
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues <paramref name="workItem"/> and releases the semaphore once so a waiting consumer can take it.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }
      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    /// <summary>Waits on the semaphore, then removes and returns one work item.</summary>
    /// <param name="cancellationToken">Token that cancels the wait on the semaphore.</param>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out var workItem);
      return workItem;
    }
  }
}


// scoped service
namespace Services.Services {
  /// <summary>
  /// Import service: forwards direct imports to <see cref="IFileProcessingService"/> and
  /// queues processing of uploaded files as background work items on <see cref="Queue"/>.
  /// </summary>
  public class ImportService: BaseService, IImportService {
    private readonly IFileProcessingService _scopedProcessingService;

    private readonly ConfigurationSettings _configurationSettings;

    /// <summary>Queue that background work items are added to.</summary>
    public IBackgroundTaskQueue Queue { get; }

    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    /// <summary>Service provider used to create a scope for each queued work item.</summary>
    public IServiceProvider Services { get; }

    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) {
      Services = services;
      _configurationSettings = services.GetService<ConfigurationSettings>();
      _scopedProcessingService = services.GetProcessingService();
      Queue = queue;
    }

    // ---- Main file
    /// <summary>Imports the file by delegating to the processing service obtained in the constructor.</summary>
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
      await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    /// <summary>Stores the upload at a temporary path and queues it for background processing.</summary>
    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
      var fileFormat = GetFileFormat(file);
      var tempFilePath = await GetTemporaryPath(file);
      var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
      // ....... //

      ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    /// <summary>
    /// Queues a work item that resolves <see cref="IFileProcessingService"/> from a fresh scope
    /// and imports the file stored at <paramref name="tempFilePath"/>.
    /// </summary>
    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId) {
      var delimiter = ",";
      // Read the length now: the work item runs later on the background service, and the
      // closure should not have to touch the request's IFormFile at that point.
      var fileLength = file.Length;

      Queue.QueueBackgroundWorkItem(async token => {
        using(var scope = Services.CreateScope()) {
          var scopedProcessingService =
            scope.ServiceProvider
            .GetRequiredService<IFileProcessingService>();

          // do the processing
          // NOTE(review): the case label is a string while `type` is declared as
          // TransactionalDataFileType - confirm which value this switch should test.
          switch (type) {
            case "csv":
              await scopedProcessingService.ImportFile(tempFilePath, fileLength, userId, fileFormat, new Headers(), delimiter ?? ",", "yyyy-MM-dd");
              break;
          }
        }
      });
    }
  }
}

我根据控制器收到的请求向队列中添加元素。现在,我想再添加一个队列来处理其他类型的请求。能否让同一个托管服务使用另一个队列?我很难找到这方面的示例。我是否应该再添加一个作用域服务和另一个后台队列?

c# asp.net-core .net-core queue backgroundworker
1个回答
1
投票
  1. 第一个选项最简单:只需创建多组类和接口,例如 QueuedHostedServiceA、QueuedHostedServiceB、IBackgroundTaskQueueA……(可以使用继承来减少重复代码)

  2. 此外,您还可以引入“处理程序”(handler)的概念,并把所有这些类型改写为泛型:

// Handles one message of type T taken from the queue.
interface IHandler<T> { Task Handle(T msg, CancellationToken cancellationToken); }
interface IBackgroundMessageQueue<T> {...} // same impl but with T instead of Func<CancellationToken,Task>
class BackgroundMessageQueue<T> : IBackgroundMessageQueue<T> {...} // same impl but with T instead of Func<CancellationToken,Task>

// Generic hosted service: takes messages of type T from the queue one at a time
// and hands each of them to an IHandler<T> resolved from a fresh DI scope.
class QueuedHostedService<T> : BackgroundService
{
    private readonly IServiceProvider serviceProvider;
    private readonly IBackgroundMessageQueue<T> queue;
    private readonly ILogger<QueuedHostedService<T>> logger;

    // The handler is intentionally NOT a constructor parameter: it is registered as scoped,
    // while the hosted service lives for the whole application, so it is resolved per message.
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundMessageQueue<T> queue, ILogger<QueuedHostedService<T>> logger)
    {
        this.serviceProvider = serviceProvider;
        this.queue = queue;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            T message = await queue.DequeueAsync(cancellationToken);

            try
            {
                using (var scope = serviceProvider.CreateScope())
                {
                    // Resolve from the scope (not the root provider) so scoped dependencies work.
                    var handler = scope.ServiceProvider.GetRequiredService<IHandler<T>>();
                    await handler.Handle(message, cancellationToken);
                }
            }
            catch (Exception ex)
            {
                // Keep the loop alive, but record the failure instead of swallowing it.
                logger.LogError(ex, "Error occurred handling a message of type {MessageType}.", typeof(T).Name);
            }
        }
    }
}

并且为每种消息类型创建自己的处理程序:

// Message describing one file to process; it carries the arguments of ImportService.ProcessFile.
class ProcessFile
{
    public ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId)
    {
        TempFilePath = tempFilePath;
        FileFormat = fileFormat;
        File = file;
        Type = type;
        UserId = userId;
    }

    public string TempFilePath { get; }
    public FileFormatType FileFormat { get; }
    // NOTE(review): the message is handled after the request that produced this IFormFile -
    // confirm the handler only needs data that is still valid then (e.g. copy the length up front).
    public IFormFile File { get; }
    public TransactionalDataFileType Type { get; }
    public int UserId { get; }
}

// Handler invoked by QueuedHostedService<ProcessFile> for every queued ProcessFile message.
class FileProcessor : IHandler<ProcessFile>
{
    public Task Handle(ProcessFile msg, CancellationToken cancellationToken)
    {
        // implement your logic from ImportService.ProcessFile
        throw new NotImplementedException();
    }
}

然后您注册所有内容:

// One handler, one queue and one hosted service per message type.
services.AddScoped<IHandler<ProcessFile>, FileProcessor>();
services.AddSingleton<IBackgroundMessageQueue<ProcessFile>, BackgroundMessageQueue<ProcessFile>>();

services.AddHostedService<QueuedHostedService<ProcessFile>>();

然后在您的 ImportService 中,可以通过构造函数注入(解析)对应类型的队列:

public ImportService(IBackgroundMessageQueue<ProcessFile> queue) 

并在需要时将消息放入其中。

© www.soinside.com 2019 - 2024. All rights reserved.