我正在设计一个.net核心web api,它消耗了一个我无法控制的外部api。我在堆栈溢出时找到了一些很好的答案,这使得我可以在使用semaphoreslim的同一个线程中限制我对这个外部API的请求。我想知道如何最好地将此限制扩展为应用程序范围而不是仅限制特定的任务列表。我一直在学习HttpMessageHandlers,这似乎是拦截所有传出消息和应用限制的可能方法。但我担心线程安全和锁定问题,我可能不明白。我包含了我当前的限制代码,并希望这可能有助于理解我正在尝试做什么,但是跨多个线程,并且不断添加任务而不是预定义的任务列表。
private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
var rtn = new List<PagedResultResponse>();
var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: throttle);
foreach (var page in pages)
{
await throttler.WaitAsync();
allTasks.Add(
Task.Run(async () =>
{
try
{
var result = await GetPagedResult(client, url, page);
return result;
}
finally
{
throttler.Release();
}
}));
}
await Task.WhenAll(allTasks);
foreach (var task in allTasks)
{
var result = ((Task<PagedResultResponse>)task).Result;
rtn.Add(result);
}
return rtn;
}
SemaphoreSlim
是线程安全的,因此没有线程安全或锁定关注将其用作跨多个线程的并行性限制。HttpMessageHandler
s确实是一个outbound middleware mechanism to intercept calls placed through HttpClient
。所以它们是使用SemaphoreSlim
对Http调用应用并行性限制的理想方法。所以ThrottlingDelegatingHandler
可能看起来像这样:
public class ThrottlingDelegatingHandler : DelegatingHandler
{
private SemaphoreSlim _throttler;
public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
{
_throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request == null) throw new ArgumentNullException(nameof(request));
await _throttler.WaitAsync(cancellationToken);
try
{
return await base.SendAsync(request, cancellationToken);
}
finally
{
_throttler.Release();
}
}
}
以单例形式创建和维护实例:
int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism));
将DelegatingHandler
应用于您希望并行调节调用的所有HttpClient
实例:
HttpClient throttledClient = new HttpClient(throttle);
那HttpClient
不需要是一个单身人士:只有throttle
实例。
为简洁起见,我省略了Dot Net Core DI代码,但是您将使用.Net Core容器注册单例ThrottlingDelegatingHandler
实例,在使用点通过DI获取该单例,并在如上所示构造的HttpClient
s中使用它。
但:
上面仍然提出了如何管理HttpClient
生命周期的问题:
HttpClient
s do not pick up DNS updates。您的应用程序将无知DNS更新,除非您杀死并重新启动它(可能不合需要)。using (HttpClient client = ) { }
,另一方面,can cause socket exhaustion。HttpClientFactory
的设计目标之一是管理HttpClient
实例及其委托处理程序的生命周期,以避免这些问题。
在.NET Core 2.1中,您可以使用HttpClientFactory
将其连接到ConfigureServices(IServiceCollection services)
类的Startup
中,如下所示:
int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));
services.AddHttpClient("MyThrottledClient")
.AddHttpMessageHandler<ThrottlingDelegatingHandler>();
(“MyThrottledClient”这里是一个named-client approach只是为了保持这个例子简短; typed clients避免字符串命名。)
在使用点,通过DI(IHttpClientFactory
)获得reference,然后致电
var client = _clientFactory.CreateClient("MyThrottledClient");
获得预先配置单身HttpClient
的ThrottlingDelegatingHandler
实例。
通过以这种方式获得的HttpClient
实例的所有调用将被限制(通常,在应用程序中)到最初配置的int maxParallelism
。
HttpClientFactory神奇地处理所有HttpClient
终身问题。
波莉是deeply integrated with IHttpClientFactory,波利也提供Bulkhead policy works as a parallelism throttle by an identical SemaphoreSlim mechanism。
因此,作为手动滚动ThrottlingDelegatingHandler
的替代方法,您也可以直接使用Polly Bulkhead策略和IHttpClientFactory。在您的Startup
课程中,只需:
int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);
services.AddHttpClient("MyThrottledClient")
.AddPolicyHandler(throttler);
如前所述从HttpClientFactory获取预配置的HttpClient
实例。和以前一样,通过这样的“MyThrottledClient”HttpClient
实例的所有调用都将被并行限制到配置的maxParallelism
。
Polly Bulkhead策略还提供了配置允许同时为主信号量中的执行槽“排队”的操作数量的功能。所以,例如:
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);
当如上配置为HttpClient
时,将允许10个并行的http调用,以及最多100个http调用'queue'作为执行槽。这可以通过防止故障下游系统导致上游排队呼叫的过度资源膨胀,为高吞吐量系统提供额外的弹性。
要将Polly选项与HttpClientFactory一起使用,请引入Microsoft.Extensions.Http.Polly
和Polly
nuget包。
参考文献:Polly deep doco on Polly and IHttpClientFactory; Bulkhead policy。
这个问题使用Task.Run(...)
并提到:
一个消耗外部api的.net核心web api
和:
不断添加任务而不是预定义的任务列表。
如果你的.net核心web api每个请求只消耗一次.net核心web api处理的外部API,你采用本答案其余部分讨论的方法,将下游外部http调用卸载到Task
的新Task.Run(...)
将是不必要的,只在额外的Task
实例和线程切换中创建开销。 Dot net core已经在线程池上的多个线程上运行传入请求。