如何等待一系列任务并停止等待第一个异常?

问题描述 投票:0回答:4

我有一系列任务,我正在等待它们

Task.WhenAll
。我的任务经常失败,在这种情况下,我会用消息框通知用户,以便她可以重试。我的问题是报告错误被延迟,直到所有任务完成。相反,我想在第一个任务抛出异常时立即通知用户。换句话说,我想要一个快速失败的
Task.WhenAll
版本。由于不存在这样的内置方法,我尝试创建自己的方法,但我的实现不符合我想要的方式。这是我想到的:

public static async Task<TResult[]> WhenAllFailFast<TResult>(
    params Task<TResult>[] tasks)
{
    foreach (var task in tasks)
    {
        await task.ConfigureAwait(false);
    }
    return await Task.WhenAll(tasks).ConfigureAwait(false);
}

这通常比原生的

Task.WhenAll
抛出得更快,但通常不够快。在任务 #1 完成之前,不会观察到出现故障的任务 #2。我该如何改进它才能尽快失败?


更新:关于取消,目前不在我的要求之内,但可以说,为了保持一致性,第一个取消的任务应该立即停止等待。在这种情况下,从

WhenAllFailFast
返回的组合任务应该有
Status == TaskStatus.Canceled

澄清:取消场景是指用户单击“取消”按钮来停止任务完成。这并不是在出现异常时自动取消未完成的任务。

c# .net asynchronous async-await task
4个回答
9
投票

WhenAllFailFast

 构建您的 TaskCompletionSource
 方法。您可以使用同步延续来 .ContinueWith() 每个输入任务,当任务以“Faulted”状态结束时(使用相同的异常对象),该延续会导致 TCS 出错。
也许类似(未完全测试):

using System; using System.Threading; using System.Threading.Tasks; namespace stackoverflow { class Program { static async Task Main(string[] args) { var cts = new CancellationTokenSource(); cts.Cancel(); var arr = await WhenAllFastFail( Task.FromResult(42), Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")), Task.FromCanceled<int>(cts.Token)); Console.WriteLine("Hello World!"); } public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks) { if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>()); // defensive copy. var defensive = tasks.Clone() as Task<TResult>[]; var tcs = new TaskCompletionSource<TResult[]>(); var remaining = defensive.Length; Action<Task> check = t => { switch (t.Status) { case TaskStatus.Faulted: // we 'try' as some other task may beat us to the punch. tcs.TrySetException(t.Exception.InnerException); break; case TaskStatus.Canceled: // we 'try' as some other task may beat us to the punch. tcs.TrySetCanceled(); break; default: // we can safely set here as no other task remains to run. if (Interlocked.Decrement(ref remaining) == 0) { // get the results into an array. var results = new TResult[defensive.Length]; for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result; tcs.SetResult(results); } break; } }; foreach (var task in defensive) { task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return tcs.Task; } } }

编辑

:解开AggregateException,取消支持,返回结果数组。防御数组突变、null 和empty。显式任务调度程序。


7
投票
WhenAllFailFast

方法,并且我修改了@ZaldronGG的

优秀解决方案
以使其性能更高一些(并且更符合Stephen Cleary的建议)。下面的实现在我的 PC 中每秒处理大约 3,500,000 个任务。 public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks) { if (tasks is null) throw new ArgumentNullException(nameof(tasks)); if (tasks.Length == 0) return Task.FromResult(new TResult[0]); var results = new TResult[tasks.Length]; var remaining = tasks.Length; var tcs = new TaskCompletionSource<TResult[]>( TaskCreationOptions.RunContinuationsAsynchronously); for (int i = 0; i < tasks.Length; i++) { var task = tasks[i]; if (task == null) throw new ArgumentException( $"The {nameof(tasks)} argument included a null value.", nameof(tasks)); HandleCompletion(task, i); } return tcs.Task; async void HandleCompletion(Task<TResult> task, int index) { try { var result = await task.ConfigureAwait(false); results[index] = result; if (Interlocked.Decrement(ref remaining) == 0) { tcs.TrySetResult(results); } } catch (OperationCanceledException) { tcs.TrySetCanceled(); } catch (Exception ex) { tcs.TrySetException(ex); } } }



2
投票

您可能会发现这篇文章对第一次失败后中止的模式很有帮助:

http://gigi.nullneuron.net/gigilabs/patterns-for-asynchronous-composite-tasks-in-c/

public static async Task<TResult[]> WhenAllFailFast<TResult>( params Task<TResult>[] tasks) { var taskList = tasks.ToList(); while (taskList.Count > 0) { var task = await Task.WhenAny(taskList).ConfigureAwait(false); if(task.Exception != null) { // Left as an exercise for the reader: // properly unwrap the AggregateException; // handle the exception(s); // cancel the other running tasks. throw task.Exception.InnerException; } taskList.Remove(task); } return await Task.WhenAll(tasks).ConfigureAwait(false); }



1
投票
async void

上启动多个

SynchronizationContext
操作有点怀疑。我在这里提出的解决方案要慢得多。它比 @ZaldronGG 的
优秀解决方案
慢了大约 3 倍,比我之前基于 async void
 的实现慢了大约 10 倍。它的优点是,在完成返回的 
Task<TResult[]>
后,它不会泄漏观察到的任务所附加的即发即忘的延续。当此任务完成时,所有由
WhenAllFailFast
方法内部创建的延续都已被清理。对于 API 来说,哪种行为是理想的行为是普遍的,但在许多情况下它可能并不重要。
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);
    CancellationTokenSource cts = new();
    Task<TResult> failedTask = null;
    TaskContinuationOptions flags = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;
    Action<Task<TResult>> continuationAction = new(task =>
    {
        if (!task.IsCompletedSuccessfully)
            if (Interlocked.CompareExchange(ref failedTask, task, null) is null)
                cts.Cancel();
    });
    IEnumerable<Task> continuations = tasks.Select(task => task
        .ContinueWith(continuationAction, cts.Token, flags, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        Task<TResult> localFailedTask = Volatile.Read(ref failedTask);
        if (localFailedTask is not null)
            return Task.WhenAll(localFailedTask);
        // At this point all the tasks are completed successfully
        Debug.Assert(tasks.All(t => t.IsCompletedSuccessfully));
        Debug.Assert(allContinuations.IsCompletedSuccessfully);
        return Task.WhenAll(tasks);
    }, default, flags, TaskScheduler.Default).Unwrap();
}

此实现与 ZaldronGG 的类似,它为每个任务附加一个延续,不同之处在于这些延续是可以取消的,并且当观察到第一个不成功的任务时,它们会被集体取消。它还使用了我最近发现的 

Unwrap 技术

,这消除了手动完成 
TaskCompletionSource<TResult[]> 实例的需要,并且通常可以实现简洁的实现。
    

© www.soinside.com 2019 - 2024. All rights reserved.