如何处理来自一个流的数据并将其作为 ASP.NET 中的另一个异步流返回

问题描述 投票:0回答:1

我正在编写 ASP.NET Web API (.NET 7)。我需要创建一个通过流加载指定数据的服务,即时对其执行某些操作,然后将其作为流返回。

假设这是

StreamEncoder
服务类中的方法定义:

public async Task<Stream> EncodeStream(Stream input);

该方法需要做的是:

  1. 立即返回输出流
  2. 继续以块的形式加载输入流数据(例如 32 字节)
  3. 通过将其编码为 Base64 来处理该块
  4. 将该编码块传递到输出流。

这个想法是,稍后可以在 API 端点中使用该服务。像这样的东西:

[HttpGet("image")]
public async Task<IActionResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
    // Perform checks ...
    
    // Load stream
    url = HttpUtility.UrlDecode(url);
    var imageStream = await _imageLoaderService.LoadImage(url);
    if (imageStream is null) return NotFound();

    // Start processing the stream
    var outputStream = await _streamEncoder(imageStream);

    // Return immediately
    return File(outputStream, Formats[format]);
}

批量同步处理流非常简单,但我似乎找不到一种即时处理的解决方案,以便 API 客户端在服务器有机会完成加载所有数据之前开始接收数据。

输入数据的大小通常超过 500MB,我需要同时处理其中的多个数据。我不能只是将所有数据加载到 RAM,然后对其进行处理,最后返回结果。

如何解决这个问题?有没有任何图书馆可以提供帮助?

c# asp.net .net asynchronous asp.net-web-api
1个回答
2
投票

这里有几件事在起作用。

首先,流式响应有一些限制。

HTTP 响应由标头和后面的数据组成。由于状态代码作为标头值返回,因此流响应的第一个限制是您无法更改状态代码。为了流式传输响应,您的 API 必须返回 200 和 then 流。如果您正在进行流式传输并且上游出现错误,则无法将该状态代码更改为 502 或 500;您所能做的就是抛出一个异常,然后 ASP.NET 将关闭连接,大多数客户端会将其解释为错误(某种一般的“通信错误”,而不是 500)。

另一个限制是您的代码可能直到发送后才知道响应的长度。由于正在进行编码,因此尤其如此。因此,这意味着您的回复不会有

Content-Length
标头,这意味着您的客户没有良好的进度更新。

但是,如果您接受这些限制,那么如何进行流式响应的具体细节就会发挥作用。

您可以通过调用

StartAsync
然后 复制到流来开始流式传输,如下所示:

[HttpGet("image")]
public async Task GetImage([FromQuery] string url, [FromQuery] string format)
{
  // Load stream
  url = HttpUtility.UrlDecode(url);
  var imageStream = await _imageLoaderService.LoadImage(url);
  if (imageStream is null)
  {
    Response.StatusCode = 404;
    return;
  }

  // Set all the response headers.
  Response.StatusCode = 200;
  Response.ContentType = Formats[format];
  Response.Headers[...] = ...

  // Send the headers and start streaming.
  await Response.StartAsync();

  // Process the stream. This is just a straight copy as an example.
  await imageStream.CopyToAsync(Response.Body);
}

请注意,使用这种方法你确实会失去好

IAsyncResult
的帮助者。 (特别是,如果您使用
File
和朋友来设置
Content-Disposition
,那么
IAsyncResult
帮助程序将处理所有必要的繁琐的标头值编码)。如果你想保留
IAsyncResult
助手,那么你不能直接使用
StartAsync
。在这种情况下,我建议您编写自己的
IAsyncResult
类型。

我在 GitHub 上有一个

FileCallbackResult
类型,它将输出流传递给回调。使用我的类型看起来像这样:

[HttpGet("image")]
public async Task<IAsyncResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
  // Load stream
  url = HttpUtility.UrlDecode(url);
  var imageStream = await _imageLoaderService.LoadImage(url);
  if (imageStream is null)
    return NotFound();

  return new FileCallbackResult(Formats[format], async (stream, context) =>
  {
    // Process the stream. This is just a straight copy as an example.
    await imageStream.CopyToAsync(stream);
  });
}

从技术上讲,也可以编写生产者/消费者流,但这会需要更多工作。目前不存在这样的类型(

NetworkStream
除外,但你无法控制该类型的两侧)。在过去,这是相当困难的,但今天我认为你可以使用管道来做到这一点。管道是一种更现代、更高效的流形式,也支持生产者/消费者语义。一旦你有了生产者/消费者流,你就可以将它传递给标准的
File
辅助方法。唯一棘手的部分是错误处理:您必须确保您的生产者委托包装在顶级
try
/
catch
中,并且会捕获并向消费者重新引发任何异常。

更新:确实,由于管道的原因,创建生产者/消费者流并不困难

public sealed class ProducerConsumerStream
{
    public static Stream Create(Func<Stream, Task> producer, PipeOptions? options = null)
    {
        var pipe = new Pipe(options ?? PipeOptions.Default);
        var readStream = pipe.Reader.AsStream();
        var writeStream = pipe.Writer.AsStream();
        Run();
        return readStream;

        async void Run()
        {
            try
            {
                await producer(writeStream);
                await writeStream.FlushAsync();
                pipe.Writer.Complete();
            }
            catch (Exception ex)
            {
                pipe.Writer.Complete(ex);
            }
        }
    }
}

用法(请注意,实际用法应指定

PipeOptions.PauseWriterThreshold
):

public Stream EncodeStream(Stream input)
{
  return ProducerConsumerStream.Create(async output =>
  {
    // Process the stream. This is just a straight copy as an example.
    await input.CopyToAsync(output);
  });
}

[HttpGet("image")]
public async Task<IAsyncResult> GetImage([FromQuery] string url, [FromQuery] string format)
{
  // Load stream
  url = HttpUtility.UrlDecode(url);
  var imageStream = await _imageLoaderService.LoadImage(url);
  if (imageStream is null)
    return NotFound();

  var outputStream = _streamEncoder.EncodeStream(imageStream);
  return File(outputStream, Formats[format]);
}
© www.soinside.com 2019 - 2024. All rights reserved.