让我们说我有几个任务:
void Sample(IEnumerable<int> someInts)
{
var taskList = someInts.Select(x => DownloadSomeString(x));
}
async Task<string> DownloadSomeString(int x) {...}
我想得到第一个成功任务的结果。所以,基本的解决方案是写下这样的东西:
var taskList = someInts.Select(x => DownloadSomeString(x));
string content = string.Empty;
Task<string> firstOne = null;
while (string.IsNullOrWhiteSpace(content)){
try
{
firstOne = await Task.WhenAny(taskList);
if (firstOne.Status != TaskStatus.RanToCompletion)
{
taskList = taskList.Where(x => x != firstOne);
continue;
}
content = await firstOne;
}
catch(...){taskList = taskList.Where(x => x != firstOne);}
}
但这个解决方案似乎运行N
+(N
-1)+ .. + K
任务。其中N
是someInts.Count
,K
是任务中第一个成功任务的位置,因此它重新运行除WhenAny捕获的任务之外的所有任务。那么,有没有办法让第一个任务成功完成并运行最大的N
任务? (如果成功的任务将是最后一个)
“第一个成功的任务”的问题是如果所有任务都失败了怎么办?这是一个really bad idea to have a task that never completes。
我假设你想要传播最后一个任务的异常,如果它们都失败了。考虑到这一点,我会说这样的事情是合适的:
async Task<Task<T>> FirstSuccessfulTask(IEnumerable<Task<T>> tasks)
{
Task<T>[] ordered = tasks.OrderByCompletion();
for (int i = 0; i != ordered.Length; ++i)
{
var task = ordered[i];
try
{
await task.ConfigureAwait(false);
return task;
}
catch
{
if (i == ordered.Length - 1)
return task;
continue;
}
}
return null; // Never reached
}
这个解决方案建立在OrderByCompletion
extension method part的my AsyncEx library上; Jon Skeet和Stephen Toub也存在替代实现。
您需要做的就是创建一个TaskCompletionSource
,为每个任务添加一个延续,并在第一个任务成功完成后设置:
public static Task<T> FirstSuccessfulTask<T>(IEnumerable<Task<T>> tasks)
{
var taskList = tasks.ToList();
var tcs = new TaskCompletionSource<T>();
int remainingTasks = taskList.Count;
foreach (var task in taskList)
{
task.ContinueWith(t =>
{
if (task.Status == TaskStatus.RanToCompletion)
tcs.TrySetResult(t.Result);
else
if (Interlocked.Decrement(ref remainingTasks) == 0)
tcs.SetException(new AggregateException(tasks.SelectMany(t1 => t1.Exception.InnerExceptions)));
});
}
return tcs.Task;
}
没有结果的任务版本:
public static Task FirstSuccessfulTask(IEnumerable<Task> tasks)
{
var taskList = tasks.ToList();
var tcs = new TaskCompletionSource<bool>();
int remainingTasks = taskList.Count;
foreach (var task in taskList)
{
task.ContinueWith(t =>
{
if (task.Status == TaskStatus.RanToCompletion)
tcs.TrySetResult(true);
else
if (Interlocked.Decrement(ref remainingTasks) == 0)
tcs.SetException(new AggregateException(
tasks.SelectMany(t1 => t1.Exception.InnerExceptions)));
});
}
return tcs.Task;
}
作为一个直接的解决方案是等待任何任务,检查它是否处于RanToCompletion状态,如果没有,再等待除已经完成的任务之外的任何任务。
async Task<TResult> WaitForFirstCompleted<TResult>( IEnumerable<Task<TResult>> tasks )
{
var taskList = new List<Task<TResult>>( tasks );
Task<TResult> firstCompleted;
while ( taskList.Count > 0 )
{
firstCompleted = await Task.WhenAny( taskList );
if ( firstCompleted.Status == TaskStatus.RanToCompletion )
{
return firstCompleted.Result;
}
taskList.Remove( firstCompleted );
}
throw new InvalidOperationException( "No task completed successful" );
}
@Servy代码的修改版本,因为它包含一些编译错误和一些陷阱。我的变体是:
public static class AsyncExtensions
{
public static Task<T> GetFirstSuccessfulTask<T>(this IReadOnlyCollection<Task<T>> tasks)
{
var tcs = new TaskCompletionSource<T>();
int remainingTasks = tasks.Count;
foreach (var task in tasks)
{
task.ContinueWith(t =>
{
if (task.Status == TaskStatus.RanToCompletion)
tcs.TrySetResult(t.Result);
else if (Interlocked.Decrement(ref remainingTasks) == 0)
tcs.SetException(new AggregateException(
tasks.SelectMany(t2 => t2.Exception?.InnerExceptions ?? Enumerable.Empty<Exception>())));
});
}
return tcs.Task;
}
}
我们没有ToList
我们的输入,因为它已经是我们可以使用的集合,它编译(巨大的优势)并且它处理因异常由于某种原因没有一个固有的例外(它是完全可能的)。