Repeatedly calling an API while concurrently processing the returned data in C#


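I'm trying to figure out where exactly I'm going wrong.

I have a program that queries an API. The API returns the data in chunks, and if there is more data, the response includes a header containing the URL of the next chunk.

I'm trying to call the API and, as soon as I get a response, start processing that data while I wait for the next chunk to arrive. But it looks like I'm waiting for the data, processing it, and only then waiting for (requesting) the next set of data.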

JArray jsonObjects = new JArray();

Task.Run(async () =>
{
    jsonObjects = await GetObjectsJSONAsync();
}).GetAwaiter().GetResult();
private async Task<JArray> GetObjectsJSONAsync()
{
    /*
     * HttpClient setup here
     */

    var response = "";
    var headers = "";
    JArray jsonObjectsFromURL = new JArray();
    HttpResponseMessage result = await client.GetAsync("/products/skus/");
    if (result.IsSuccessStatusCode)
    {
        headers = result.Headers.GetValues("Link").FirstOrDefault();
        response = await result.Content.ReadAsStringAsync();
        jsonObjectsFromURL = JArray.Parse(response);
        ProcessObjects(jsonObjectsFromURL); //adds processed objects to a ConcurrentBag
        while (headers.Contains("rel=\"next\"")) //while header contains next link
        {
            var url = headers.Split(';')[0].Replace("<", "").Replace(">", "");
            result = await client.GetAsync(url);
            try
            {
                response = await result.Content.ReadAsStringAsync();
                jsonObjectsFromURL = JArray.Parse(response);
                ProcessObjects(jsonObjectsFromURL); //adds processed objects to a ConcurrentBag
                headers = result.Headers.GetValues("Link").FirstOrDefault();
            }
            catch (Exception e)
            {
                //This means that there are no more pages to get
                headers = "";
            }
        }
    }

    return jsonObjectsFromURL; //don't care about the return value here
}
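To see why the loop above runs sequentially, consider a minimal simulation (not part of the original post). The hypothetical FetchPageAsync and ProcessPage methods stand in for the HttpClient calls and for ProcessObjects, with Task.Delay and Thread.Sleep replacing real latency and work. Every await in the loop completes before ProcessObjects runs, and ProcessObjects runs synchronously before the next GetAsync is issued, so at most one operation is ever in flight. For contrast, the sketch also includes a simple prefetch variant, a common pattern that is distinct from the approach taken in the answer below: it starts the next request before processing the current page.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Simulation only: FetchPageAsync and ProcessPage are hypothetical stand-ins
// for client.GetAsync/ReadAsStringAsync and ProcessObjects.
public static class PagingDemo
{
    private static readonly object Gate = new object();
    private static int _active;    // operations currently running
    private static int _maxActive; // highest overlap observed in this run

    private static void Enter()
    {
        lock (Gate) { _active++; _maxActive = Math.Max(_maxActive, _active); }
    }

    private static void Exit()
    {
        lock (Gate) { _active--; }
    }

    private static void Reset()
    {
        lock (Gate) { _active = 0; _maxActive = 0; }
    }

    // Stands in for the HTTP request: "downloads" a page after 50 ms.
    private static async Task<int> FetchPageAsync(int page)
    {
        Enter();
        try { await Task.Delay(50); return page; }
        finally { Exit(); }
    }

    // Stands in for ProcessObjects: synchronous work that blocks for 50 ms.
    private static void ProcessPage(int page)
    {
        Enter();
        try { Thread.Sleep(50); }
        finally { Exit(); }
    }

    // Same shape as the question's loop: await the page, process it, repeat.
    public static async Task<int> SequentialAsync(int pageCount)
    {
        Reset();
        for (int page = 0; page < pageCount; page++)
        {
            int data = await FetchPageAsync(page);
            ProcessPage(data); // the next request is not sent until this returns
        }
        lock (Gate) { return _maxActive; }
    }

    // Prefetch variant: start requesting page N+1, then process page N.
    public static async Task<int> PrefetchAsync(int pageCount)
    {
        Reset();
        Task<int> pending = FetchPageAsync(0);
        for (int page = 0; page < pageCount; page++)
        {
            int data = await pending;
            if (page + 1 < pageCount)
                pending = FetchPageAsync(page + 1); // in flight while we process
            ProcessPage(data);
        }
        lock (Gate) { return _maxActive; }
    }
}
```

With three pages, SequentialAsync should report a maximum overlap of 1 and PrefetchAsync a maximum overlap of 2.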
Tags: c# asynchronous async-await concurrency parallel-processing
1 Answer

I'm assuming that you can modify GetObjectsJSONAsync so that it returns a tuple with three elements: the current URL, the next URL, and the JArray result:

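private async Task<(string, string, JArray)> GetObjectsJSONAsync(string url)
{
    string nextUrl = null; // URL of the next chunk, or null if this is the last one
    JArray jsArray = null; // parsed contents of the current chunk
    /* ... */
    return (url, nextUrl, jsArray);
}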
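Inside such a method, the next URL has to be extracted from the Link response header. The question's code takes headers.Split(';')[0], which is the next link only if the server happens to list it first. The following sketch of a hypothetical ParseNextLink helper (not part of the answer) looks for the entry whose rel parameter contains next, and returns null when there is none, which matches the "null means last chunk" convention above. It assumes the header uses the <url>; rel="..." format of RFC 8288 and, for simplicity, that the URLs themselves contain no commas.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper: returns the URL marked rel="next" in a Link header
// value such as  <https://host/items?page=3>; rel="next", <...>; rel="last"
// or null if there is no such entry (i.e. this is the last chunk).
public static class LinkHeader
{
    public static string? ParseNextLink(string? linkHeader)
    {
        if (string.IsNullOrEmpty(linkHeader)) return null;

        // Link values are separated by commas (assumes no commas inside URLs).
        foreach (string part in linkHeader.Split(','))
        {
            // "<uri>; param; param": group 1 is the URI, group 2 the parameters.
            Match link = Regex.Match(part, @"<([^>]*)>\s*;(.*)");
            if (!link.Success) continue;

            // rel may be quoted or unquoted and may list several relation types.
            Match rel = Regex.Match(link.Groups[2].Value,
                @"rel\s*=\s*""?([^"";]*)""?", RegexOptions.IgnoreCase);
            if (!rel.Success) continue;

            foreach (string type in rel.Groups[1].Value.Split(' ', StringSplitOptions.RemoveEmptyEntries))
            {
                if (string.Equals(type, "next", StringComparison.OrdinalIgnoreCase))
                    return link.Groups[1].Value;
            }
        }
        return null;
    }
}
```

To read the header itself, HttpHeaders.TryGetValues("Link", out var values) returns false when the header is absent, whereas GetValues throws an InvalidOperationException, which is what the question's catch block ends up relying on.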

Now you can use the ParallelizeTwoActions method below to abstract away the parallelization of calling GetObjectsJSONAsync and processing the JArray:

/// <remarks>
/// Invokes two actions for each element in a sequence, sequentially.
/// The action1 is invoked sequentially for one element at a time.
/// The action2 is also invoked sequentially for one element at a time.
/// The action1 for an element is invoked in parallel with the action2 for its
/// previous element. The action1 returns the next input in the sequence,
/// or null if it's the last element.
/// </remarks>
public static async Task ParallelizeTwoActions<TInput, TResult>(
    TInput startingInput,
    Func<TInput, Task<(TInput, TInput, TResult)>> action1,
    Func<TInput, TResult, Task> action2)
    where TInput : class
{
    if (startingInput is null) return;
    Task<(TInput, TInput, TResult)> task1 = Task.Run(() => action1(startingInput));
    Task task2 = Task.CompletedTask;
    try
    {
        while (true)
        {
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
            (TInput previousInput, TInput nextInput, TResult result) = task1.Result;
            task2 = Task.Run(() => action2(previousInput, result));
            if (nextInput is null) break;
            task1 = Task.Run(() => action1(nextInput));
        }
    }
    finally
    {
        await Task.WhenAll(task1, task2).ConfigureAwait(false);
    }
}

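This is a modified version of the ParallelizeTwoActions method that I posted in this older answer. That answer includes a marble diagram that demonstrates the behavior of the method.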

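The Task.Run ensures that the two actions will run in parallel even if they are not fully asynchronous (i.e. even if they block the current thread for some time before returning an incomplete task). It achieves this by offloading the invocation of the actions to the ThreadPool.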
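The following sketch (not from the answer) illustrates this with a hypothetical PartlyBlockingAsync method that blocks for 200 ms before its first await. Calling it directly blocks the caller for that long before a Task is even returned; wrapping the call in Task.Run hands back a task almost immediately, because the blocking part now runs on a ThreadPool thread.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class OffloadDemo
{
    // Hypothetical "not fully asynchronous" action: it blocks the calling
    // thread for 200 ms before reaching its first await.
    public static async Task PartlyBlockingAsync()
    {
        Thread.Sleep(200);     // synchronous part, runs on the caller's thread
        await Task.Delay(200); // truly asynchronous part
    }

    // Measures how long it takes just to *obtain* each task (not to await it).
    public static (long direct, long offloaded) MeasureStartTimes()
    {
        var sw = Stopwatch.StartNew();
        Task t1 = PartlyBlockingAsync();                 // caller blocked ~200 ms here
        long direct = sw.ElapsedMilliseconds;

        sw.Restart();
        Task t2 = Task.Run(() => PartlyBlockingAsync()); // returns right away
        long offloaded = sw.ElapsedMilliseconds;

        Task.WaitAll(t1, t2);
        return (direct, offloaded);
    }
}
```

In ParallelizeTwoActions, this is what lets the loop start action2 for one chunk and action1 for the next chunk without the synchronous part of one delaying the start of the other.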

You could use ParallelizeTwoActions like this:

await ParallelizeTwoActions("https://stackoverflow.com", url =>
{
    return GetObjectsJSONAsync(url);
},
async (url, jsArray) =>
{
    // Process the jsArray asynchronously
});
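To watch the overlap without a real API, here is a self-contained simulation (a sketch, not from the answer) that copies ParallelizeTwoActions and replaces the HTTP calls with a hypothetical in-memory page map (page1, then page2, then page3) and Task.Delay. Each action logs when it starts and ends.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // Copy of ParallelizeTwoActions from the answer above (remarks trimmed).
    public static async Task ParallelizeTwoActions<TInput, TResult>(
        TInput startingInput,
        Func<TInput, Task<(TInput, TInput, TResult)>> action1,
        Func<TInput, TResult, Task> action2)
        where TInput : class
    {
        if (startingInput is null) return;
        Task<(TInput, TInput, TResult)> task1 = Task.Run(() => action1(startingInput));
        Task task2 = Task.CompletedTask;
        try
        {
            while (true)
            {
                await Task.WhenAll(task1, task2).ConfigureAwait(false);
                (TInput previousInput, TInput nextInput, TResult result) = task1.Result;
                task2 = Task.Run(() => action2(previousInput, result));
                if (nextInput is null) break;
                task1 = Task.Run(() => action1(nextInput));
            }
        }
        finally
        {
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
        }
    }

    // Hypothetical fake API: page URL -> next page URL (null = last page).
    private static readonly Dictionary<string, string?> NextPage = new Dictionary<string, string?>
    {
        ["page1"] = "page2",
        ["page2"] = "page3",
        ["page3"] = null,
    };

    // Runs the pipeline and returns the events in the order they happened.
    public static async Task<List<string>> RunAsync()
    {
        var log = new ConcurrentQueue<string>();
        await ParallelizeTwoActions<string, int[]>("page1",
            async url =>
            {
                log.Enqueue($"fetch start {url}");
                await Task.Delay(100); // simulated network latency
                log.Enqueue($"fetch end {url}");
                return (url, NextPage[url], new[] { 1, 2, 3 });
            },
            async (url, items) =>
            {
                log.Enqueue($"process start {url}");
                await Task.Delay(100); // simulated processing of items
                log.Enqueue($"process end {url}");
            });
        return new List<string>(log);
    }
}
```

In the resulting log, the fetch of page2 starts before the processing of page1 ends, which is the overlap the question asks for. The relative order of events that start at the same moment (for example "process start page1" and "fetch start page2") can vary between runs.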