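We are working with .NET Core Web API and are looking for a lightweight way to log requests of varying intensity to a database, without making the client wait for the save to complete.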
Unfortunately, HostingEnvironment.QueueBackgroundWorkItem(..) is not implemented in DNX, and Task.Run(..) is not safe.
As @axelheer mentioned, IHostedService is the way to go in .NET Core 2.0 and above.
I needed a lightweight, ASP.NET Core replacement for HostingEnvironment.QueueBackgroundWorkItem, so I wrote DalSoft.Hosting.BackgroundQueue, which uses the .NET Core 2.0 IHostedService.
PM> Install-Package DalSoft.Hosting.BackgroundQueue
In your ASP.NET Core Startup.cs:
public void ConfigureServices(IServiceCollection services)
{
    services.AddBackgroundQueue(onException: exception =>
    {
        // Handle or log exceptions thrown by queued tasks here.
    });
}
To queue a background task, just add BackgroundQueue to your controller's constructor and call Enqueue.
public EmailController(BackgroundQueue backgroundQueue)
{
    _backgroundQueue = backgroundQueue;
}

// EmailRequest is an assumed name; the original snippet omits the parameter type.
[HttpPost, Route("/")]
public IActionResult SendEmail([FromBody] EmailRequest emailRequest)
{
    _backgroundQueue.Enqueue(async cancellationToken =>
    {
        await _smtp.SendMailAsync(emailRequest.From, emailRequest.To, emailRequest.Body);
    });

    return Ok();
}
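QueueBackgroundWorkItem is gone, but we have IApplicationLifetime instead of IRegisteredObject, which the former relied on. I think it looks quite promising for this kind of scenario.
The idea (and I'm still not sure whether it's a pretty bad one, so beware!) is to register a singleton that spawns and observes new tasks. Within that singleton we can also register a "stopped event" in order to properly wait for tasks that are still running.
This "concept" could be used for short-running work such as logging or sending mail: things that should not take much time, but would add unnecessary delay to the current request.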
public class BackgroundPool
{
    protected ILogger<BackgroundPool> Logger { get; }

    public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
    {
        if (logger == null)
            throw new ArgumentNullException(nameof(logger));
        if (lifetime == null)
            throw new ArgumentNullException(nameof(lifetime));

        lifetime.ApplicationStopped.Register(() =>
        {
            lock (currentTasksLock)
            {
                Task.WaitAll(currentTasks.ToArray());
            }

            logger.LogInformation(BackgroundEvents.Close, "Background pool closed.");
        });

        Logger = logger;
    }

    private readonly object currentTasksLock = new object();
    private readonly List<Task> currentTasks = new List<Task>();

    public void SendStuff(Stuff whatever)
    {
        var task = Task.Run(async () =>
        {
            Logger.LogInformation(BackgroundEvents.Send, "Sending stuff...");
            try
            {
                // do THE stuff
                Logger.LogInformation(BackgroundEvents.SendDone, "Send stuff returns.");
            }
            catch (Exception ex)
            {
                Logger.LogError(BackgroundEvents.SendFail, ex, "Send stuff failed.");
            }
        });

        lock (currentTasksLock)
        {
            currentTasks.Add(task);
            currentTasks.RemoveAll(t => t.IsCompleted);
        }
    }
}
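Such a BackgroundPool should be registered as a singleton and can then be used by any other component via DI. I'm currently using it for sending mail and it works fine (I also tested sending mail during application shutdown).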
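A minimal sketch of that registration and of a consumer, assuming a classic Startup class and that the BackgroundPool and Stuff types from the listing above are in scope. MailController, its route, and its Send action are hypothetical names used only for illustration.

```csharp
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // One pool per application, so every component shares the same task list.
        services.AddSingleton<BackgroundPool>();
    }

    // Configure(...) is omitted; it is unchanged by this registration.
}

// Hypothetical consumer: the pool arrives through constructor injection.
public class MailController : Controller
{
    private readonly BackgroundPool _pool;

    public MailController(BackgroundPool pool)
    {
        _pool = pool;
    }

    [HttpPost("/mail")]
    public IActionResult Send([FromBody] Stuff stuff)
    {
        // Returns immediately; the work continues on the thread pool.
        _pool.SendStuff(stuff);
        return Ok();
    }
}
```

Because the pool is a singleton, the shutdown callback registered in its constructor sees every task queued by any controller.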
Note: accessing things like the current HttpContext inside the background task should not work. The old solution uses UnsafeQueueUserWorkItem to prohibit that anyway.
What do you think?
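Update:
ASP.NET Core 2.0 introduced new support for background tasks, and ASP.NET Core 2.1 improves on it: see "Implement background tasks in .NET Core 2.x webapps or microservices with IHostedService and the BackgroundService class".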
You can use Hangfire (http://hangfire.io/) for background jobs in .NET Core.
For example:
var jobId = BackgroundJob.Enqueue(
    () => Console.WriteLine("Fire-and-forget!"));
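For context, here is a sketch of the setup that the Enqueue call relies on. It assumes the Hangfire.AspNetCore and Hangfire.SqlServer packages (version 1.7 or later) and a connection string named HangfireConnection, which is a hypothetical name. SQL Server storage is used only as an example; other storage providers exist.

```csharp
using Hangfire;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    private readonly IConfiguration _configuration;

    public Startup(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        // Jobs are persisted in storage, so queued work survives an application restart.
        // "HangfireConnection" is an assumed connection string name.
        services.AddHangfire(config =>
            config.UseSqlServerStorage(
                _configuration.GetConnectionString("HangfireConnection")));

        // Runs the job-processing server inside the web application's process.
        services.AddHangfireServer();
    }

    // Configure(...) is omitted here.
}
```

With this in place, the static BackgroundJob.Enqueue call can be used from a controller. Injecting IBackgroundJobClient is an alternative that tends to be easier to mock in tests.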
Here is a tweaked version of Axel's answer that lets you pass in delegates and cleans up completed tasks more aggressively.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;

namespace Example
{
    public class BackgroundPool
    {
        private readonly ILogger<BackgroundPool> _logger;
        private readonly IApplicationLifetime _lifetime;
        private readonly object _currentTasksLock = new object();
        private readonly List<Task> _currentTasks = new List<Task>();

        public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
        {
            if (logger == null)
                throw new ArgumentNullException(nameof(logger));
            if (lifetime == null)
                throw new ArgumentNullException(nameof(lifetime));

            _logger = logger;
            _lifetime = lifetime;

            _lifetime.ApplicationStopped.Register(() =>
            {
                lock (_currentTasksLock)
                {
                    Task.WaitAll(_currentTasks.ToArray());
                }

                _logger.LogInformation("Background pool closed.");
            });
        }

        public void QueueBackgroundWork(Action action)
        {
#pragma warning disable 1998
            async Task Wrapper() => action();
#pragma warning restore 1998
            QueueBackgroundWork(Wrapper);
        }

        public void QueueBackgroundWork(Func<Task> func)
        {
            var task = Task.Run(async () =>
            {
                _logger.LogTrace("Queuing background work.");
                try
                {
                    await func();
                    _logger.LogTrace("Background work returns.");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex.HResult, ex, "Background work failed.");
                }
            }, _lifetime.ApplicationStopped);

            lock (_currentTasksLock)
            {
                _currentTasks.Add(task);
            }

            task.ContinueWith(CleanupOnComplete, _lifetime.ApplicationStopping);
        }

        private void CleanupOnComplete(Task oldTask)
        {
            lock (_currentTasksLock)
            {
                _currentTasks.Remove(oldTask);
            }
        }
    }
}
I know this is a little late, but we ran into this issue too. After reading through a lot of ideas, here is the solution we came up with.
/// <summary>
/// Defines a simple interface for scheduling background tasks. Useful for UnitTesting ASP.net code
/// </summary>
public interface ITaskScheduler
{
    /// <summary>
    /// Schedules a task which can run in the background, independent of any request.
    /// </summary>
    /// <param name="workItem">A unit of execution.</param>
    [SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
    void QueueBackgroundWorkItem(Action<CancellationToken> workItem);

    /// <summary>
    /// Schedules a task which can run in the background, independent of any request.
    /// </summary>
    /// <param name="workItem">A unit of execution.</param>
    [SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
}

public class BackgroundTaskScheduler : BackgroundService, ITaskScheduler
{
    public BackgroundTaskScheduler(ILogger<BackgroundTaskScheduler> logger)
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogTrace("BackgroundTaskScheduler Service started.");

        _stoppingToken = stoppingToken;
        _isRunning = true;
        try
        {
            await Task.Delay(-1, stoppingToken);
        }
        catch (TaskCanceledException)
        {
        }
        finally
        {
            _isRunning = false;
            _logger.LogTrace("BackgroundTaskScheduler Service stopped.");
        }
    }

    public void QueueBackgroundWorkItem(Action<CancellationToken> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        if (!_isRunning)
            throw new Exception("BackgroundTaskScheduler is not running.");

        _ = Task.Run(() => workItem(_stoppingToken), _stoppingToken);
    }

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        if (!_isRunning)
            throw new Exception("BackgroundTaskScheduler is not running.");

        _ = Task.Run(async () =>
        {
            try
            {
                await workItem(_stoppingToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "When executing background task.");
                throw;
            }
        }, _stoppingToken);
    }

    private readonly ILogger _logger;
    private volatile bool _isRunning;
    private CancellationToken _stoppingToken;
}
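ITaskScheduler (which we had already defined in our old ASP.NET client code for unit-testing purposes) lets a client add a background task. The main purpose of BackgroundTaskScheduler is to capture the stopping cancellation token (which is owned by the host) and pass it to every background Task. Those tasks by definition run on the System.Threading.ThreadPool, so there is no need to create our own.
To configure hosted services properly, see this post.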
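Since the linked post is not reproduced here, the following is a sketch of one possible registration. It assumes Microsoft.Extensions.Hosting 3.0 or later (for the factory overload of AddHostedService) and the two types defined above. The important detail is that the ITaskScheduler handed to controllers and the hosted service started by the host must be the same instance; otherwise _isRunning stays false on the injected one. The extension method name AddBackgroundTaskScheduler is hypothetical.

```csharp
using Microsoft.Extensions.DependencyInjection;

public static class BackgroundTaskSchedulerRegistration
{
    public static IServiceCollection AddBackgroundTaskScheduler(this IServiceCollection services)
    {
        // A single shared instance of the concrete scheduler.
        services.AddSingleton<BackgroundTaskScheduler>();

        // Consumers resolve the interface and receive that same instance.
        services.AddSingleton<ITaskScheduler>(
            sp => sp.GetRequiredService<BackgroundTaskScheduler>());

        // The host starts and stops that same instance as a hosted service.
        services.AddHostedService(
            sp => sp.GetRequiredService<BackgroundTaskScheduler>());

        return services;
    }
}
```

Calling services.AddBackgroundTaskScheduler() from ConfigureServices would then be enough for controllers to take an ITaskScheduler in their constructors.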
Enjoy!
I have used Quartz.NET (it does not require SQL Server) with the following extension method to set up and run a job easily:
public static class QuartzUtils
{
    public static async Task<JobKey> CreateSingleJob<JOB>(this IScheduler scheduler,
        string jobName, object data) where JOB : IJob
    {
        var jm = new JobDataMap { { "data", data } };
        var jobKey = new JobKey(jobName);

        await scheduler.ScheduleJob(
            JobBuilder.Create<JOB>()
                .WithIdentity(jobKey)
                .Build(),
            TriggerBuilder.Create()
                .WithIdentity(jobName)
                .UsingJobData(jm)
                .StartNow()
                .Build());

        return jobKey;
    }
}
The data is passed as an object, which must be serializable. Create an IJob that handles the job like this:
public class MyJobAsync : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        var data = (MyDataType)context.MergedJobDataMap["data"];
        ....
Execute it like this:
await SchedulerInstance.CreateSingleJob<MyJobAsync>("JobTitle 123", myData);
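The answer does not show where SchedulerInstance comes from. One way to obtain it, sketched here under the assumption of Quartz.NET 3.x with its default in-memory job store, is to create and start the default scheduler once at application startup. SchedulerBootstrap and StartAsync are hypothetical names.

```csharp
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl;

public static class SchedulerBootstrap
{
    // Creates the default scheduler and starts it so that triggers begin to fire.
    public static async Task<IScheduler> StartAsync()
    {
        IScheduler scheduler = await StdSchedulerFactory.GetDefaultScheduler();
        await scheduler.Start();
        return scheduler;
    }
}
```

The returned IScheduler could then be kept as SchedulerInstance, for example by registering it as a singleton in the DI container.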
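Microsoft has published the article "Background tasks with hosted services in ASP.NET Core", dated 5/24/2023. Its queued background tasks example is very similar to @Dalsoft's solution.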
public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public BackgroundTaskQueue(int capacity)
    {
        // Capacity should be set based on the expected application load and
        // number of concurrent threads accessing the queue.
        // BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
        // which completes only when space became available. This leads to backpressure,
        // in case too many publishers/calls start accumulating.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"Queued Hosted Service is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");

        await BackgroundProcessing(stoppingToken);
    }

    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem =
                await TaskQueue.DequeueAsync(stoppingToken);

            try
            {
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}
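The types above still need to be registered and called from a request. Here is a sketch of both steps, assuming ASP.NET Core 3.0 or later and that the three types above are in scope. The capacity of 100, the AddQueuedBackgroundTasks method, and WorkController are illustrative choices and not part of the Microsoft sample.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

public static class QueuedBackgroundTaskRegistration
{
    public static IServiceCollection AddQueuedBackgroundTasks(this IServiceCollection services)
    {
        // One shared queue: controllers write to it, the hosted service reads from it.
        services.AddSingleton<IBackgroundTaskQueue>(_ => new BackgroundTaskQueue(capacity: 100));
        services.AddHostedService<QueuedHostedService>();
        return services;
    }
}

[ApiController]
[Route("work")]
public class WorkController : ControllerBase
{
    private readonly IBackgroundTaskQueue _queue;

    public WorkController(IBackgroundTaskQueue queue) => _queue = queue;

    [HttpPost]
    public async Task<IActionResult> Post()
    {
        // Completes as soon as the item is accepted; it waits only while the queue is full.
        await _queue.QueueBackgroundWorkItemAsync(async token =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1), token); // stand-in for real work
        });

        return Accepted();
    }
}
```

Note that QueuedHostedService processes one item at a time, so a slow work item delays everything queued behind it.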
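The original HostingEnvironment.QueueBackgroundWorkItem was a one-liner and very convenient to use.
The "new" way of doing this in ASP.NET Core 2.x requires reading a lot of cryptic documentation and writing a considerable amount of code.
To avoid that, you can use the following alternative: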
public static ConcurrentBag<Boolean> bs = new ConcurrentBag<Boolean>();

[HttpPost("/save")]
public IActionResult Save(dynamic postData)
{
    var id = (String)postData.id;

    // Fire and forget: the discard makes it explicit that the task is not awaited.
    _ = Task.Run(() =>
    {
        bs.Add(Create(id));
    });

    return new OkResult();
}

private Boolean Create(String id)
{
    // do work
    return true;
}
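The static ConcurrentBag<Boolean> bs holds a reference to the result, which is meant to keep the garbage collector from collecting the task after the controller returns. (Strictly speaking, work queued with Task.Run is kept alive by the thread pool until it finishes, so the bag is not needed for that. The real risk is that the host does not know about the work and may shut down before it completes.)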
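One gap in the one-liner above is that nobody observes an exception thrown inside Task.Run. Here is a small sketch of a wrapper that at least reports failures, using only the base class library. FireAndForget, Run, and the onError callback are hypothetical names, not part of ASP.NET Core, and the wrapper does not make the host wait for the work at shutdown.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForget
{
    // Starts the work on the thread pool and routes any exception to onError.
    // The returned task does not fault unless onError itself throws,
    // so callers may discard it.
    public static Task Run(Func<Task> work, Action<Exception> onError)
    {
        if (work == null) throw new ArgumentNullException(nameof(work));
        if (onError == null) throw new ArgumentNullException(nameof(onError));

        return Task.Run(async () =>
        {
            try
            {
                await work();
            }
            catch (Exception ex)
            {
                onError(ex);
            }
        });
    }
}
```

In the controller above, the bare Task.Run call could be replaced with a call to FireAndForget.Run that passes a logging callback as onError.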