写入流并同时发送到 .NET 中的 HTTP 控制器

问题描述 投票:0回答:2

我们有一个服务器 API 端点,可以将 HTTP Body 读取为

Stream
并将其解析为非常大的实体集合。我们使用 JsonTextReader
Newtonsoft.Json
包)实现了流的解析。这个过程工作正常,我们能够在加载流数据时部分读取流并将项目添加到集合中。

我们的客户端应用程序准备

StreamContent
并向服务器API发出HTTP请求。现在我们使用标准的 HttpClient 类来发送数据。我们已按照此处所述实施了请求:

public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellation)
{
    if (stream.CanSeek)
    {
        stream.Seek(0, SeekOrigin.Begin);
    }

    var body = new StreamContent(stream);

    var response = await _httpClient.PostAsync(url, body, cancellation);

    var content = await response.Content.ReadAsStringAsync(cancellation);

    ...

    return result;
}

但是我们在内存中准备了一个FULL流,只有在它之后我们才将

StreamContent
发送到当前实现中的HTTP端点。当然流可以很大,我们使用
JsonWriter
将数据部分写入流。

我们的目标

  1. 开始向 HTTP 端点发送
    StreamContent
    请求,同时我们仍在向现有
    Stream
    写入新数据。
  2. 当服务器API收到第一部分数据时,我们要开始读取和解析上传的数据(无需等待完整的数据上传)。

可以在.NET中实现吗? 我相信我们需要使用标准 HttpClient 实现之外的其他东西。

任何帮助将不胜感激。


更新1

我们的服务器API方法在这里:

[ApiController]
[Route("api/master")]
public class MasterController : ControllerBase
{
    [HttpPost("event")]
    public async Task<ActionResult> Load()
    {
        // Parse JSON from Request Body
        var events = ParseStreamAsync(HttpContext.Request.Body, HttpContext.RequestAborted);

        // Run external library with our events data
        await _eventLoaderService.AppendEventsAsync(events, HttpContext.RequestAborted);

        return NoContent();
    }
    
    private async IAsyncEnumerable<EventData> ParseStreamAsync(Stream stream, [EnumeratorCancellation] CancellationToken cancellation)
    {
        var serializer = new JsonSerializer
        {
            MissingMemberHandling = MissingMemberHandling.Error
        };

        using var sr = new StreamReader(stream);
        await using var reader = new JsonTextReader(sr);
        while (await ReadAsync(reader, cancellation))
        {
            if (reader.TokenType == JsonToken.PropertyName && reader.Path.Equals("Events", StringComparison.OrdinalIgnoreCase))
            {
                await ReadAsync(reader, cancellation);

                while (await ReadAsync(reader, cancellation))
                {
                    var @event = new Event();
                
                    ...
                    
                    yield return @event;
                }
            }
        }
    }
}
c# .net stream
2个回答
1
投票

使用

PushStreamContent
代替
StreamContent

以下是如何设置客户端在写入时异步发送流的示例:

public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellationToken)
{
    var requestMessage = new HttpRequestMessage(HttpMethod.Post, url);
    requestMessage.Content = new PushStreamContent(async (requestStream, httpContent, transportContext) =>
    {
        using (yourstream)
        {
            await stream.CopyToAsync(requestStream);
        }
    });

    using var response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
  
    if (!response.IsSuccessStatusCode)
    {
        // Handle error response
    }

    var content = await response.Content.ReadAsStringAsync(cancellationToken);
  
    var result = JsonConvert.DeserializeObject<OutputModel>(content);

    return result;
}

0
投票

是的,可以将数据流式传输到 API 端点,而无需等待整个流在内存中准备好。 .NET 提供了异步流数据的方法,您可以修改客户端实现来实现此目的。

要实现将数据流式传输到 API,同时不断将新数据写入现有流,您可以将 HttpClient 与 HttpContent 派生类型结合使用,例如 PushStreamContent 或 MultipartContent。这些类允许在编写内容时流式传输内容。

以下是如何修改代码以实现此目的的示例:

public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellation)
{
    // Seek if possible
    if (stream.CanSeek)
    {
        stream.Seek(0, SeekOrigin.Begin);
    }

    // Create a PushStreamContent to allow streaming
    var pushStreamContent = new PushStreamContent(async (outputStream, httpContent, transportContext) =>
    {
        // Use a buffer or other mechanism to write data to the outputStream as it becomes available
        // Example: Write chunks of data to the outputStream using JsonWriter
        // Ensure cancellation is respected for async operations within this block
        // Example:
        using (var jsonWriter = new JsonTextWriter(new StreamWriter(outputStream)))
        {
            // Your logic here to write JSON data chunks to the outputStream
            // You can continuously write data to outputStream as it becomes available
            // For demo purposes, let's assume you're writing a JSON array
            await jsonWriter.WriteStartArrayAsync();

            // Simulate writing data in chunks (replace this with your actual logic)
            for (var i = 0; i < 10; i++)
            {
                var jsonString = $"{{ \"item\": {i} }}";
                await jsonWriter.WriteRawValueAsync(jsonString);
                await Task.Delay(100); // Simulate writing delay
            }

            await jsonWriter.WriteEndArrayAsync();
        }
    });

    // Make the POST request with the PushStreamContent
    var response = await _httpClient.PostAsync(url, pushStreamContent, cancellation);

    var content = await response.Content.ReadAsStringAsync(cancellation);

    // Process response...

    return result;
}
© www.soinside.com 2019 - 2024. All rights reserved.