这里是一个示例程序,它对控制台输入进行了两次订阅(此处可观察到的源不相关)。在第一个订阅中,它使用Observable.SelectMany;在第二个订阅中,它使用内部使用System.Threading.Tasks.Dataflow包的类似SelectMany运算符。每个输入中的某些输入都会引发异常。异常已正确转发到Observer onError,该异常将其重新引发到默认的Subscribe实现中。观察到的行为是,在SelectMany中发生异常的情况下,该进程将继续运行,而在SelectManyPreseveOrder中发生异常的情况下,该进程将以未处理的异常终止。是什么导致了不同的行为?有没有办法在SelectManyPreserveOrder运算符中实现“更友好”的行为?这是一个使用Rx.Linq 2.2.5和System.Threading.Tasks.Dataflow 4.10.0的.net 4.6.1控制台应用程序:
class Program
{
static async Task Main()
{
AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");
var consoleInput = Helper.ConsoleInput();
consoleInput.SelectMany(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "1")
throw new Exception("This exception is swallowed");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany: {s}"));
consoleInput.SelectManyPreserveOrder(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "2")
throw new Exception("This exception kills the process");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));
await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
}
}
public static class ObservableExtension
{
public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
{
return source.FromTplDataflow(() =>
new TransformBlock<TSource, TResult>(selector,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
}
public static IObservable<TResult> FromTplDataflow<T, TResult>(
this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
{
return Observable.Defer(() =>
{
var block = blockFactory();
return Observable.Using(() =>
{
var disposable = source.Subscribe(block.AsObserver());
return Disposable.Create(dispose: () => disposable.Dispose());
}, r => block.AsObservable());
});
}
}
public static class Helper
{
public static IObservable<string> ConsoleInput()
{
return
Observable
.FromAsync(() => Console.In.ReadLineAsync())
.Repeat()
.Publish()
.RefCount()
.SubscribeOn(Scheduler.Default);
}
}
有趣的是,从未调用过UnobservedTaskException处理程序。
这里抛出异常,但它被抛出在unobserved Task continuation中。在.NET 4.5及更高版本中,运行时系统将自动处理未观察到的任务异常。 Here's a good article by Stephen Toub talking about that change。
重要部分:
为了使开发人员更容易编写基于Task的异步代码,.NET 4.5更改了未观察到的异常的默认异常行为。尽管未观察到的异常仍将引发UnobservedTaskException事件(不这样做将是一个重大更改),但默认情况下该过程不会崩溃。相反,无论事件处理程序是否观察到异常,异常都会在引发事件后最终被吞噬。
SelectMany执行推迟到查询被实际“调用”为止。
代替.Subscribe()调用.ToArray()或.ToList()来执行SelectMany。那应该抛出异常。