我们有一个服务器 API 端点,可以将 HTTP Body 读取为
Stream
并将其解析为非常大的实体集合。我们使用 JsonTextReader (Newtonsoft.Json
包)实现了流的解析。这个过程工作正常,我们能够在加载流数据时部分读取流并将项目添加到集合中。
我们的客户端应用程序准备
StreamContent
并向服务器API发出HTTP请求。现在我们使用标准的 HttpClient 类来发送数据。我们已按照此处所述实施了请求:
public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellation)
{
if (stream.CanSeek)
{
stream.Seek(0, SeekOrigin.Begin);
}
var body = new StreamContent(stream);
var response = await _httpClient.PostAsync(url, body, cancellation);
var content = await response.Content.ReadAsStringAsync(cancellation);
...
return result;
}
但是我们在内存中准备了一个FULL流,只有在它之后我们才将
StreamContent
发送到当前实现中的HTTP端点。当然流可以很大,我们使用JsonWriter
将数据部分写入流。
我们的目标:
StreamContent
请求,同时我们仍在向现有 Stream
写入新数据。可以在.NET中实现吗? 我相信我们需要使用标准 HttpClient 实现之外的其他东西。
任何帮助将不胜感激。
更新1
我们的服务器API方法在这里:
[ApiController]
[Route("api/master")]
public class MasterController : ControllerBase
{
[HttpPost("event")]
public async Task<ActionResult> Load()
{
// Parse JSON from Request Body
var events = ParseStreamAsync(HttpContext.Request.Body, HttpContext.RequestAborted);
// Run external library with our events data
await _eventLoaderService.AppendEventsAsync(events, HttpContext.RequestAborted);
return NoContent();
}
private async IAsyncEnumerable<EventData> ParseStreamAsync(Stream stream, [EnumeratorCancellation] CancellationToken cancellation)
{
var serializer = new JsonSerializer
{
MissingMemberHandling = MissingMemberHandling.Error
};
using var sr = new StreamReader(stream);
await using var reader = new JsonTextReader(sr);
while (await ReadAsync(reader, cancellation))
{
if (reader.TokenType == JsonToken.PropertyName && reader.Path.Equals("Events", StringComparison.OrdinalIgnoreCase))
{
await ReadAsync(reader, cancellation);
while (await ReadAsync(reader, cancellation))
{
var @event = new Event();
...
yield return @event;
}
}
}
}
}
使用
PushStreamContent
代替 StreamContent
。
以下是如何设置客户端在写入时异步发送流的示例:
public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellationToken)
{
var requestMessage = new HttpRequestMessage(HttpMethod.Post, url);
requestMessage.Content = new PushStreamContent(async (requestStream, httpContent, transportContext) =>
{
using (yourstream)
{
await stream.CopyToAsync(requestStream);
}
});
using var response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
if (!response.IsSuccessStatusCode)
{
// Handle error response
}
var content = await response.Content.ReadAsStringAsync(cancellationToken);
var result = JsonConvert.DeserializeObject<OutputModel>(content);
return result;
}
是的,可以将数据流式传输到 API 端点,而无需等待整个流在内存中准备好。 .NET 提供了异步流数据的方法,您可以修改客户端实现来实现此目的。
要实现将数据流式传输到 API,同时不断将新数据写入现有流,您可以将 HttpClient 与 HttpContent 派生类型结合使用,例如 PushStreamContent 或 MultipartContent。这些类允许在编写内容时流式传输内容。
以下是如何修改代码以实现此目的的示例:
public async Task<OutputModel> SendStreamAsync(Stream stream, CancellationToken cancellation)
{
// Seek if possible
if (stream.CanSeek)
{
stream.Seek(0, SeekOrigin.Begin);
}
// Create a PushStreamContent to allow streaming
var pushStreamContent = new PushStreamContent(async (outputStream, httpContent, transportContext) =>
{
// Use a buffer or other mechanism to write data to the outputStream as it becomes available
// Example: Write chunks of data to the outputStream using JsonWriter
// Ensure cancellation is respected for async operations within this block
// Example:
using (var jsonWriter = new JsonTextWriter(new StreamWriter(outputStream)))
{
// Your logic here to write JSON data chunks to the outputStream
// You can continuously write data to outputStream as it becomes available
// For demo purposes, let's assume you're writing a JSON array
await jsonWriter.WriteStartArrayAsync();
// Simulate writing data in chunks (replace this with your actual logic)
for (var i = 0; i < 10; i++)
{
var jsonString = $"{{ \"item\": {i} }}";
await jsonWriter.WriteRawValueAsync(jsonString);
await Task.Delay(100); // Simulate writing delay
}
await jsonWriter.WriteEndArrayAsync();
}
});
// Make the POST request with the PushStreamContent
var response = await _httpClient.PostAsync(url, pushStreamContent, cancellation);
var content = await response.Content.ReadAsStringAsync(cancellation);
// Process response...
return result;
}