我需要在 Linux 操作系统上托管的 ASP.NET 6 服务器上实现并发 WebRequest。我创建了一个屏障实现(BarrierTask 类)来处理并行处理。但是,偶尔(如下所述),WebRequest 会花费很长时间并最终失败。
在启动时我设置线程池:
System.Net.ServicePointManager.DefaultConnectionLimit = 500;
System.Threading.ThreadPool.SetMinThreads(1000, 500);
运行多个并行 WebRequest 的示例:
BarrierTask<int, TestDto> barrierTask = new BarrierTask<int, TestDto>();
int[] arr = new int[count];
for (int i = 0; i < arr.Length; i++)
{
arr[i] = i;
}
TestDto[] results = barrierTask.Run((a) =>
{
var id = new TestDto()
{
TimestampStart = DateTime.UtcNow,
TimestampEnd = new DateTime(),
Delay = delay,
Name = $"Test request {a}",
Error = "OK"
};
string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
try
{
string json = DoRequest("https://.../apidebug/delay", str);
var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
return ret;
}
catch (Exception ex)
{
id.Error = ex.ToString().Replace("\r","\\r").Replace("\n","\\n");
id.TimestampEnd = DateTime.UtcNow;
return id;
}
}
, arr);
string DoRequest(string uri, string body)
{
using var client = new HttpClient();
using (var webRequest = new HttpRequestMessage(HttpMethod.Post, uri)
{
Content = new StringContent(body, Encoding.UTF8, "application/json"),
})
{
using (var response = client.Send(webRequest))
{
using (var reader = new StreamReader(response.Content.ReadAsStream()))
{
return reader.ReadToEnd();
}
}
}
}
障碍任务:
public class BarrierTask<TArgs,TResult>
{
Barrier barrier;
public TResult[] Run(Func<TArgs, TResult> action, TArgs[] args)
{
List<TResult> ret = new List<TResult>();
barrier = new Barrier(args.Length + 1);
foreach (TArgs arg in args)
{
Task.Run(() =>
{
try
{
TResult result = action(arg);
lock (this)
{
ret.Add(result);
}
}
catch (Exception ex)
{
AppLog.Log("BarrierTask.Run: Input: " + arg.ToString() + " Ex: " + ex.Message, AppLog.MessageType.Error);
}
finally
{
barrier.SignalAndWait();
}
});
}
barrier.SignalAndWait();
return ret.ToArray();
}
}
端点示例:
[Route("api/delay")]
[ApiController]
public class DelayController : ControllerBase
{
[HttpPost]
public TestDto Post(TestDto testDto)
{
System.Threading.Thread.Sleep(testDto.Delay);
testDto.TimestampEnd = DateTime.UtcNow;
return testDto;
}
}
案例 1 - 启动服务器并执行 100 个并发 WebRequest,延迟 4000 毫秒: 部分 WebRequest 处理成功,但许多请求失败,并出现以下异常:
System.IO.IOException: The response ended prematurely.
此异常的可能原因是端点上的 Apache 处于“R - Reading Request”状态,并在 20 秒后终止连接。
第二次尝试大约在 4300 毫秒内处理完毕,表明一切正常。
案例 2 - 启动服务器,预热,并执行 100 个并发 WebRequest,延迟 4000 毫秒: 最初,我发起了一个请求(使用相同的代码)。处理完预热请求后,我发起了 100 个并发请求,一切正常。
情况 3 - 几分钟后: 几分钟后,再次需要预热请求;否则,某些请求会导致异常。
请问,有什么想法吗?
您似乎发起了太多并发网络请求。如果 100 个请求中有 80% 失败,那么限制并发请求的数量可能会更有效。
您可以使用
[Parallel.ForEachAsync][1]
来管理并发执行并限制最大并行度。这是一个例子:
using HttpClient client = new();
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 10
};
await Parallel.ForEachAsync([Some Source], parallelOptions, async (item, token) =>
{
// Do something here
await client.SendAsync(/*Some Code…*/);
});
正如您在上面的示例中所看到的,
HttpClient
实例没有被释放。它被设计为“重用于多个 Web 请求”。如果您在每次请求后处理它,它可能会耗尽所有可用端口,而这些端口不会通过处理 HttpClient
立即释放。另外,尽可能尝试使用这些方法的异步版本。关于您的BarrierTask
课程,我不完全确定您想要实现什么目标。我建议使用
Parallel.ForEachAsync
或图书馆。以下是如何使用 Parallel.ForEachAsync
的示例:public async Task StartRunAsync(List<int> ints)
{
HttpClient client = new HttpClient();
await Parallel.ForEachAsync(ints, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, async (aInt, cancellationToken) =>
{
var id = new TestDto()
{
TimestampStart = DateTime.UtcNow,
TimestampEnd = new DateTime(),
Delay = delay,
Name = $"Test request {aInt}",
Error = "OK"
};
string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
try
{
string json = DoRequest("https://.../apidebug/delay", str, client);
var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
return ret;
}
catch (Exception ex)
{
id.Error = ex.ToString().Replace("\r", "\\r").Replace("\n", "\\n");
id.TimestampEnd = DateTime.UtcNow;
return id;
}
});
}
如果您想使用提供更多功能的库,您可以尝试使用NuGet Manager 中的请求库
。以下是如何使用它的示例:
public void StartRun(List<int> ints)
{
HttpClient httpClient = new HttpClient();
foreach (var aInt in ints)
{
new OwnRequest(async (cancellationToken) =>
{
var id = new TestDto()
{
TimestampStart = DateTime.UtcNow,
TimestampEnd = new DateTime(),
Delay = delay,
Name = $"Test request {aInt}",
Error = "OK"
};
string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
try
{
string json = DoRequest("https://.../apidebug/delay", str, httpClient);
var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
return ret;
}
catch (Exception ex)
{
id.Error = ex.ToString().Replace("\r", "\\r").Replace("\n", "\\n");
id.TimestampEnd = DateTime.UtcNow;
return id;
}
});
}
}
如果失败,将会重试,并且您可以动态指定并发程度、延迟并发或类似的操作。您也可以为此目的编写自己的类。
public class SendRequest : Request<RequestOptions<string, Exception>, string, Exception>
{
private static Lazy<HttpClient> _client = new(() => new HttpClient());
private int _id;
private Uri _uri;
private int _delay;
public SendRequest(Uri uri, int id, int delay, RequestOptions<string, Exception> options = null) : base(options)
{
_id = id;
_delay = delay;
_uri = uri;
}
protected override async Task<RequestReturn> RunRequestAsync()
{
RequestReturn requestReturn = new RequestReturn();
var id = new TestDto()
{
TimestampStart = DateTime.UtcNow,
TimestampEnd = new DateTime(),
Delay = _delay,
Name = $"Test request {_id}",
Error = "OK"
};
string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
try
{
using HttpRequestMessage webRequest = new(HttpMethod.Post, _uri)
{
Content = new StringContent(str, Encoding.UTF8, "application/json"),
};
using HttpResponseMessage response = await _client.Value.SendAsync(webRequest);
using StreamReader reader = new(response.Content.ReadAsStream());
requestReturn.CompletedReturn = reader.ReadToEnd();
requestReturn.Successful = true;
}
catch (Exception ex)
{
requestReturn.Successful = false;
requestReturn.FailedReturn = ex;
}
return requestReturn;
}
}
// To make some posts, you could do:
public void StartRun(List<int> ints)
{
RequestHandler handler = new RequestHandler();
handler.StaticDegreeOfParallelism = 10;
foreach (int aInt in ints)
{
new SendRequest(new Uri("https://.../apidebug/delay"), aInt, delay, new RequestOptions<string, Exception>()
{
RequestCompleted = (request, yourString) => { /*Some code here*/ },
Handler = handler
});
}
}
使用
StaticDegreeOfParallelism
,您可以更改并行度。这也适用于
OwnRequest
。
我认为这种方法应该可以消除你提到的问题。请注意,此处的代码没有经过测试,而是根据您提供的代码编写的。它不会开箱即用。
我也没有真正明白你想要做什么,如果这没有帮助,我很抱歉。