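Trying to figure out where I'm going wrong.
I have a program that queries an API. The API returns data in chunks and, when more data is available, includes a Link response header containing the URL of the next chunk.
I'm trying to call the API and, as soon as I get a response, start processing that data while I wait for the next chunk to arrive. But it looks like I'm waiting for the data, processing it, and only then waiting for/requesting the next set of data.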
JArray jsonObjects = new JArray();
Task.Run(async () =>
{
    jsonObjects = await GetObjectsJSONAsync();
}).GetAwaiter().GetResult();
private async Task<JArray> GetObjectsJSONAsync()
{
    /*
     * HttpClient setup here
     */
    var response = "";
    var headers = "";
    var total = 0;
    JArray jsonObjectsFromURL = new JArray();
    HttpResponseMessage result = await client.GetAsync("/products/skus/");
    if (result.IsSuccessStatusCode)
    {
        headers = result.Headers.GetValues("Link").FirstOrDefault();
        response = await result.Content.ReadAsStringAsync();
        jsonObjectsFromURL = JArray.Parse(response);
        ProcessObjects(jsonObjectsFromURL);
        while (headers.Contains("rel=\"next\"")) //while header contains next link
        {
            var url = headers.Split(';')[0].Replace("<", "").Replace(">", "");
            result = await client.GetAsync(url);
            try
            {
                response = await result.Content.ReadAsStringAsync();
                jsonObjectsFromURL = JArray.Parse(response);
                ProcessObjects(jsonObjectsFromURL); //adds processed objects to a ConcurrentBag
                headers = result.Headers.GetValues("Link").FirstOrDefault();
            }
            catch (Exception e)
            {
                //This means that there are no more pages to get
                headers = "";
            }
        }
    }
    return jsonObjectsFromURL; //don't care about the return here
}
I am assuming that you can modify the GetObjectsJSONAsync so that it returns a tuple with three elements: the current URL, the next URL, and the JArray result:
private async Task<(string, string, JArray)> GetObjectsJSONAsync(string url)
{
    string nextUrl;
    JArray jsArray;
    /* ... */
    return (url, nextUrl, jsArray);
}
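The /* ... */ part still has to work out nextUrl. The loop in the question takes headers.Split(';')[0], which only works while the "next" link happens to be the first entry in the header. The Link header format (RFC 8288) allows a comma-separated list of links such as prev, next and last, so a parser that looks for rel="next" specifically is more robust. Below is a minimal sketch; the LinkHeader class and its GetNextUrl method are hypothetical names, and the regular expression assumes that each target is wrapped in angle brackets and that no parameter before rel contains a comma.

```csharp
using System;
using System.Text.RegularExpressions;

public static class LinkHeader
{
    // Matches one link-value, e.g.  <https://api.example.com/items?page=2>; rel="next"
    // Assumption: the target is in angle brackets, and no parameter value
    // that appears before rel contains a comma.
    private static readonly Regex LinkValue = new Regex(
        @"<(?<url>[^>]*)>\s*;[^,]*?rel\s*=\s*""?(?<rel>[^"";,]+)""?",
        RegexOptions.Compiled);

    // Returns the target of the link whose relation types include "next",
    // or null when there is no such link (i.e. this is the last page).
    public static string GetNextUrl(string linkHeader)
    {
        if (string.IsNullOrEmpty(linkHeader)) return null;

        foreach (Match match in LinkValue.Matches(linkHeader))
        {
            // rel may hold several space-separated types, e.g. rel="next last"
            string[] relTypes = match.Groups["rel"].Value.Split(
                new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
            if (Array.Exists(relTypes, r => string.Equals(r, "next", StringComparison.OrdinalIgnoreCase)))
                return match.Groups["url"].Value;
        }
        return null;
    }
}
```

Inside GetObjectsJSONAsync(url) you could then write something like nextUrl = result.Headers.TryGetValues("Link", out var links) ? LinkHeader.GetNextUrl(string.Join(", ", links)) : null;. Using TryGetValues instead of GetValues also avoids the InvalidOperationException that GetValues throws when the header is missing, and the null it produces on the last page is exactly the signal that ParallelizeTwoActions uses to stop.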
Now you could use the ParallelizeTwoActions method below to abstract away the parallelization of invoking the GetObjectsJSONAsync and processing the JArray:
/// <remarks>
/// Invokes two actions for each element in a sequence, sequentially.
/// The action1 is invoked sequentially for one element at a time.
/// The action2 is also invoked sequentially for one element at a time.
/// The action1 for an element is invoked in parallel with the action2 for its
/// previous element. The action1 returns the element it was given, the next
/// input in the sequence (or null if it's the last element), and its result.
/// </remarks>
public static async Task ParallelizeTwoActions<TInput, TResult>(
    TInput startingInput,
    Func<TInput, Task<(TInput, TInput, TResult)>> action1,
    Func<TInput, TResult, Task> action2)
    where TInput : class
{
    if (startingInput is null) return;
    Task<(TInput, TInput, TResult)> task1 = Task.Run(() => action1(startingInput));
    Task task2 = Task.CompletedTask;
    try
    {
        while (true)
        {
            // Wait until the current element is fetched and the previous one is processed
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
            (TInput previousInput, TInput nextInput, TResult result) = task1.Result;
            // Start processing the element that was just fetched...
            task2 = Task.Run(() => action2(previousInput, result));
            if (nextInput is null) break;
            // ...while fetching the next element in parallel
            task1 = Task.Run(() => action1(nextInput));
        }
    }
    finally
    {
        // Never leave a task running unobserved, even if the loop exits early or throws
        await Task.WhenAll(task1, task2).ConfigureAwait(false);
    }
}
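This is a modified version of the ParallelizeTwoActions that I posted in this old answer. That answer includes a marble diagram that demonstrates the behavior of the method.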
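The Task.Run ensures that the two actions will run in parallel even if they are not purely asynchronous (that is, even if they block the current thread for some time before returning an incomplete task). It achieves this by offloading the invocation of the actions to the ThreadPool.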
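To make "not purely asynchronous" concrete, here is a small sketch built around a hypothetical PartlyBlockingAsync method that blocks for 300 ms before returning an incomplete task. Invoking it directly keeps the caller blocked for the whole synchronous part, while wrapping the invocation in Task.Run hands a Task back almost immediately, because the blocking part runs on a ThreadPool thread. The names OffloadDemo, PartlyBlockingAsync and Start are made up for this illustration, and the timings are approximate.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class OffloadDemo
{
    // Hypothetical "not purely asynchronous" method: it blocks the calling
    // thread for 300 ms, and only then returns an incomplete task.
    public static Task PartlyBlockingAsync(Action<bool> reportIsPoolThread)
    {
        reportIsPoolThread(Thread.CurrentThread.IsThreadPoolThread);
        Thread.Sleep(300);      // synchronous part: blocks whoever invoked the method
        return Task.Delay(100); // asynchronous part
    }

    // Starts the method either directly or through Task.Run, and reports how
    // long the caller was blocked before it got a Task back.
    public static (long StartupMs, Task Running) Start(bool offload, Action<bool> reportIsPoolThread)
    {
        var stopwatch = Stopwatch.StartNew();
        Task running = offload
            ? Task.Run(() => PartlyBlockingAsync(reportIsPoolThread)) // invocation queued to the ThreadPool
            : PartlyBlockingAsync(reportIsPoolThread);               // invocation runs on the current thread
        return (stopwatch.ElapsedMilliseconds, running);
    }
}
```

On a typical machine Start(false, ...) reports roughly 300 ms and Start(true, ...) close to 0 ms. This is why ParallelizeTwoActions wraps both actions in Task.Run: otherwise a blocking prefix in action2 would delay the start of the next action1, and the two would no longer overlap.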
You could use the ParallelizeTwoActions like this:
await ParallelizeTwoActions("https://stackoverflow.com", url =>
{
    return GetObjectsJSONAsync(url);
},
async (url, jsArray) =>
{
    // Process the jsArray asynchronously
});