我想同时执行多个异步任务。每个任务将运行一个 HTTP 请求,该请求要么成功完成,要么引发异常。我需要
await
直到第一个任务成功完成,或者直到所有任务都失败。
Task.WhenAny
方法的重载,以便排除未成功完成的任务?
等待任意任务,满足条件则返回任务。否则再次等待其他任务,直到没有更多任务可以等待。
public static async Task<Task> WhenAny( IEnumerable<Task> tasks, Predicate<Task> condition )
{
var tasklist = tasks.ToList();
while ( tasklist.Count > 0 )
{
var task = await Task.WhenAny( tasklist );
if ( condition( task ) )
return task;
tasklist.Remove( task );
}
return null;
}
简单检查一下
var tasks = new List<Task> {
Task.FromException( new Exception() ),
Task.FromException( new Exception() ),
Task.FromException( new Exception() ),
Task.CompletedTask, };
var completedTask = WhenAny( tasks, t => t.Status == TaskStatus.RanToCompletion ).Result;
if ( tasks.IndexOf( completedTask ) != 3 )
throw new Exception( "not expected" );
public static Task<T> GetFirstResult<T>(
ICollection<Func<CancellationToken, Task<T>>> taskFactories,
Predicate<T> predicate) where T : class
{
var tcs = new TaskCompletionSource<T>();
var cts = new CancellationTokenSource();
int completedCount = 0;
// in case you have a lot of tasks you might need to throttle them
//(e.g. so you don't try to send 99999999 requests at the same time)
// see: http://stackoverflow.com/a/25877042/67824
foreach (var taskFactory in taskFactories)
{
taskFactory(cts.Token).ContinueWith(t =>
{
if (t.Exception != null)
{
Console.WriteLine($"Task completed with exception: {t.Exception}");
}
else if (predicate(t.Result))
{
cts.Cancel();
tcs.TrySetResult(t.Result);
}
if (Interlocked.Increment(ref completedCount) == taskFactories.Count)
{
tcs.SetException(new InvalidOperationException("All tasks failed"));
}
}, cts.Token);
}
return tcs.Task;
}
使用示例:
using System.Net.Http;
var client = new HttpClient();
var response = await GetFirstResult(
new Func<CancellationToken, Task<HttpResponseMessage>>[]
{
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
ct => client.GetAsync("http://microsoft123456.com", ct),
},
rm => rm.IsSuccessStatusCode);
Console.WriteLine($"Successful response: {response}");
public static Task<Task<T>> WhenFirst<T>(IEnumerable<Task<T>> tasks, Func<Task<T>, bool> predicate)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (predicate == null) throw new ArgumentNullException(nameof(predicate));
var tasksArray = (tasks as IReadOnlyList<Task<T>>) ?? tasks.ToArray();
if (tasksArray.Count == 0) throw new ArgumentException("Empty task list", nameof(tasks));
if (tasksArray.Any(t => t == null)) throw new ArgumentException("Tasks contains a null reference", nameof(tasks));
var tcs = new TaskCompletionSource<Task<T>>();
var count = tasksArray.Count;
Action<Task<T>> continuation = t =>
{
if (predicate(t))
{
tcs.TrySetResult(t);
}
if (Interlocked.Decrement(ref count) == 0)
{
tcs.TrySetResult(null);
}
};
foreach (var task in tasksArray)
{
task.ContinueWith(continuation);
}
return tcs.Task;
}
使用示例:
var task = await WhenFirst(tasks, t => t.Status == TaskStatus.RanToCompletion);
if (task != null)
var value = await task;
请注意,这不会传播失败任务的异常(就像
WhenAny
不会传播)。
您还可以为非通用
Task
创建一个版本。
GetFirstResult
实现的更复杂版本,具有与 Eli Arbel 的 WhenFirst
方法中类似的 API。想法是相同的:在每个任务上附加一个可取消的延续,并在已完成的任务满足 CancellationTokenSource
时取消 predicate
。此实现避免使用 TaskCompletionSource<T>
,因此避免了由于实现中的错误而导致异步 WhenFirst
方法永远无法完成的风险:
public static async Task<Task<TResult>> WhenFirst<TResult>(
Task<TResult>[] tasks,
Func<Task<TResult>, bool> predicate)
{
ArgumentNullException.ThrowIfNull(tasks);
ArgumentNullException.ThrowIfNull(predicate);
using CancellationTokenSource cts = new();
Task<TResult> selectedTask = null;
IEnumerable<Task> continuations = tasks
.Where(task => task is not null)
.TakeWhile(_ => !cts.IsCancellationRequested)
.Select(task => task.ContinueWith(t =>
{
bool result;
try { result = predicate(t); } catch { cts.Cancel(); throw; }
if (result)
if (Interlocked.CompareExchange(ref selectedTask, t, null) is null)
cts.Cancel();
}, cts.Token, TaskContinuationOptions.DenyChildAttach |
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
Task whenAll = Task.WhenAll(continuations);
try { await whenAll.ConfigureAwait(false); }
catch when (whenAll.IsCanceled) { } // Ignore
return selectedTask;
}
此实施的卖点是:
WhenFirst
方法异步完成时,所有工作都已完成并且所有内容都已清理。不会将任何后续任务作为“即发即忘”任务留下来,附加在原始任务上。predicate
中的异常会作为返回的 Task<Task<TResult>>
的错误进行传播。predicate
。predicate
,则不会在剩余任务上附加延续(性能优化)。predicate
,则该方法的异步结果是null
任务。换句话说,WhenFirst().Result
是 null
。null
数组中的 tasks
任务。如果它找到一个 null
,它不会抛出异常。签名使用
Task<T>[]
数组而不是 IEnumerable<Task<T>>
序列作为参数的原因是因为我不想处理序列枚举期间出现错误的可能性。传递延迟序列作为参数,换句话说,依靠 WhenFirst()
调用来实例化任务,无论如何对我来说听起来都是不可能的场景。
另一种方法,与Rufo爵士的答案非常相似,但使用
AsyncEnumerable
和Ix.NET
实现一个小辅助方法来在任何任务完成后立即对其进行流式处理:
static IAsyncEnumerable<Task<T>> WhenCompleted<T>(IEnumerable<Task<T>> source) =>
AsyncEnumerable.Create(_ =>
{
var tasks = source.ToList();
Task<T> current = null;
return AsyncEnumerator.Create(
async () => tasks.Any() && tasks.Remove(current = await Task.WhenAny(tasks)),
() => current,
async () => { });
});
}
然后可以按完成顺序处理任务,例如按要求返回第一个匹配的:
await WhenCompleted(tasks).FirstOrDefault(t => t.Status == TaskStatus.RanToCompletion)
只是想添加一些使用 List.Remove 的答案 @Peebo 和 @SirRufo (因为我还不能发表评论)
我会考虑使用:
var tasks = source.ToHashSet();
而不是:
var tasks = source.ToList();
这样删除会更有效率