我正在编写 ASP.NET Web API (.NET 7)。我需要创建一个通过流加载指定数据的服务,即时对其执行某些操作,然后将其作为流返回。
假设这是
StreamEncoder
服务类中的方法定义:
public async Task<Stream> EncodeStream(Stream input);
该方法需要做的是:
这个想法是,稍后可以在 API 端点中使用该服务。像这样的东西:
[HttpGet("image")]
public async Task<IActionResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
// Perform checks ...
// Load stream
url = HttpUtility.UrlDecode(url);
var imageStream = await _imageLoaderService.LoadImage(url);
if (imageStream is null) return NotFound();
// Start processing the stream
var outputStream = await _streamEncoder(imageStream);
// Return immediately
return File(outputStream, Formats[format]);
}
批量同步处理流非常简单,但我似乎找不到一种即时处理的解决方案,以便 API 客户端在服务器有机会完成加载所有数据之前开始接收数据。
输入数据的大小通常超过 500MB,我需要同时处理其中的多个数据。我不能只是将所有数据加载到 RAM,然后对其进行处理,最后返回结果。
如何解决这个问题?有没有任何图书馆可以提供帮助?
这里有几件事在起作用。
首先,流式响应有一些限制。
HTTP 响应由标头和后面的数据组成。由于状态代码作为标头值返回,因此流响应的第一个限制是您无法更改状态代码。为了流式传输响应,您的 API 必须返回 200 和 then 流。如果您正在进行流式传输并且上游出现错误,则无法将该状态代码更改为 502 或 500;您所能做的就是抛出一个异常,然后 ASP.NET 将关闭连接,大多数客户端会将其解释为错误(某种一般的“通信错误”,而不是 500)。
另一个限制是您的代码可能直到发送后才知道响应的长度。由于正在进行编码,因此尤其如此。因此,这意味着您的回复不会有
Content-Length
标头,这意味着您的客户没有良好的进度更新。
但是,如果您接受这些限制,那么如何进行流式响应的具体细节就会发挥作用。
您可以通过调用
StartAsync
和 然后 复制到流来开始流式传输,如下所示:
[HttpGet("image")]
public async Task GetImage([FromQuery] string url, [FromQuery] string format)
{
// Load stream
url = HttpUtility.UrlDecode(url);
var imageStream = await _imageLoaderService.LoadImage(url);
if (imageStream is null)
{
Response.StatusCode = 404;
return;
}
// Set all the response headers.
Response.StatusCode = 200;
Response.ContentType = Formats[format];
Response.Headers[...] = ...
// Send the headers and start streaming.
await Response.StartAsync();
// Process the stream. This is just a straight copy as an example.
await imageStream.CopyToAsync(Response.Body);
}
请注意,使用这种方法你确实会失去好
IAsyncResult
的帮助者。 (特别是,如果您使用 File
和朋友来设置 Content-Disposition
,那么 IAsyncResult
帮助程序将处理所有必要的繁琐的标头值编码)。如果你想保留 IAsyncResult
助手,那么你不能直接使用 StartAsync
。在这种情况下,我建议您编写自己的 IAsyncResult
类型。
FileCallbackResult
类型,它将输出流传递给回调。使用我的类型看起来像这样:
[HttpGet("image")]
public async Task<IAsyncResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
// Load stream
url = HttpUtility.UrlDecode(url);
var imageStream = await _imageLoaderService.LoadImage(url);
if (imageStream is null)
return NotFound();
return new FileCallbackResult(Formats[format], async (stream, context) =>
{
// Process the stream. This is just a straight copy as an example.
await imageStream.CopyToAsync(stream);
});
}
从技术上讲,也可以编写生产者/消费者流,但这会需要更多工作。目前不存在这样的类型(
NetworkStream
除外,但你无法控制该类型的两侧)。在过去,这是相当困难的,但今天我认为你可以使用管道来做到这一点。管道是一种更现代、更高效的流形式,也支持生产者/消费者语义。一旦你有了生产者/消费者流,你就可以将它传递给标准的 File
辅助方法。唯一棘手的部分是错误处理:您必须确保您的生产者委托包装在顶级 try
/catch
中,并且会捕获并向消费者重新引发任何异常。
更新:确实,由于管道的原因,创建生产者/消费者流并不困难:
public sealed class ProducerConsumerStream
{
public static Stream Create(Func<Stream, Task> producer, PipeOptions? options = null)
{
var pipe = new Pipe(options ?? PipeOptions.Default);
var readStream = pipe.Reader.AsStream();
var writeStream = pipe.Writer.AsStream();
Run();
return readStream;
async void Run()
{
try
{
await producer(writeStream);
await writeStream.FlushAsync();
pipe.Writer.Complete();
}
catch (Exception ex)
{
pipe.Writer.Complete(ex);
}
}
}
}
用法(请注意,实际用法应指定
PipeOptions.PauseWriterThreshold
):
public Stream EncodeStream(Stream input)
{
return ProducerConsumerStream.Create(async output =>
{
// Process the stream. This is just a straight copy as an example.
await input.CopyToAsync(output);
});
}
[HttpGet("image")]
public async Task<IAsyncResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
// Load stream
url = HttpUtility.UrlDecode(url);
var imageStream = await _imageLoaderService.LoadImage(url);
if (imageStream is null)
return NotFound();
var outputStream = _streamEncoder.EncodeStream(imageStream);
return File(outputStream, Formats[format]);
}