异步任务的顺序处理

问题描述 投票:31回答:5

假设以下同步代码:

try
{
    Foo();
    Bar();
    Fubar();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

现在假定所有这些方法都具有Async对应项,由于某种原因我不得不使用它们,因此仅将整个内容包装在一个新任务中是不可行的。我将如何实现相同的行为?我所说的“相同”是:

  1. 如果抛出异常,则为异常执行处理程序。
  2. 如果引发异常,则停止执行以下方法。

我唯一能想到的是恐怖

var fooTask = FooAsync();
fooTask.ContinueWith(t => HandleError(t.Exception),
                     TaskContinuationOptions.OnlyOnFaulted);
fooTask.ContinueWith(
    t =>
    {
        var barTask = BarAsync();
        barTask.ContinueWith(t => HandleError(t.Exception),
                             TaskContinuationOptions.OnlyOnFaulted);
        barTask.ContinueWith(
            t =>
            {
                var fubarTask = FubarAsync();
                fubarTask.ContinueWith(t => HandleError(t.Exception),
                                       TaskContinuationOptions.OnlyOnFaulted);
                fubarTask.ContinueWith(
                    t => Console.WriteLine("All done"),
                    TaskContinuationOptions.OnlyOnRanToCompletion);
            }, 
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }, 
    TaskContinuationOptions.OnlyOnRanToCompletion);

请注意:

  • 我需要一个适用于.NET 4的解决方案,因此async/await不再存在。但是,如果可以使用async/await,请随时演示如何操作。
  • 我不需要使用TPL。如果使用TPL不可能,则可以采用另一种方法,例如使用Reactive Extensions吗?
c# .net .net-4.0 task-parallel-library
5个回答
27
投票

以下是async的工作方式:

try
{
    await FooAsync();
    await BarAsync();
    await FubarAsync();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

如果安装(预发行版)Microsoft.Bcl.Async package,则在.NET 4.0上可以使用。


由于您被困在VS2010上,因此可以使用Stephen Toub's Then的变体:

Then

您可以这样使用它:

public static Task Then(this Task first, Func<Task> next)
{
  var tcs = new TaskCompletionSource<object>();
  first.ContinueWith(_ =>
  {
    if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
    else if (first.IsCanceled) tcs.TrySetCanceled();
    else
    {
      try
      {
        next().ContinueWith(t =>
        {
          if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
          else if (t.IsCanceled) tcs.TrySetCanceled();
          else tcs.TrySetResult(null);
        }, TaskContinuationOptions.ExecuteSynchronously);
      }
      catch (Exception exc) { tcs.TrySetException(exc); }
    }
  }, TaskContinuationOptions.ExecuteSynchronously);
  return tcs.Task; 
}

使用Rx,它看起来像这样(假设您尚未将var task = FooAsync().Then(() => BarAsync()).Then(() => FubarAsync()); task.ContinueWith(t => { if (t.IsFaulted || t.IsCanceled) { var e = t.Exception.InnerException; // exception handling } else { Console.WriteLine("All done"); } }, TaskContinuationOptions.ExcecuteSynchronously); 方法公开为async):

IObservable<Unit>

我认为。无论如何,我不是Rx管理员。 :)


7
投票

为了完整起见,这就是我将执行Chris Sinclair建议的辅助方法的方式:

FooAsync().ToObservable()
    .SelectMany(_ => BarAsync().ToObservable())
    .SelectMany(_ => FubarAsync().ToObservable())
    .Subscribe(_ => { Console.WriteLine("All done"); },
        e => { Console.WriteLine(e); });

这可确保仅在前一个任务成功完成时才请求每个后续任务。假定public void RunSequential(Action onComplete, Action<Exception> errorHandler, params Func<Task>[] actions) { RunSequential(onComplete, errorHandler, actions.AsEnumerable().GetEnumerator()); } public void RunSequential(Action onComplete, Action<Exception> errorHandler, IEnumerator<Func<Task>> actions) { if(!actions.MoveNext()) { onComplete(); return; } var task = actions.Current(); task.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted); task.ContinueWith(t => RunSequential(onComplete, errorHandler, actions), TaskContinuationOptions.OnlyOnRanToCompletion); } 返回一个已经运行的任务。


5
投票

您这里拥有的基本上是一个Func<Task>。您希望按顺序运行每个异步项目,但需要一些错误处理支持。这是一个这样的实现:

ForEachAsync

我还增加了对已取消任务的支持,只是为了更笼统,因为它几乎不需要做。

[将每个任务添加为上一个任务的延续,并且始终确保任何异常都会导致设置最终任务的异常。

这里是一个示例用法:

public static Task ForEachAsync(IEnumerable<Func<Task>> tasks)
{
    var tcs = new TaskCompletionSource<bool>();

    Task currentTask = Task.FromResult(false);

    foreach (Func<Task> function in tasks)
    {
        currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
        currentTask.ContinueWith(t => tcs.TrySetCanceled()
                , TaskContinuationOptions.OnlyOnCanceled);
        Task<Task> continuation = currentTask.ContinueWith(t => function()
            , TaskContinuationOptions.OnlyOnRanToCompletion);
        currentTask = continuation.Unwrap();
    }

    currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
    currentTask.ContinueWith(t => tcs.TrySetCanceled()
            , TaskContinuationOptions.OnlyOnCanceled);
    currentTask.ContinueWith(t => tcs.TrySetResult(true)
            , TaskContinuationOptions.OnlyOnRanToCompletion);

    return tcs.Task;
}

3
投票

您应该能够创建一个方法来合并两个任务,并且只有在第一个成功的情况下才启动第二个任务。

public static Task FooAsync()
{
    Console.WriteLine("Started Foo");
    return Task.Delay(1000)
        .ContinueWith(t => Console.WriteLine("Finished Foo"));
}

public static Task BarAsync()
{
    return Task.Factory.StartNew(() => { throw new Exception(); });
}

private static void Main(string[] args)
{
    List<Func<Task>> list = new List<Func<Task>>();

    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => BarAsync());

    Task task = ForEachAsync(list);

    task.ContinueWith(t => Console.WriteLine(t.Exception.ToString())
        , TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => Console.WriteLine("Done!")
        , TaskContinuationOptions.OnlyOnRanToCompletion);
}

然后您可以将任务链接在一起:

public static Task Then(this Task parent, Task next)
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    parent.ContinueWith(pt =>
    {
        if (pt.IsFaulted)
        {
            tcs.SetException(pt.Exception.InnerException);
        }
        else
        {
            next.ContinueWith(nt =>
            {
                if (nt.IsFaulted)
                {
                    tcs.SetException(nt.Exception.InnerException);
                }
                else { tcs.SetResult(null); }
            });
            next.Start();
        }
    });
    return tcs.Task;
}

如果您的任务立即开始,您可以将它们包装在Task outer = FooAsync() .Then(BarAsync()) .Then(FubarAsync()); outer.ContinueWith(t => { if(t.IsFaulted) { //handle exception } }); 中:

Func

1
投票

现在,我还没有真正使用TPL,所以这只是黑暗中的一击。考虑到@Servy提到的内容,也许它不会完全异步运行。但是我认为我会发布它,如果它不合常规,您可以将我投票弃用,也可以删除它(或者我们可以解决需要解决的问题)public static Task Then(this Task parent, Func<Task> nextFunc) { TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(); parent.ContinueWith(pt => { if (pt.IsFaulted) { tcs.SetException(pt.Exception.InnerException); } else { Task next = nextFunc(); next.ContinueWith(nt => { if (nt.IsFaulted) { tcs.SetException(nt.Exception.InnerException); } else { tcs.SetResult(null); } }); } }); return tcs.Task; }

它的用法如下:

public void RunAsync(Action onComplete, Action<Exception> errorHandler, params Action[] actions) { if (actions.Length == 0) { //what to do when no actions/tasks provided? onComplete(); return; } List<Task> tasks = new List<Task>(actions.Length); foreach(var action in actions) { Task task = new Task(action); task.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted); tasks.Add(task); } //last task calls onComplete tasks[actions.Length - 1].ContinueWith(t => onComplete(), TaskContinuationOptions.OnlyOnRanToCompletion); //wire all tasks to execute the next one, except of course, the last task for (int i = 0; i <= actions.Length - 2; i++) { var nextTask = tasks[i + 1]; tasks[i].ContinueWith(t => nextTask.Start(), TaskContinuationOptions.OnlyOnRanToCompletion); } tasks[0].Start(); }

有什么想法?否决票? :)

((我当然更喜欢异步/等待)] >>

编辑:根据您对RunAsync(() => Console.WriteLine("All done"), ex => Console.WriteLine(ex), Foo, Bar, Fubar); 的评论,这将是适当的实现吗?

Func<Task>

© www.soinside.com 2019 - 2024. All rights reserved.