如何在.net Core API项目中跨多个线程阻止对HttpClient的所有传出异步调用

问题描述 投票:0回答:1

我正在设计一个.net核心web api,它消耗了一个我无法控制的外部api。我在堆栈溢出时找到了一些很好的答案,这使得我可以在使用semaphoreslim的同一个线程中限制我对这个外部API的请求。我想知道如何最好地将此限制扩展为应用程序范围而不是仅限制特定的任务列表。我一直在学习HttpMessageHandlers,这似乎是拦截所有传出消息和应用限制的可能方法。但我担心线程安全和锁定问题,我可能不明白。我包含了我当前的限制代码,并希望这可能有助于理解我正在尝试做什么,但是跨多个线程,并且不断添加任务而不是预定义的任务列表。

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}
asp.net-core semaphore dotnet-httpclient httpclientfactory
1个回答
1
投票

概念问题

简单的实施

所以ThrottlingDelegatingHandler可能看起来像这样:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

以单例形式创建和维护实例:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 

DelegatingHandler应用于您希望并行调节调用的所有HttpClient实例:

HttpClient throttledClient = new HttpClient(throttle);

HttpClient不需要是一个单身人士:只有throttle实例。

为简洁起见,我省略了Dot Net Core DI代码,但是您将使用.Net Core容器注册单例ThrottlingDelegatingHandler实例,在使用点通过DI获取该单例,并在如上所示构造的HttpClients中使用它。

但:

使用HttpClientFactory更好地实现(.NET Core 2.1)

上面仍然提出了如何管理HttpClient生命周期的问题:

  • Singleton(app-scoped)HttpClients do not pick up DNS updates。您的应用程序将无知DNS更新,除非您杀死并重新启动它(可能不合需要)。
  • 经常创造和处置的模式,using (HttpClient client = ) { },另一方面,can cause socket exhaustion

HttpClientFactory的设计目标之一是管理HttpClient实例及其委托处理程序的生命周期,以避免这些问题。

在.NET Core 2.1中,您可以使用HttpClientFactory将其连接到ConfigureServices(IServiceCollection services)类的Startup中,如下所示:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

(“MyThrottledClient”这里是一个named-client approach只是为了保持这个例子简短; typed clients避免字符串命名。)

在使用点,通过DI(IHttpClientFactory)获得reference,然后致电

var client = _clientFactory.CreateClient("MyThrottledClient");

获得预先配置单身HttpClientThrottlingDelegatingHandler实例。

通过以这种方式获得的HttpClient实例的所有调用将被限制(通常,在应用程序中)到最初配置的int maxParallelism

HttpClientFactory神奇地处理所有HttpClient终身问题。

使用Polly和IHttpClientFactory来获得所有这些“开箱即用”的功能

波莉是deeply integrated with IHttpClientFactory,波利也提供Bulkhead policy works as a parallelism throttle by an identical SemaphoreSlim mechanism

因此,作为手动滚动ThrottlingDelegatingHandler的替代方法,您也可以直接使用Polly Bulkhead策略和IHttpClientFactory。在您的Startup课程中,只需:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

如前所述从HttpClientFactory获取预配置的HttpClient实例。和以前一样,通过这样的“MyThrottledClient”HttpClient实例的所有调用都将被并行限制到配置的maxParallelism

Polly Bulkhead策略还提供了配置允许同时为主信号量中的执行槽“排队”的操作数量的功能。所以,例如:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);

当如上配置为HttpClient时,将允许10个并行的http调用,以及最多100个http调用'queue'作为执行槽。这可以通过防止故障下游系统导致上游排队呼叫的过度资源膨胀,为高吞吐量系统提供额外的弹性。

要将Polly选项与HttpClientFactory一起使用,请引入Microsoft.Extensions.Http.PollyPolly nuget包。

参考文献:Polly deep doco on Polly and IHttpClientFactory; Bulkhead policy


附录重新任务

这个问题使用Task.Run(...)并提到:

一个消耗外部api的.net核心web api

和:

不断添加任务而不是预定义的任务列表。

如果你的.net核心web api每个请求只消耗一次.net核心web api处理的外部API,你采用本答案其余部分讨论的方法,将下游外部http调用卸载到Task的新Task.Run(...)将是不必要的,只在额外的Task实例和线程切换中创建开销。 Dot net core已经在线程池上的多个线程上运行传入请求。

© www.soinside.com 2019 - 2024. All rights reserved.